Home | History | Annotate | Download | only in utils
      1 // Copyright 2011 Google Inc. All Rights Reserved.
      2 //
      3 // This code is licensed under the same terms as WebM:
      4 //  Software License Agreement:  http://www.webmproject.org/license/software/
      5 //  Additional IP Rights Grant:  http://www.webmproject.org/license/additional/
      6 // -----------------------------------------------------------------------------
      7 //
      8 // Multi-threaded worker
      9 //
     10 // Author: Skal (pascal.massimino (at) gmail.com)
     11 
     12 #include <assert.h>
     13 #include <string.h>   // for memset()
     14 #include "./thread.h"
     15 
     16 #if defined(__cplusplus) || defined(c_plusplus)
     17 extern "C" {
     18 #endif
     19 
     20 #ifdef WEBP_USE_THREAD
     21 
     22 #if defined(_WIN32)
     23 
     24 //------------------------------------------------------------------------------
     25 // simplistic pthread emulation layer
     26 
     27 #include <process.h>
     28 
     29 // _beginthreadex requires __stdcall
     30 #define THREADFN unsigned int __stdcall
     31 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
     32 
     33 static int pthread_create(pthread_t* const thread, const void* attr,
     34                           unsigned int (__stdcall *start)(void*), void* arg) {
     35   (void)attr;
     36   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
     37                                       0,      /* unsigned stack_size */
     38                                       start,
     39                                       arg,
     40                                       0,      /* unsigned initflag */
     41                                       NULL);  /* unsigned *thrdaddr */
     42   if (*thread == NULL) return 1;
     43   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
     44   return 0;
     45 }
     46 
     47 static int pthread_join(pthread_t thread, void** value_ptr) {
     48   (void)value_ptr;
     49   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
     50           CloseHandle(thread) == 0);
     51 }
     52 
     53 // Mutex
     54 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
     55   (void)mutexattr;
     56   InitializeCriticalSection(mutex);
     57   return 0;
     58 }
     59 
     60 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
     61   EnterCriticalSection(mutex);
     62   return 0;
     63 }
     64 
     65 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
     66   LeaveCriticalSection(mutex);
     67   return 0;
     68 }
     69 
     70 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
     71   DeleteCriticalSection(mutex);
     72   return 0;
     73 }
     74 
     75 // Condition
     76 static int pthread_cond_destroy(pthread_cond_t* const condition) {
     77   int ok = 1;
     78   ok &= (CloseHandle(condition->waiting_sem_) != 0);
     79   ok &= (CloseHandle(condition->received_sem_) != 0);
     80   ok &= (CloseHandle(condition->signal_event_) != 0);
     81   return !ok;
     82 }
     83 
     84 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
     85   (void)cond_attr;
     86   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
     87   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
     88   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
     89   if (condition->waiting_sem_ == NULL ||
     90       condition->received_sem_ == NULL ||
     91       condition->signal_event_ == NULL) {
     92     pthread_cond_destroy(condition);
     93     return 1;
     94   }
     95   return 0;
     96 }
     97 
     98 static int pthread_cond_signal(pthread_cond_t* const condition) {
     99   int ok = 1;
    100   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    101     // a thread is waiting in pthread_cond_wait: allow it to be notified
    102     ok = SetEvent(condition->signal_event_);
    103     // wait until the event is consumed so the signaler cannot consume
    104     // the event via its own pthread_cond_wait.
    105     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
    106            WAIT_OBJECT_0);
    107   }
    108   return !ok;
    109 }
    110 
    111 static int pthread_cond_wait(pthread_cond_t* const condition,
    112                              pthread_mutex_t* const mutex) {
    113   int ok;
    114   // note that there is a consumer available so the signal isn't dropped in
    115   // pthread_cond_signal
    116   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
    117     return 1;
    118   // now unlock the mutex so pthread_cond_signal may be issued
    119   pthread_mutex_unlock(mutex);
    120   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
    121         WAIT_OBJECT_0);
    122   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
    123   pthread_mutex_lock(mutex);
    124   return !ok;
    125 }
    126 
    127 #else  // _WIN32
    128 # define THREADFN void*
    129 # define THREAD_RETURN(val) val
    130 #endif
    131 
    132 //------------------------------------------------------------------------------
    133 
    134 static THREADFN WebPWorkerThreadLoop(void *ptr) {    // thread loop
    135   WebPWorker* const worker = (WebPWorker*)ptr;
    136   int done = 0;
    137   while (!done) {
    138     pthread_mutex_lock(&worker->mutex_);
    139     while (worker->status_ == OK) {   // wait in idling mode
    140       pthread_cond_wait(&worker->condition_, &worker->mutex_);
    141     }
    142     if (worker->status_ == WORK) {
    143       if (worker->hook) {
    144         worker->had_error |= !worker->hook(worker->data1, worker->data2);
    145       }
    146       worker->status_ = OK;
    147     } else if (worker->status_ == NOT_OK) {   // finish the worker
    148       done = 1;
    149     }
    150     // signal to the main thread that we're done (for Sync())
    151     pthread_cond_signal(&worker->condition_);
    152     pthread_mutex_unlock(&worker->mutex_);
    153   }
    154   return THREAD_RETURN(NULL);    // Thread is finished
    155 }
    156 
    157 // main thread state control
    158 static void WebPWorkerChangeState(WebPWorker* const worker,
    159                                   WebPWorkerStatus new_status) {
    160   // no-op when attempting to change state on a thread that didn't come up
    161   if (worker->status_ < OK) return;
    162 
    163   pthread_mutex_lock(&worker->mutex_);
    164   // wait for the worker to finish
    165   while (worker->status_ != OK) {
    166     pthread_cond_wait(&worker->condition_, &worker->mutex_);
    167   }
    168   // assign new status and release the working thread if needed
    169   if (new_status != OK) {
    170     worker->status_ = new_status;
    171     pthread_cond_signal(&worker->condition_);
    172   }
    173   pthread_mutex_unlock(&worker->mutex_);
    174 }
    175 
    176 #endif
    177 
    178 //------------------------------------------------------------------------------
    179 
    180 void WebPWorkerInit(WebPWorker* const worker) {
    181   memset(worker, 0, sizeof(*worker));
    182   worker->status_ = NOT_OK;
    183 }
    184 
    185 int WebPWorkerSync(WebPWorker* const worker) {
    186 #ifdef WEBP_USE_THREAD
    187   WebPWorkerChangeState(worker, OK);
    188 #endif
    189   assert(worker->status_ <= OK);
    190   return !worker->had_error;
    191 }
    192 
    193 int WebPWorkerReset(WebPWorker* const worker) {
    194   int ok = 1;
    195   worker->had_error = 0;
    196   if (worker->status_ < OK) {
    197 #ifdef WEBP_USE_THREAD
    198     if (pthread_mutex_init(&worker->mutex_, NULL) ||
    199         pthread_cond_init(&worker->condition_, NULL)) {
    200       return 0;
    201     }
    202     pthread_mutex_lock(&worker->mutex_);
    203     ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
    204     if (ok) worker->status_ = OK;
    205     pthread_mutex_unlock(&worker->mutex_);
    206 #else
    207     worker->status_ = OK;
    208 #endif
    209   } else if (worker->status_ > OK) {
    210     ok = WebPWorkerSync(worker);
    211   }
    212   assert(!ok || (worker->status_ == OK));
    213   return ok;
    214 }
    215 
    216 void WebPWorkerLaunch(WebPWorker* const worker) {
    217 #ifdef WEBP_USE_THREAD
    218   WebPWorkerChangeState(worker, WORK);
    219 #else
    220   if (worker->hook)
    221     worker->had_error |= !worker->hook(worker->data1, worker->data2);
    222 #endif
    223 }
    224 
    225 void WebPWorkerEnd(WebPWorker* const worker) {
    226   if (worker->status_ >= OK) {
    227 #ifdef WEBP_USE_THREAD
    228     WebPWorkerChangeState(worker, NOT_OK);
    229     pthread_join(worker->thread_, NULL);
    230     pthread_mutex_destroy(&worker->mutex_);
    231     pthread_cond_destroy(&worker->condition_);
    232 #else
    233     worker->status_ = NOT_OK;
    234 #endif
    235   }
    236   assert(worker->status_ == NOT_OK);
    237 }
    238 
    239 //------------------------------------------------------------------------------
    240 
    241 #if defined(__cplusplus) || defined(c_plusplus)
    242 }    // extern "C"
    243 #endif
    244