1 /** 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "logging.h" 18 #include "status.h" 19 #include "worker.h" 20 21 #include <time.h> 22 23 //#define WORKER_DEBUG 24 #ifdef WORKER_DEBUG 25 26 #define DBG(...) LOGD(__VA_ARGS__) 27 28 #else 29 30 #define DBG(...) 31 32 #endif 33 34 void * WorkerThread::Work(void *param) { 35 WorkerThread *t = (WorkerThread *)param; 36 android_atomic_acquire_store(STATE_RUNNING, &t->state_); 37 void * v = t->Worker(t->workerParam_); 38 android_atomic_acquire_store(STATE_STOPPED, &t->state_); 39 return v; 40 } 41 42 bool WorkerThread::isRunning() { 43 DBG("WorkerThread::isRunning E"); 44 bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING; 45 DBG("WorkerThread::isRunning X ret_value=%d", ret_value); 46 return ret_value; 47 } 48 49 WorkerThread::WorkerThread() { 50 DBG("WorkerThread::WorkerThread E"); 51 state_ = STATE_INITIALIZED; 52 pthread_mutex_init(&mutex_, NULL); 53 pthread_cond_init(&cond_, NULL); 54 DBG("WorkerThread::WorkerThread X"); 55 } 56 57 WorkerThread::~WorkerThread() { 58 DBG("WorkerThread::~WorkerThread E"); 59 Stop(); 60 pthread_mutex_destroy(&mutex_); 61 DBG("WorkerThread::~WorkerThread X"); 62 } 63 64 // Return true if changed from STATE_RUNNING to STATE_STOPPING 65 bool WorkerThread::BeginStopping() { 66 DBG("WorkerThread::BeginStopping E"); 67 bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0); 68 DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value); 69 return ret_value; 70 } 71 72 // Wait until state is not STATE_STOPPING 73 void WorkerThread::WaitUntilStopped() { 74 DBG("WorkerThread::WaitUntilStopped E"); 75 pthread_cond_signal(&cond_); 76 while(android_atomic_release_load(&state_) == STATE_STOPPING) { 77 usleep(200000); 78 } 79 DBG("WorkerThread::WaitUntilStopped X"); 80 } 81 82 void WorkerThread::Stop() { 83 DBG("WorkerThread::Stop E"); 84 if (BeginStopping()) { 85 WaitUntilStopped(); 86 } 87 DBG("WorkerThread::Stop X"); 88 } 89 90 int WorkerThread::Run(void *workerParam) { 91 DBG("WorkerThread::Run E workerParam=%p", workerParam); 92 int status; 93 int ret; 94 95 workerParam_ = workerParam; 96 97 ret = pthread_attr_init(&attr_); 98 if (ret != 0) { 99 LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret)); 100 return STATUS_ERR; 101 } 102 ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED); 103 if (ret != 0) { 104 LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s", 105 strerror(ret)); 106 return STATUS_ERR; 107 } 108 ret = pthread_create(&tid_, &attr_, 109 (void * (*)(void *))&WorkerThread::Work, this); 110 if (ret != 0) { 111 LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret)); 112 return STATUS_ERR; 113 } 114 115 // Wait until worker is running 116 while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) { 117 usleep(200000); 118 } 119 120 DBG("WorkerThread::Run X workerParam=%p", workerParam); 121 return STATUS_OK; 122 } 123 124 125 class WorkerQueueThread : public WorkerThread { 126 private: 127 friend class WorkerQueue; 128 129 public: 130 WorkerQueueThread() { 131 } 132 133 virtual ~WorkerQueueThread() { 134 Stop(); 135 } 136 137 void * Worker(void *param) { 138 DBG("WorkerQueueThread::Worker E"); 139 WorkerQueue *wq = (WorkerQueue *)param; 140 141 // Do the work until we're told to stop 142 while (isRunning()) { 143 pthread_mutex_lock(&mutex_); 144 while (isRunning() && wq->q_.size() == 0) { 145 if (wq->delayed_q_.size() == 0) { 146 // Both queue's are empty so wait 147 pthread_cond_wait(&cond_, &mutex_); 148 } else { 149 // delayed_q_ is not empty, move any 150 // timed out records to q_. 151 int64_t now = android::elapsedRealtime(); 152 while((wq->delayed_q_.size() != 0) && 153 ((wq->delayed_q_.top()->time - now) <= 0)) { 154 struct WorkerQueue::Record *r = wq->delayed_q_.top(); 155 DBG("WorkerQueueThread::Worker move p=%p time=%lldms", 156 r->p, r->time); 157 wq->delayed_q_.pop(); 158 wq->q_.push_back(r); 159 } 160 161 if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) { 162 // We need to do a timed wait 163 struct timeval tv; 164 struct timespec ts; 165 struct WorkerQueue::Record *r = wq->delayed_q_.top(); 166 int64_t delay_ms = r->time - now; 167 DBG("WorkerQueueThread::Worker wait" 168 " p=%p time=%lldms delay_ms=%lldms", 169 r->p, r->time, delay_ms); 170 gettimeofday(&tv, NULL); 171 ts.tv_sec = tv.tv_sec + (delay_ms / 1000); 172 ts.tv_nsec = (tv.tv_usec + 173 ((delay_ms % 1000) * 1000)) * 1000; 174 pthread_cond_timedwait(&cond_, &mutex_, &ts); 175 } 176 } 177 } 178 if (isRunning()) { 179 struct WorkerQueue::Record *r = wq->q_.front(); 180 wq->q_.pop_front(); 181 void *p = r->p; 182 wq->release_record(r); 183 pthread_mutex_unlock(&mutex_); 184 wq->Process(r->p); 185 } else { 186 pthread_mutex_unlock(&mutex_); 187 } 188 } 189 DBG("WorkerQueueThread::Worker X"); 190 return NULL; 191 } 192 }; 193 194 WorkerQueue::WorkerQueue() { 195 DBG("WorkerQueue::WorkerQueue E"); 196 wqt_ = new WorkerQueueThread(); 197 DBG("WorkerQueue::WorkerQueue X"); 198 } 199 200 WorkerQueue::~WorkerQueue() { 201 DBG("WorkerQueue::~WorkerQueue E"); 202 Stop(); 203 204 Record *r; 205 pthread_mutex_lock(&wqt_->mutex_); 206 while(free_list_.size() != 0) { 207 r = free_list_.front(); 208 free_list_.pop_front(); 209 DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r); 210 delete r; 211 } 212 while(delayed_q_.size() != 0) { 213 r = delayed_q_.top(); 214 delayed_q_.pop(); 215 DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r); 216 delete r; 217 } 218 pthread_mutex_unlock(&wqt_->mutex_); 219 220 delete wqt_; 221 DBG("WorkerQueue::~WorkerQueue X"); 222 } 223 224 int WorkerQueue::Run() { 225 return wqt_->Run(this); 226 } 227 228 void WorkerQueue::Stop() { 229 wqt_->Stop(); 230 } 231 232 /** 233 * Obtain a record from free_list if it is not empty, fill in the record with provided 234 * information: *p and delay_in_ms 235 */ 236 struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) { 237 struct Record *r; 238 if (free_list_.size() == 0) { 239 r = new Record(); 240 DBG("WorkerQueue::obtain_record new r=%p", r); 241 } else { 242 r = free_list_.front(); 243 DBG("WorkerQueue::obtain_record reuse r=%p", r); 244 free_list_.pop_front(); 245 } 246 r->p = p; 247 if (delay_in_ms != 0) { 248 r->time = android::elapsedRealtime() + delay_in_ms; 249 } else { 250 r->time = 0; 251 } 252 return r; 253 } 254 255 /** 256 * release a record and insert into the front of the free_list 257 */ 258 void WorkerQueue::release_record(struct Record *r) { 259 DBG("WorkerQueue::release_record r=%p", r); 260 free_list_.push_front(r); 261 } 262 263 /** 264 * Add a record to processing queue q_ 265 */ 266 void WorkerQueue::Add(void *p) { 267 DBG("WorkerQueue::Add E:"); 268 pthread_mutex_lock(&wqt_->mutex_); 269 struct Record *r = obtain_record(p, 0); 270 q_.push_back(r); 271 if (q_.size() == 1) { 272 pthread_cond_signal(&wqt_->cond_); 273 } 274 pthread_mutex_unlock(&wqt_->mutex_); 275 DBG("WorkerQueue::Add X:"); 276 } 277 278 void WorkerQueue::AddDelayed(void *p, int delay_in_ms) { 279 DBG("WorkerQueue::AddDelayed E:"); 280 if (delay_in_ms <= 0) { 281 Add(p); 282 } else { 283 pthread_mutex_lock(&wqt_->mutex_); 284 struct Record *r = obtain_record(p, delay_in_ms); 285 delayed_q_.push(r); 286 #ifdef WORKER_DEBUG 287 int64_t now = android::elapsedRealtime(); 288 DBG("WorkerQueue::AddDelayed" 289 " p=%p delay_in_ms=%d now=%lldms top->p=%p" 290 " top->time=%lldms diff=%lldms", 291 p, delay_in_ms, now, delayed_q_.top()->p, 292 delayed_q_.top()->time, delayed_q_.top()->time - now); 293 #endif 294 if ((q_.size() == 0) && (delayed_q_.top() == r)) { 295 // q_ is empty and the new record is at delayed_q_.top 296 // so we signal the waiting thread so it can readjust 297 // the wait time. 298 DBG("WorkerQueue::AddDelayed signal"); 299 pthread_cond_signal(&wqt_->cond_); 300 } 301 pthread_mutex_unlock(&wqt_->mutex_); 302 } 303 DBG("WorkerQueue::AddDelayed X:"); 304 } 305 306 307 class TestWorkerQueue : public WorkerQueue { 308 virtual void Process(void *p) { 309 LOGD("TestWorkerQueue::Process: EX p=%p", p); 310 } 311 }; 312 313 class TesterThread : public WorkerThread { 314 public: 315 void * Worker(void *param) 316 { 317 LOGD("TesterThread::Worker E param=%p", param); 318 WorkerQueue *wq = (WorkerQueue *)param; 319 320 // Test AddDelayed 321 wq->AddDelayed((void *)1000, 1000); 322 wq->Add((void *)0); 323 wq->Add((void *)0); 324 wq->Add((void *)0); 325 wq->Add((void *)0); 326 wq->AddDelayed((void *)100, 100); 327 wq->AddDelayed((void *)2000, 2000); 328 329 for (int i = 1; isRunning(); i++) { 330 LOGD("TesterThread: looping %d", i); 331 wq->Add((void *)i); 332 wq->Add((void *)i); 333 wq->Add((void *)i); 334 wq->Add((void *)i); 335 sleep(1); 336 } 337 338 LOGD("TesterThread::Worker X param=%p", param); 339 340 return NULL; 341 } 342 }; 343 344 void testWorker() { 345 LOGD("testWorker E: ********"); 346 347 // Test we can create a thread and delete it 348 TesterThread *tester = new TesterThread(); 349 delete tester; 350 351 TestWorkerQueue *wq = new TestWorkerQueue(); 352 if (wq->Run() == STATUS_OK) { 353 LOGD("testWorker WorkerQueue %p running", wq); 354 355 // Test we can run a thread, stop it then delete it 356 tester = new TesterThread(); 357 tester->Run(wq); 358 LOGD("testWorker tester %p running", tester); 359 sleep(10); 360 LOGD("testWorker tester %p stopping", tester); 361 tester->Stop(); 362 LOGD("testWorker tester %p stopped", tester); 363 wq->Stop(); 364 LOGD("testWorker wq %p stopped", wq); 365 } 366 LOGD("testWorker X: ********\n"); 367 } 368