Home | History | Annotate | Download | only in libopensles
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 /* ThreadPool */
     18 
     19 #include "sles_allinclusive.h"
     20 
     21 // Entry point for each worker thread
     22 
     23 static void *ThreadPool_start(void *context)
     24 {
     25     ThreadPool *tp = (ThreadPool *) context;
     26     assert(NULL != tp);
     27     for (;;) {
     28         Closure *pClosure = ThreadPool_remove(tp);
     29         // closure is NULL when thread pool is being destroyed
     30         if (NULL == pClosure)
     31             break;
     32         void (*handler)(void *, int);
     33         handler = pClosure->mHandler;
     34         void *context = pClosure->mContext;
     35         int parameter = pClosure->mParameter;
     36         free(pClosure);
     37         assert(NULL != handler);
     38         (*handler)(context, parameter);
     39     }
     40     return NULL;
     41 }
     42 
// Bit flags recording which synchronization objects of a ThreadPool were successfully
// initialized, so that a partially constructed pool can be torn down correctly.
// nothing initialized yet
#define INITIALIZED_NONE         0
// tp->mMutex
#define INITIALIZED_MUTEX        1
// tp->mCondNotFull
#define INITIALIZED_CONDNOTFULL  2
// tp->mCondNotEmpty
#define INITIALIZED_CONDNOTEMPTY 4
// bitwise OR of the three flags above
#define INITIALIZED_ALL          7

// Teardown helper used by the failure path of ThreadPool_init and by ThreadPool_deinit;
// declared here because ThreadPool_init calls it before its definition
static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
     50 
     51 // Initialize a ThreadPool
     52 // maxClosures defaults to CLOSURE_TYPICAL if 0
     53 // maxThreads defaults to THREAD_TYPICAL if 0
     54 
     55 SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
     56 {
     57     assert(NULL != tp);
     58     memset(tp, 0, sizeof(ThreadPool));
     59     tp->mShutdown = SL_BOOLEAN_FALSE;
     60     unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
     61     unsigned nThreads = 0;                      // number of threads successfully created
     62     int err;
     63     SLresult result;
     64 
     65     // initialize mutex and condition variables
     66     err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
     67     result = err_to_result(err);
     68     if (SL_RESULT_SUCCESS != result)
     69         goto fail;
     70     initialized |= INITIALIZED_MUTEX;
     71     err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
     72     result = err_to_result(err);
     73     if (SL_RESULT_SUCCESS != result)
     74         goto fail;
     75     initialized |= INITIALIZED_CONDNOTFULL;
     76     err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
     77     result = err_to_result(err);
     78     if (SL_RESULT_SUCCESS != result)
     79         goto fail;
     80     initialized |= INITIALIZED_CONDNOTEMPTY;
     81 
     82     // use default values for parameters, if not specified explicitly
     83     tp->mWaitingNotFull = 0;
     84     tp->mWaitingNotEmpty = 0;
     85     if (0 == maxClosures)
     86         maxClosures = CLOSURE_TYPICAL;
     87     tp->mMaxClosures = maxClosures;
     88     if (0 == maxThreads)
     89         maxThreads = THREAD_TYPICAL;
     90     tp->mMaxThreads = maxThreads;
     91 
     92     // initialize circular buffer for closures
     93     if (CLOSURE_TYPICAL >= maxClosures) {
     94         tp->mClosureArray = tp->mClosureTypical;
     95     } else {
     96         tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
     97         if (NULL == tp->mClosureArray) {
     98             result = SL_RESULT_RESOURCE_ERROR;
     99             goto fail;
    100         }
    101     }
    102     tp->mClosureFront = tp->mClosureArray;
    103     tp->mClosureRear = tp->mClosureArray;
    104 
    105     // initialize thread pool
    106     if (THREAD_TYPICAL >= maxThreads) {
    107         tp->mThreadArray = tp->mThreadTypical;
    108     } else {
    109         tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
    110         if (NULL == tp->mThreadArray) {
    111             result = SL_RESULT_RESOURCE_ERROR;
    112             goto fail;
    113         }
    114     }
    115     unsigned i;
    116     for (i = 0; i < maxThreads; ++i) {
    117         int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
    118             ThreadPool_start, tp);
    119         result = err_to_result(err);
    120         if (SL_RESULT_SUCCESS != result)
    121             goto fail;
    122         ++nThreads;
    123     }
    124     tp->mInitialized = initialized;
    125 
    126     // done
    127     return SL_RESULT_SUCCESS;
    128 
    129     // here on any kind of error
    130 fail:
    131     ThreadPool_deinit_internal(tp, initialized, nThreads);
    132     return result;
    133 }
    134 
    135 static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
    136 {
    137     int ok;
    138 
    139     assert(NULL != tp);
    140     // Destroy all threads
    141     if (0 < nThreads) {
    142         assert(INITIALIZED_ALL == initialized);
    143         ok = pthread_mutex_lock(&tp->mMutex);
    144         assert(0 == ok);
    145         tp->mShutdown = SL_BOOLEAN_TRUE;
    146         ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
    147         assert(0 == ok);
    148         ok = pthread_cond_broadcast(&tp->mCondNotFull);
    149         assert(0 == ok);
    150         ok = pthread_mutex_unlock(&tp->mMutex);
    151         assert(0 == ok);
    152         unsigned i;
    153         for (i = 0; i < nThreads; ++i) {
    154             ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
    155             assert(ok == 0);
    156         }
    157 
    158         // Empty out the circular buffer of closures
    159         ok = pthread_mutex_lock(&tp->mMutex);
    160         assert(0 == ok);
    161         assert(0 == tp->mWaitingNotEmpty);
    162         Closure **oldFront = tp->mClosureFront;
    163         while (oldFront != tp->mClosureRear) {
    164             Closure **newFront = oldFront;
    165             if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
    166                 newFront = tp->mClosureArray;
    167             Closure *pClosure = *oldFront;
    168             assert(NULL != pClosure);
    169             *oldFront = NULL;
    170             tp->mClosureFront = newFront;
    171             ok = pthread_mutex_unlock(&tp->mMutex);
    172             assert(0 == ok);
    173             free(pClosure);
    174             ok = pthread_mutex_lock(&tp->mMutex);
    175             assert(0 == ok);
    176         }
    177         ok = pthread_mutex_unlock(&tp->mMutex);
    178         assert(0 == ok);
    179         // Note that we can't be sure when mWaitingNotFull will drop to zero
    180     }
    181 
    182     // destroy the mutex and condition variables
    183     if (initialized & INITIALIZED_CONDNOTEMPTY) {
    184         ok = pthread_cond_destroy(&tp->mCondNotEmpty);
    185         assert(0 == ok);
    186     }
    187     if (initialized & INITIALIZED_CONDNOTFULL) {
    188         ok = pthread_cond_destroy(&tp->mCondNotFull);
    189         assert(0 == ok);
    190     }
    191     if (initialized & INITIALIZED_MUTEX) {
    192         ok = pthread_mutex_destroy(&tp->mMutex);
    193         assert(0 == ok);
    194     }
    195     tp->mInitialized = INITIALIZED_NONE;
    196 
    197     // release the closure circular buffer
    198     if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
    199         free(tp->mClosureArray);
    200         tp->mClosureArray = NULL;
    201     }
    202 
    203     // release the thread pool
    204     if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
    205         free(tp->mThreadArray);
    206         tp->mThreadArray = NULL;
    207     }
    208 
    209 }
    210 
    211 void ThreadPool_deinit(ThreadPool *tp)
    212 {
    213     ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
    214 }
    215 
    216 // Enqueue a closure to be executed later by a worker thread
    217 SLresult ThreadPool_add(ThreadPool *tp, void (*handler)(void *, int), void *context, int parameter)
    218 {
    219     assert(NULL != tp);
    220     assert(NULL != handler);
    221     Closure *closure = (Closure *) malloc(sizeof(Closure));
    222     if (NULL == closure)
    223         return SL_RESULT_RESOURCE_ERROR;
    224     closure->mHandler = handler;
    225     closure->mContext = context;
    226     closure->mParameter = parameter;
    227     int ok;
    228     ok = pthread_mutex_lock(&tp->mMutex);
    229     assert(0 == ok);
    230     // can't enqueue while thread pool shutting down
    231     if (tp->mShutdown) {
    232         ok = pthread_mutex_unlock(&tp->mMutex);
    233         assert(0 == ok);
    234         free(closure);
    235         return SL_RESULT_PRECONDITIONS_VIOLATED;
    236     }
    237     for (;;) {
    238         Closure **oldRear = tp->mClosureRear;
    239         Closure **newRear = oldRear;
    240         if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
    241             newRear = tp->mClosureArray;
    242         // if closure circular buffer is full, then wait for it to become non-full
    243         if (newRear == tp->mClosureFront) {
    244             ++tp->mWaitingNotFull;
    245             ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
    246             assert(0 == ok);
    247             // can't enqueue while thread pool shutting down
    248             if (tp->mShutdown) {
    249                 assert(0 < tp->mWaitingNotFull);
    250                 --tp->mWaitingNotFull;
    251                 ok = pthread_mutex_unlock(&tp->mMutex);
    252                 assert(0 == ok);
    253                 free(closure);
    254                 return SL_RESULT_PRECONDITIONS_VIOLATED;
    255             }
    256             continue;
    257         }
    258         assert(NULL == *oldRear);
    259         *oldRear = closure;
    260         tp->mClosureRear = newRear;
    261         // if a worker thread was waiting to dequeue, then suggest that it try again
    262         if (0 < tp->mWaitingNotEmpty) {
    263             --tp->mWaitingNotEmpty;
    264             ok = pthread_cond_signal(&tp->mCondNotEmpty);
    265             assert(0 == ok);
    266         }
    267         break;
    268     }
    269     ok = pthread_mutex_unlock(&tp->mMutex);
    270     assert(0 == ok);
    271     return SL_RESULT_SUCCESS;
    272 }
    273 
    274 // Called by a worker thread when it is ready to accept the next closure to execute
    275 Closure *ThreadPool_remove(ThreadPool *tp)
    276 {
    277     Closure *pClosure;
    278     int ok;
    279     ok = pthread_mutex_lock(&tp->mMutex);
    280     assert(0 == ok);
    281     for (;;) {
    282         Closure **oldFront = tp->mClosureFront;
    283         // if closure circular buffer is empty, then wait for it to become non-empty
    284         if (oldFront == tp->mClosureRear) {
    285             ++tp->mWaitingNotEmpty;
    286             ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
    287             assert(0 == ok);
    288             // fail if thread pool is shutting down
    289             if (tp->mShutdown) {
    290                 assert(0 < tp->mWaitingNotEmpty);
    291                 --tp->mWaitingNotEmpty;
    292                 pClosure = NULL;
    293                 break;
    294             }
    295             // try again
    296             continue;
    297         }
    298         // dequeue the closure at front of circular buffer
    299         Closure **newFront = oldFront;
    300         if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
    301             newFront = tp->mClosureArray;
    302         pClosure = *oldFront;
    303         assert(NULL != pClosure);
    304         *oldFront = NULL;
    305         tp->mClosureFront = newFront;
    306         // if a client thread was waiting to enqueue, then suggest that it try again
    307         if (0 < tp->mWaitingNotFull) {
    308             --tp->mWaitingNotFull;
    309             ok = pthread_cond_signal(&tp->mCondNotFull);
    310             assert(0 == ok);
    311         }
    312         break;
    313     }
    314     ok = pthread_mutex_unlock(&tp->mMutex);
    315     assert(0 == ok);
    316     return pClosure;
    317 }
    318