Home | History | Annotate | Download | only in cpp
      1 /**
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "logging.h"
     18 #include "status.h"
     19 #include "worker.h"
     20 
     21 #include <time.h>
     22 
     23 //#define WORKER_DEBUG
     24 #ifdef  WORKER_DEBUG
     25 
     26 #define DBG(...) LOGD(__VA_ARGS__)
     27 
     28 #else
     29 
     30 #define DBG(...)
     31 
     32 #endif
     33 
     34 void * WorkerThread::Work(void *param) {
     35     WorkerThread *t = (WorkerThread *)param;
     36     android_atomic_acquire_store(STATE_RUNNING, &t->state_);
     37     void * v = t->Worker(t->workerParam_);
     38     android_atomic_acquire_store(STATE_STOPPED, &t->state_);
     39     return v;
     40 }
     41 
     42 bool WorkerThread::isRunning() {
     43     DBG("WorkerThread::isRunning E");
     44     bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING;
     45     DBG("WorkerThread::isRunning X ret_value=%d", ret_value);
     46     return ret_value;
     47 }
     48 
     49 WorkerThread::WorkerThread() {
     50     DBG("WorkerThread::WorkerThread E");
     51     state_ = STATE_INITIALIZED;
     52     pthread_mutex_init(&mutex_, NULL);
     53     pthread_cond_init(&cond_, NULL);
     54     DBG("WorkerThread::WorkerThread X");
     55 }
     56 
     57 WorkerThread::~WorkerThread() {
     58     DBG("WorkerThread::~WorkerThread E");
     59     Stop();
     60     pthread_mutex_destroy(&mutex_);
     61     DBG("WorkerThread::~WorkerThread X");
     62 }
     63 
     64 // Return true if changed from STATE_RUNNING to STATE_STOPPING
     65 bool WorkerThread::BeginStopping() {
     66     DBG("WorkerThread::BeginStopping E");
     67     bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0);
     68     DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value);
     69     return ret_value;
     70 }
     71 
     72 // Wait until state is not STATE_STOPPING
     73 void WorkerThread::WaitUntilStopped() {
     74     DBG("WorkerThread::WaitUntilStopped E");
     75     pthread_cond_signal(&cond_);
     76     while(android_atomic_release_load(&state_) == STATE_STOPPING) {
     77         usleep(200000);
     78     }
     79     DBG("WorkerThread::WaitUntilStopped X");
     80 }
     81 
     82 void WorkerThread::Stop() {
     83     DBG("WorkerThread::Stop E");
     84     if (BeginStopping()) {
     85         WaitUntilStopped();
     86     }
     87     DBG("WorkerThread::Stop X");
     88 }
     89 
     90 int WorkerThread::Run(void *workerParam) {
     91     DBG("WorkerThread::Run E workerParam=%p", workerParam);
     92     int status;
     93     int ret;
     94 
     95     workerParam_ = workerParam;
     96 
     97     ret = pthread_attr_init(&attr_);
     98     if (ret != 0) {
     99         LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret));
    100         return STATUS_ERR;
    101     }
    102     ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
    103     if (ret != 0) {
    104         LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s",
    105                 strerror(ret));
    106         return STATUS_ERR;
    107     }
    108     ret = pthread_create(&tid_, &attr_,
    109                 (void * (*)(void *))&WorkerThread::Work, this);
    110     if (ret != 0) {
    111         LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret));
    112         return STATUS_ERR;
    113     }
    114 
    115     // Wait until worker is running
    116     while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) {
    117         usleep(200000);
    118     }
    119 
    120     DBG("WorkerThread::Run X workerParam=%p", workerParam);
    121     return STATUS_OK;
    122 }
    123 
    124 
    125 class WorkerQueueThread : public WorkerThread {
    126   private:
    127     friend class WorkerQueue;
    128 
    129   public:
    130     WorkerQueueThread() {
    131     }
    132 
    133     virtual ~WorkerQueueThread() {
    134         Stop();
    135     }
    136 
    137     void * Worker(void *param) {
    138         DBG("WorkerQueueThread::Worker E");
    139         WorkerQueue *wq = (WorkerQueue *)param;
    140 
    141         // Do the work until we're told to stop
    142         while (isRunning()) {
    143             pthread_mutex_lock(&mutex_);
    144             while (isRunning() && wq->q_.size() == 0) {
    145                 if (wq->delayed_q_.size() == 0) {
    146                     // Both queue's are empty so wait
    147                     pthread_cond_wait(&cond_, &mutex_);
    148                 } else {
    149                     // delayed_q_ is not empty, move any
    150                     // timed out records to q_.
    151                     int64_t now = android::elapsedRealtime();
    152                     while((wq->delayed_q_.size() != 0) &&
    153                             ((wq->delayed_q_.top()->time - now) <= 0)) {
    154                         struct WorkerQueue::Record *r = wq->delayed_q_.top();
    155                         DBG("WorkerQueueThread::Worker move p=%p time=%lldms",
    156                                 r->p, r->time);
    157                         wq->delayed_q_.pop();
    158                         wq->q_.push_back(r);
    159                     }
    160 
    161                     if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) {
    162                         // We need to do a timed wait
    163                         struct timeval tv;
    164                         struct timespec ts;
    165                         struct WorkerQueue::Record *r = wq->delayed_q_.top();
    166                         int64_t delay_ms = r->time - now;
    167                         DBG("WorkerQueueThread::Worker wait"
    168                             " p=%p time=%lldms delay_ms=%lldms",
    169                                 r->p, r->time, delay_ms);
    170                         gettimeofday(&tv, NULL);
    171                         ts.tv_sec = tv.tv_sec + (delay_ms / 1000);
    172                         ts.tv_nsec = (tv.tv_usec +
    173                                         ((delay_ms % 1000) * 1000)) * 1000;
    174                         pthread_cond_timedwait(&cond_, &mutex_, &ts);
    175                     }
    176                 }
    177             }
    178             if (isRunning()) {
    179                 struct WorkerQueue::Record *r = wq->q_.front();
    180                 wq->q_.pop_front();
    181                 void *p = r->p;
    182                 wq->release_record(r);
    183                 pthread_mutex_unlock(&mutex_);
    184                 wq->Process(r->p);
    185             } else {
    186                 pthread_mutex_unlock(&mutex_);
    187             }
    188         }
    189         DBG("WorkerQueueThread::Worker X");
    190         return NULL;
    191     }
    192 };
    193 
    194 WorkerQueue::WorkerQueue() {
    195     DBG("WorkerQueue::WorkerQueue E");
    196     wqt_ = new WorkerQueueThread();
    197     DBG("WorkerQueue::WorkerQueue X");
    198 }
    199 
    200 WorkerQueue::~WorkerQueue() {
    201     DBG("WorkerQueue::~WorkerQueue E");
    202     Stop();
    203 
    204     Record *r;
    205     pthread_mutex_lock(&wqt_->mutex_);
    206     while(free_list_.size() != 0) {
    207         r = free_list_.front();
    208         free_list_.pop_front();
    209         DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r);
    210         delete r;
    211     }
    212     while(delayed_q_.size() != 0) {
    213         r = delayed_q_.top();
    214         delayed_q_.pop();
    215         DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r);
    216         delete r;
    217     }
    218     pthread_mutex_unlock(&wqt_->mutex_);
    219 
    220     delete wqt_;
    221     DBG("WorkerQueue::~WorkerQueue X");
    222 }
    223 
    224 int WorkerQueue::Run() {
    225     return wqt_->Run(this);
    226 }
    227 
    228 void WorkerQueue::Stop() {
    229     wqt_->Stop();
    230 }
    231 
    232 /**
    233  * Obtain a record from free_list if it is not empty, fill in the record with provided
    234  * information: *p and delay_in_ms
    235  */
    236 struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) {
    237     struct Record *r;
    238     if (free_list_.size() == 0) {
    239         r = new Record();
    240         DBG("WorkerQueue::obtain_record new r=%p", r);
    241     } else {
    242         r = free_list_.front();
    243         DBG("WorkerQueue::obtain_record reuse r=%p", r);
    244         free_list_.pop_front();
    245     }
    246     r->p = p;
    247     if (delay_in_ms != 0) {
    248         r->time = android::elapsedRealtime() + delay_in_ms;
    249     } else {
    250         r->time = 0;
    251     }
    252     return r;
    253 }
    254 
    255 /**
    256  * release a record and insert into the front of the free_list
    257  */
    258 void WorkerQueue::release_record(struct Record *r) {
    259     DBG("WorkerQueue::release_record r=%p", r);
    260     free_list_.push_front(r);
    261 }
    262 
    263 /**
    264  * Add a record to processing queue q_
    265  */
    266 void WorkerQueue::Add(void *p) {
    267     DBG("WorkerQueue::Add E:");
    268     pthread_mutex_lock(&wqt_->mutex_);
    269     struct Record *r = obtain_record(p, 0);
    270     q_.push_back(r);
    271     if (q_.size() == 1) {
    272         pthread_cond_signal(&wqt_->cond_);
    273     }
    274     pthread_mutex_unlock(&wqt_->mutex_);
    275     DBG("WorkerQueue::Add X:");
    276 }
    277 
    278 void WorkerQueue::AddDelayed(void *p, int delay_in_ms) {
    279     DBG("WorkerQueue::AddDelayed E:");
    280     if (delay_in_ms <= 0) {
    281         Add(p);
    282     } else {
    283         pthread_mutex_lock(&wqt_->mutex_);
    284         struct Record *r = obtain_record(p, delay_in_ms);
    285         delayed_q_.push(r);
    286 #ifdef WORKER_DEBUG
    287         int64_t now = android::elapsedRealtime();
    288         DBG("WorkerQueue::AddDelayed"
    289             " p=%p delay_in_ms=%d now=%lldms top->p=%p"
    290             " top->time=%lldms diff=%lldms",
    291                 p, delay_in_ms, now, delayed_q_.top()->p,
    292                 delayed_q_.top()->time, delayed_q_.top()->time - now);
    293 #endif
    294         if ((q_.size() == 0) && (delayed_q_.top() == r)) {
    295             // q_ is empty and the new record is at delayed_q_.top
    296             // so we signal the waiting thread so it can readjust
    297             // the wait time.
    298             DBG("WorkerQueue::AddDelayed signal");
    299             pthread_cond_signal(&wqt_->cond_);
    300         }
    301         pthread_mutex_unlock(&wqt_->mutex_);
    302     }
    303     DBG("WorkerQueue::AddDelayed X:");
    304 }
    305 
    306 
    307 class TestWorkerQueue : public WorkerQueue {
    308     virtual void Process(void *p) {
    309         LOGD("TestWorkerQueue::Process: EX p=%p", p);
    310     }
    311 };
    312 
    313 class TesterThread : public WorkerThread {
    314   public:
    315     void * Worker(void *param)
    316     {
    317         LOGD("TesterThread::Worker E param=%p", param);
    318         WorkerQueue *wq = (WorkerQueue *)param;
    319 
    320         // Test AddDelayed
    321         wq->AddDelayed((void *)1000, 1000);
    322         wq->Add((void *)0);
    323         wq->Add((void *)0);
    324         wq->Add((void *)0);
    325         wq->Add((void *)0);
    326         wq->AddDelayed((void *)100, 100);
    327         wq->AddDelayed((void *)2000, 2000);
    328 
    329         for (int i = 1; isRunning(); i++) {
    330             LOGD("TesterThread: looping %d", i);
    331             wq->Add((void *)i);
    332             wq->Add((void *)i);
    333             wq->Add((void *)i);
    334             wq->Add((void *)i);
    335             sleep(1);
    336         }
    337 
    338         LOGD("TesterThread::Worker X param=%p", param);
    339 
    340         return NULL;
    341     }
    342 };
    343 
    344 void testWorker() {
    345     LOGD("testWorker E: ********");
    346 
    347     // Test we can create a thread and delete it
    348     TesterThread *tester = new TesterThread();
    349     delete tester;
    350 
    351     TestWorkerQueue *wq = new TestWorkerQueue();
    352     if (wq->Run() == STATUS_OK) {
    353         LOGD("testWorker WorkerQueue %p running", wq);
    354 
    355         // Test we can run a thread, stop it then delete it
    356         tester = new TesterThread();
    357         tester->Run(wq);
    358         LOGD("testWorker tester %p running", tester);
    359         sleep(10);
    360         LOGD("testWorker tester %p stopping", tester);
    361         tester->Stop();
    362         LOGD("testWorker tester %p stopped", tester);
    363         wq->Stop();
    364         LOGD("testWorker wq %p stopped", wq);
    365     }
    366     LOGD("testWorker X: ********\n");
    367 }
    368