/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/* ThreadPool */

#include "sles_allinclusive.h"

// Entry point for each worker thread: loop dequeuing and executing closures
// until the thread pool shuts down

static void *ThreadPool_start(void *context)
{
    ThreadPool *tp = (ThreadPool *) context;
    assert(NULL != tp);
    for (;;) {
        Closure *pClosure = ThreadPool_remove(tp);
        // closure is NULL when thread pool is being destroyed
        if (NULL == pClosure) {
            break;
        }
        // copy the closure to the stack, then free the heap-allocated original
        // before running the handler
        const Closure closure = *pClosure;
        free(pClosure);
        // extract the common parameters, then dispatch on the closure kind
        ClosureKind kind = closure.mKind;
        void *context1 = closure.mContext1;
        void *context2 = closure.mContext2;
        int parameter1 = closure.mParameter1;
        switch (kind) {
          case CLOSURE_KIND_PPI:
            {
            ClosureHandler_ppi handler_ppi = closure.mHandler.mHandler_ppi;
            assert(NULL != handler_ppi);
            (*handler_ppi)(context1, context2, parameter1);
            }
            break;
          case CLOSURE_KIND_PPII:
            {
            ClosureHandler_ppii handler_ppii = closure.mHandler.mHandler_ppii;
            assert(NULL != handler_ppii);
            int parameter2 = closure.mParameter2;
            (*handler_ppii)(context1, context2, parameter1, parameter2);
            }
            break;
          case CLOSURE_KIND_PIIPP:
            {
            ClosureHandler_piipp handler_piipp = closure.mHandler.mHandler_piipp;
            assert(NULL != handler_piipp);
            int parameter2 = closure.mParameter2;
            void *context3 = closure.mContext3;
            (*handler_piipp)(context1, parameter1, parameter2, context2, context3);
            }
            break;
          default:
            SL_LOGE("Unexpected callback kind %d", kind);
            assert(false);
            break;
        }
    }
    return NULL;
}
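
/*
 * For reference, the handlers dispatched above have the following shapes
 * (the names here are hypothetical, and the void return is assumed since the
 * results are discarded; the ClosureHandler_* typedefs themselves come from
 * sles_allinclusive.h):
 *
 *   void handler_ppi(void *context1, void *context2, int parameter1);
 *   void handler_ppii(void *context1, void *context2, int parameter1,
 *           int parameter2);
 *   void handler_piipp(void *context1, int parameter1, int parameter2,
 *           void *context2, void *context3);
 */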

// Bitmask values recording which of the pool's synchronization objects were
// successfully initialized, so that a failed or partial init can destroy
// exactly what was created
#define INITIALIZED_NONE         0
#define INITIALIZED_MUTEX        1
#define INITIALIZED_CONDNOTFULL  2
#define INITIALIZED_CONDNOTEMPTY 4
#define INITIALIZED_ALL          7

static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);

// Initialize a ThreadPool
// maxClosures defaults to CLOSURE_TYPICAL if 0
// maxThreads defaults to THREAD_TYPICAL if 0

SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
{
    assert(NULL != tp);
    memset(tp, 0, sizeof(ThreadPool));
    tp->mShutdown = SL_BOOLEAN_FALSE;
    unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
    unsigned nThreads = 0;                      // number of threads successfully created
    int err;
    SLresult result;

    // initialize mutex and condition variables
    err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
    result = err_to_result(err);
    if (SL_RESULT_SUCCESS != result)
        goto fail;
    initialized |= INITIALIZED_MUTEX;
    err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
    result = err_to_result(err);
    if (SL_RESULT_SUCCESS != result)
        goto fail;
    initialized |= INITIALIZED_CONDNOTFULL;
    err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
    result = err_to_result(err);
    if (SL_RESULT_SUCCESS != result)
        goto fail;
    initialized |= INITIALIZED_CONDNOTEMPTY;

    // use default values for parameters, if not specified explicitly
    tp->mWaitingNotFull = 0;
    tp->mWaitingNotEmpty = 0;
    if (0 == maxClosures)
        maxClosures = CLOSURE_TYPICAL;
    tp->mMaxClosures = maxClosures;
    if (0 == maxThreads)
        maxThreads = THREAD_TYPICAL;
    tp->mMaxThreads = maxThreads;

    // initialize circular buffer for closures (one extra slot is allocated;
    // see the note after this function)
    if (CLOSURE_TYPICAL >= maxClosures) {
        tp->mClosureArray = tp->mClosureTypical;
    } else {
        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
        if (NULL == tp->mClosureArray) {
            result = SL_RESULT_RESOURCE_ERROR;
            goto fail;
        }
    }
    tp->mClosureFront = tp->mClosureArray;
    tp->mClosureRear = tp->mClosureArray;

    // initialize thread pool
    if (THREAD_TYPICAL >= maxThreads) {
        tp->mThreadArray = tp->mThreadTypical;
    } else {
        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
        if (NULL == tp->mThreadArray) {
            result = SL_RESULT_RESOURCE_ERROR;
            goto fail;
        }
    }
    unsigned i;
    for (i = 0; i < maxThreads; ++i) {
        err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
            ThreadPool_start, tp);
        result = err_to_result(err);
        if (SL_RESULT_SUCCESS != result)
            goto fail;
        ++nThreads;
    }
    tp->mInitialized = initialized;

    // done
    return SL_RESULT_SUCCESS;

    // here on any kind of error
fail:
    ThreadPool_deinit_internal(tp, initialized, nThreads);
    return result;
}
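
/*
 * Why "maxClosures + 1" above: the circular buffer always keeps one slot
 * empty so that a full buffer can be distinguished from an empty one without
 * a separate count:
 *
 *   mClosureFront == mClosureRear           -> buffer empty
 *   next(mClosureRear) == mClosureFront     -> buffer full
 *
 * where next() advances one slot and wraps at &mClosureArray[mMaxClosures + 1].
 * For example, with mMaxClosures == 2 the array has 3 slots, and at most 2
 * closures are ever queued at once.
 */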

static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
{
    int ok;

    assert(NULL != tp);
    // Destroy all threads: raise the shutdown flag while holding the mutex,
    // then broadcast both condition variables so every waiter (producer or
    // worker) re-checks the flag and exits
    if (0 < nThreads) {
        assert(INITIALIZED_ALL == initialized);
        ok = pthread_mutex_lock(&tp->mMutex);
        assert(0 == ok);
        tp->mShutdown = SL_BOOLEAN_TRUE;
        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
        assert(0 == ok);
        ok = pthread_cond_broadcast(&tp->mCondNotFull);
        assert(0 == ok);
        ok = pthread_mutex_unlock(&tp->mMutex);
        assert(0 == ok);
        unsigned i;
        for (i = 0; i < nThreads; ++i) {
            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
            assert(0 == ok);
        }

        // Empty out the circular buffer of closures
        ok = pthread_mutex_lock(&tp->mMutex);
        assert(0 == ok);
        Closure **oldFront = tp->mClosureFront;
        while (oldFront != tp->mClosureRear) {
            Closure **newFront = oldFront;
            if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
                newFront = tp->mClosureArray;
            Closure *pClosure = *oldFront;
            assert(NULL != pClosure);
            *oldFront = NULL;
            tp->mClosureFront = newFront;
            // release the mutex while calling free() so the pool lock is
            // never held during a heap operation
            ok = pthread_mutex_unlock(&tp->mMutex);
            assert(0 == ok);
            free(pClosure);
            ok = pthread_mutex_lock(&tp->mMutex);
            assert(0 == ok);
            // re-read the front pointer after reacquiring the mutex, so the
            // loop actually advances to the next closure
            oldFront = tp->mClosureFront;
        }
        ok = pthread_mutex_unlock(&tp->mMutex);
        assert(0 == ok);
        // Note that we can't be sure when mWaitingNotFull will drop to zero
    }

    // destroy the mutex and condition variables
    if (initialized & INITIALIZED_CONDNOTEMPTY) {
        ok = pthread_cond_destroy(&tp->mCondNotEmpty);
        assert(0 == ok);
    }
    if (initialized & INITIALIZED_CONDNOTFULL) {
        ok = pthread_cond_destroy(&tp->mCondNotFull);
        assert(0 == ok);
    }
    if (initialized & INITIALIZED_MUTEX) {
        ok = pthread_mutex_destroy(&tp->mMutex);
        assert(0 == ok);
    }
    tp->mInitialized = INITIALIZED_NONE;

    // release the closure circular buffer, if it was heap-allocated
    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
        free(tp->mClosureArray);
        tp->mClosureArray = NULL;
    }

    // release the thread array, if it was heap-allocated
    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
        free(tp->mThreadArray);
        tp->mThreadArray = NULL;
    }
}

void ThreadPool_deinit(ThreadPool *tp)
{
    // only valid after a successful ThreadPool_init, so all mMaxThreads
    // threads are known to have been created
    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
}

// Enqueue a closure to be executed later by a worker thread.
// Note that this raw interface requires an explicit "kind" and the full parameter list.
// The convenience methods below make it easier to use.
SLresult ThreadPool_add(ThreadPool *tp, ClosureKind kind, ClosureHandler_generic handler,
        void *context1, void *context2, void *context3, int parameter1, int parameter2)
{
    assert(NULL != tp);
    assert(NULL != handler);
    Closure *closure = (Closure *) malloc(sizeof(Closure));
    if (NULL == closure) {
        return SL_RESULT_RESOURCE_ERROR;
    }
    closure->mKind = kind;
    switch (kind) {
      case CLOSURE_KIND_PPI:
        closure->mHandler.mHandler_ppi = (ClosureHandler_ppi) handler;
        break;
      case CLOSURE_KIND_PPII:
        closure->mHandler.mHandler_ppii = (ClosureHandler_ppii) handler;
        break;
      case CLOSURE_KIND_PIIPP:
        closure->mHandler.mHandler_piipp = (ClosureHandler_piipp) handler;
        break;
      default:
        SL_LOGE("ThreadPool_add() invalid closure kind %d", kind);
        assert(false);
        // never enqueue a closure with an uninitialized handler
        free(closure);
        return SL_RESULT_INTERNAL_ERROR;
    }
    closure->mContext1 = context1;
    closure->mContext2 = context2;
    closure->mContext3 = context3;
    closure->mParameter1 = parameter1;
    closure->mParameter2 = parameter2;
    int ok;
    ok = pthread_mutex_lock(&tp->mMutex);
    assert(0 == ok);
    // can't enqueue while thread pool is shutting down
    if (tp->mShutdown) {
        ok = pthread_mutex_unlock(&tp->mMutex);
        assert(0 == ok);
        free(closure);
        return SL_RESULT_PRECONDITIONS_VIOLATED;
    }
    for (;;) {
        Closure **oldRear = tp->mClosureRear;
        Closure **newRear = oldRear;
        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
            newRear = tp->mClosureArray;
        // if closure circular buffer is full, then wait for it to become non-full
        if (newRear == tp->mClosureFront) {
            ++tp->mWaitingNotFull;
            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
            assert(0 == ok);
            // can't enqueue while thread pool is shutting down
            if (tp->mShutdown) {
                assert(0 < tp->mWaitingNotFull);
                --tp->mWaitingNotFull;
                ok = pthread_mutex_unlock(&tp->mMutex);
                assert(0 == ok);
                free(closure);
                return SL_RESULT_PRECONDITIONS_VIOLATED;
            }
            continue;
        }
        assert(NULL == *oldRear);
        *oldRear = closure;
        tp->mClosureRear = newRear;
        // if a worker thread was waiting to dequeue, then suggest that it try again
        if (0 < tp->mWaitingNotEmpty) {
            --tp->mWaitingNotEmpty;
            ok = pthread_cond_signal(&tp->mCondNotEmpty);
            assert(0 == ok);
        }
        break;
    }
    ok = pthread_mutex_unlock(&tp->mMutex);
    assert(0 == ok);
    return SL_RESULT_SUCCESS;
}
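
/*
 * A note on the wait/signal handshake: mWaitingNotFull and mWaitingNotEmpty
 * record waiters, and each pthread_cond_signal() here and in
 * ThreadPool_remove() is paired with a decrement performed by the signaling
 * thread rather than by the waiter (the shutdown path above is the
 * exception). The counters are therefore hints rather than exact counts (see
 * the related note in ThreadPool_deinit_internal), which is safe because
 * every waiter re-checks the buffer state after waking.
 */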

// Called by a worker thread when it is ready to accept the next closure to execute
Closure *ThreadPool_remove(ThreadPool *tp)
{
    Closure *pClosure;
    int ok;
    ok = pthread_mutex_lock(&tp->mMutex);
    assert(0 == ok);
    for (;;) {
        // fail if thread pool is shutting down
        if (tp->mShutdown) {
            pClosure = NULL;
            break;
        }
        Closure **oldFront = tp->mClosureFront;
        // if closure circular buffer is empty, then wait for it to become non-empty
        if (oldFront == tp->mClosureRear) {
            ++tp->mWaitingNotEmpty;
            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
            assert(0 == ok);
            // try again
            continue;
        }
        // dequeue the closure at front of circular buffer
        Closure **newFront = oldFront;
        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) {
            newFront = tp->mClosureArray;
        }
        pClosure = *oldFront;
        assert(NULL != pClosure);
        *oldFront = NULL;
        tp->mClosureFront = newFront;
        // if a client thread was waiting to enqueue, then suggest that it try again
        if (0 < tp->mWaitingNotFull) {
            --tp->mWaitingNotFull;
            ok = pthread_cond_signal(&tp->mCondNotFull);
            assert(0 == ok);
        }
        break;
    }
    ok = pthread_mutex_unlock(&tp->mMutex);
    assert(0 == ok);
    return pClosure;
}

// Convenience methods for applications
SLresult ThreadPool_add_ppi(ThreadPool *tp, ClosureHandler_ppi handler,
        void *context1, void *context2, int parameter1)
{
    // the pointer is cast back to its precise type before being called, so
    // this round-trip cast through the generic handler type is safe
    return ThreadPool_add(tp, CLOSURE_KIND_PPI, (ClosureHandler_generic) handler,
            context1, context2, NULL, parameter1, 0);
}

SLresult ThreadPool_add_ppii(ThreadPool *tp, ClosureHandler_ppii handler,
        void *context1, void *context2, int parameter1, int parameter2)
{
    // see ThreadPool_add_ppi regarding the safety of this cast
    return ThreadPool_add(tp, CLOSURE_KIND_PPII, (ClosureHandler_generic) handler,
            context1, context2, NULL, parameter1, parameter2);
}

SLresult ThreadPool_add_piipp(ThreadPool *tp, ClosureHandler_piipp handler,
        void *cntxt1, int param1, int param2, void *cntxt2, void *cntxt3)
{
    // see ThreadPool_add_ppi regarding the safety of this cast
    return ThreadPool_add(tp, CLOSURE_KIND_PIIPP, (ClosureHandler_generic) handler,
            cntxt1, cntxt2, cntxt3, param1, param2);
}
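
/*
 * Usage sketch (hypothetical client code, not part of this module; error
 * handling elided). A client initializes the pool, enqueues work through a
 * convenience method, and deinitializes the pool, which joins all workers:
 *
 *   // hypothetical handler matching ClosureHandler_ppi
 *   static void onReady(void *player, void *unused, int trackIndex)
 *   {
 *       // runs on a worker thread
 *   }
 *
 *   ThreadPool tp;
 *   SLresult result = ThreadPool_init(&tp, 0, 0);  // 0, 0 picks typical sizes
 *   if (SL_RESULT_SUCCESS == result) {
 *       (void) ThreadPool_add_ppi(&tp, onReady, myPlayer, NULL, 3);
 *       ThreadPool_deinit(&tp);
 *   }
 */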