Home | History | Annotate | Download | only in utils
      1 // Copyright 2011 Google Inc. All Rights Reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style license
      4 // that can be found in the COPYING file in the root of the source
      5 // tree. An additional intellectual property rights grant can be found
      6 // in the file PATENTS. All contributing project authors may
      7 // be found in the AUTHORS file in the root of the source tree.
      8 // -----------------------------------------------------------------------------
      9 //
     10 // Multi-threaded worker
     11 //
     12 // Author: Skal (pascal.massimino (at) gmail.com)
     13 
     14 #include <assert.h>
     15 #include <string.h>   // for memset()
     16 #include "./thread.h"
     17 
     18 #if defined(__cplusplus) || defined(c_plusplus)
     19 extern "C" {
     20 #endif
     21 
     22 #ifdef WEBP_USE_THREAD
     23 
     24 #if defined(_WIN32)
     25 
     26 //------------------------------------------------------------------------------
     27 // simplistic pthread emulation layer
     28 
     29 #include <process.h>
     30 
     31 // _beginthreadex requires __stdcall
     32 #define THREADFN unsigned int __stdcall
     33 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
     34 
     35 static int pthread_create(pthread_t* const thread, const void* attr,
     36                           unsigned int (__stdcall *start)(void*), void* arg) {
     37   (void)attr;
     38   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
     39                                       0,      /* unsigned stack_size */
     40                                       start,
     41                                       arg,
     42                                       0,      /* unsigned initflag */
     43                                       NULL);  /* unsigned *thrdaddr */
     44   if (*thread == NULL) return 1;
     45   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
     46   return 0;
     47 }
     48 
     49 static int pthread_join(pthread_t thread, void** value_ptr) {
     50   (void)value_ptr;
     51   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
     52           CloseHandle(thread) == 0);
     53 }
     54 
     55 // Mutex
     56 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
     57   (void)mutexattr;
     58   InitializeCriticalSection(mutex);
     59   return 0;
     60 }
     61 
     62 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
     63   EnterCriticalSection(mutex);
     64   return 0;
     65 }
     66 
     67 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
     68   LeaveCriticalSection(mutex);
     69   return 0;
     70 }
     71 
     72 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
     73   DeleteCriticalSection(mutex);
     74   return 0;
     75 }
     76 
     77 // Condition
     78 static int pthread_cond_destroy(pthread_cond_t* const condition) {
     79   int ok = 1;
     80   ok &= (CloseHandle(condition->waiting_sem_) != 0);
     81   ok &= (CloseHandle(condition->received_sem_) != 0);
     82   ok &= (CloseHandle(condition->signal_event_) != 0);
     83   return !ok;
     84 }
     85 
     86 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
     87   (void)cond_attr;
     88   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
     89   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
     90   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
     91   if (condition->waiting_sem_ == NULL ||
     92       condition->received_sem_ == NULL ||
     93       condition->signal_event_ == NULL) {
     94     pthread_cond_destroy(condition);
     95     return 1;
     96   }
     97   return 0;
     98 }
     99 
    100 static int pthread_cond_signal(pthread_cond_t* const condition) {
    101   int ok = 1;
    102   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    103     // a thread is waiting in pthread_cond_wait: allow it to be notified
    104     ok = SetEvent(condition->signal_event_);
    105     // wait until the event is consumed so the signaler cannot consume
    106     // the event via its own pthread_cond_wait.
    107     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
    108            WAIT_OBJECT_0);
    109   }
    110   return !ok;
    111 }
    112 
    113 static int pthread_cond_wait(pthread_cond_t* const condition,
    114                              pthread_mutex_t* const mutex) {
    115   int ok;
    116   // note that there is a consumer available so the signal isn't dropped in
    117   // pthread_cond_signal
    118   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
    119     return 1;
    120   // now unlock the mutex so pthread_cond_signal may be issued
    121   pthread_mutex_unlock(mutex);
    122   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
    123         WAIT_OBJECT_0);
    124   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
    125   pthread_mutex_lock(mutex);
    126   return !ok;
    127 }
    128 
    129 #else  // _WIN32
    130 # define THREADFN void*
    131 # define THREAD_RETURN(val) val
    132 #endif
    133 
    134 //------------------------------------------------------------------------------
    135 
    136 static THREADFN WebPWorkerThreadLoop(void *ptr) {    // thread loop
    137   WebPWorker* const worker = (WebPWorker*)ptr;
    138   int done = 0;
    139   while (!done) {
    140     pthread_mutex_lock(&worker->mutex_);
    141     while (worker->status_ == OK) {   // wait in idling mode
    142       pthread_cond_wait(&worker->condition_, &worker->mutex_);
    143     }
    144     if (worker->status_ == WORK) {
    145       if (worker->hook) {
    146         worker->had_error |= !worker->hook(worker->data1, worker->data2);
    147       }
    148       worker->status_ = OK;
    149     } else if (worker->status_ == NOT_OK) {   // finish the worker
    150       done = 1;
    151     }
    152     // signal to the main thread that we're done (for Sync())
    153     pthread_cond_signal(&worker->condition_);
    154     pthread_mutex_unlock(&worker->mutex_);
    155   }
    156   return THREAD_RETURN(NULL);    // Thread is finished
    157 }
    158 
    159 // main thread state control
    160 static void WebPWorkerChangeState(WebPWorker* const worker,
    161                                   WebPWorkerStatus new_status) {
    162   // no-op when attempting to change state on a thread that didn't come up
    163   if (worker->status_ < OK) return;
    164 
    165   pthread_mutex_lock(&worker->mutex_);
    166   // wait for the worker to finish
    167   while (worker->status_ != OK) {
    168     pthread_cond_wait(&worker->condition_, &worker->mutex_);
    169   }
    170   // assign new status and release the working thread if needed
    171   if (new_status != OK) {
    172     worker->status_ = new_status;
    173     pthread_cond_signal(&worker->condition_);
    174   }
    175   pthread_mutex_unlock(&worker->mutex_);
    176 }
    177 
    178 #endif
    179 
    180 //------------------------------------------------------------------------------
    181 
    182 void WebPWorkerInit(WebPWorker* const worker) {
    183   memset(worker, 0, sizeof(*worker));
    184   worker->status_ = NOT_OK;
    185 }
    186 
    187 int WebPWorkerSync(WebPWorker* const worker) {
    188 #ifdef WEBP_USE_THREAD
    189   WebPWorkerChangeState(worker, OK);
    190 #endif
    191   assert(worker->status_ <= OK);
    192   return !worker->had_error;
    193 }
    194 
    195 int WebPWorkerReset(WebPWorker* const worker) {
    196   int ok = 1;
    197   worker->had_error = 0;
    198   if (worker->status_ < OK) {
    199 #ifdef WEBP_USE_THREAD
    200     if (pthread_mutex_init(&worker->mutex_, NULL) ||
    201         pthread_cond_init(&worker->condition_, NULL)) {
    202       return 0;
    203     }
    204     pthread_mutex_lock(&worker->mutex_);
    205     ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
    206     if (ok) worker->status_ = OK;
    207     pthread_mutex_unlock(&worker->mutex_);
    208 #else
    209     worker->status_ = OK;
    210 #endif
    211   } else if (worker->status_ > OK) {
    212     ok = WebPWorkerSync(worker);
    213   }
    214   assert(!ok || (worker->status_ == OK));
    215   return ok;
    216 }
    217 
    218 void WebPWorkerLaunch(WebPWorker* const worker) {
    219 #ifdef WEBP_USE_THREAD
    220   WebPWorkerChangeState(worker, WORK);
    221 #else
    222   if (worker->hook)
    223     worker->had_error |= !worker->hook(worker->data1, worker->data2);
    224 #endif
    225 }
    226 
    227 void WebPWorkerEnd(WebPWorker* const worker) {
    228   if (worker->status_ >= OK) {
    229 #ifdef WEBP_USE_THREAD
    230     WebPWorkerChangeState(worker, NOT_OK);
    231     pthread_join(worker->thread_, NULL);
    232     pthread_mutex_destroy(&worker->mutex_);
    233     pthread_cond_destroy(&worker->condition_);
    234 #else
    235     worker->status_ = NOT_OK;
    236 #endif
    237   }
    238   assert(worker->status_ == NOT_OK);
    239 }
    240 
    241 //------------------------------------------------------------------------------
    242 
    243 #if defined(__cplusplus) || defined(c_plusplus)
    244 }    // extern "C"
    245 #endif
    246