Home | History | Annotate | Download | only in utils
      1 // Copyright 2011 Google Inc. All Rights Reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style license
      4 // that can be found in the COPYING file in the root of the source
      5 // tree. An additional intellectual property rights grant can be found
      6 // in the file PATENTS. All contributing project authors may
      7 // be found in the AUTHORS file in the root of the source tree.
      8 // -----------------------------------------------------------------------------
      9 //
     10 // Multi-threaded worker
     11 //
     12 // Author: Skal (pascal.massimino (at) gmail.com)
     13 
     14 #include <assert.h>
     15 #include <string.h>   // for memset()
     16 #include "./thread.h"
     17 #include "./utils.h"
     18 
     19 #ifdef WEBP_USE_THREAD
     20 
// Platform selection: on Windows the pthread type names used by the rest of
// this file are mapped onto Win32 primitives; everywhere else <pthread.h> is
// used directly.
#if defined(_WIN32)

#include <windows.h>
// A thread is represented by its Win32 handle, a mutex by a critical section.
typedef HANDLE pthread_t;
typedef CRITICAL_SECTION pthread_mutex_t;

#if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
// Native condition variables are available: use them as-is.
#define USE_WINDOWS_CONDITION_VARIABLE
typedef CONDITION_VARIABLE pthread_cond_t;
#else
// Emulated condition variable, built from two semaphores and one event (see
// pthread_cond_wait() / pthread_cond_signal() for how they cooperate).
typedef struct {
  HANDLE waiting_sem_;   // released by a waiter before it blocks
  HANDLE received_sem_;  // released by a waiter once it consumed the signal
  HANDLE signal_event_;  // set by the signaler to wake a waiter
} pthread_cond_t;
#endif  // _WIN32_WINNT >= 0x600

// Fallback when the SDK headers do not provide WINAPI_FAMILY_PARTITION: the
// check below then evaluates as if the desktop partition were selected.
#ifndef WINAPI_FAMILY_PARTITION
#define WINAPI_PARTITION_DESKTOP 1
#define WINAPI_FAMILY_PARTITION(x) x
#endif

// Outside of the desktop partition, threads are started with CreateThread()
// instead of _beginthreadex() (see pthread_create()).
#if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
#define USE_CREATE_THREAD
#endif

#else  // !_WIN32

#include <pthread.h>

#endif  // _WIN32
     52 
// Per-worker threading state. Allocated by Reset() when the thread is created
// and released by End() (or by Reset() itself if the setup fails).
struct WebPWorkerImpl {
  pthread_mutex_t mutex_;      // locked around accesses to worker->status_
  pthread_cond_t  condition_;  // signaled by both sides on status changes
  pthread_t       thread_;     // the worker thread running ThreadLoop()
};
     58 
     59 #if defined(_WIN32)
     60 
     61 //------------------------------------------------------------------------------
     62 // simplistic pthread emulation layer
     63 
     64 #include <process.h>
     65 
// Return type / calling convention of the thread entry point.
// _beginthreadex requires __stdcall
#define THREADFN unsigned int __stdcall
// Converts the pointer-like value returned by the thread body into the
// unsigned int expected from a THREADFN function.
#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)

#if _WIN32_WINNT >= 0x0501  // Windows XP or greater
// Route every WaitForSingleObject() call in this file through the Ex variant,
// with alertable waits disabled.
// NOTE(review): presumably done because the non-Ex function is unavailable on
// some Windows targets -- confirm.
#define WaitForSingleObject(obj, timeout) \
  WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)
#endif
     74 
     75 static int pthread_create(pthread_t* const thread, const void* attr,
     76                           unsigned int (__stdcall *start)(void*), void* arg) {
     77   (void)attr;
     78 #ifdef USE_CREATE_THREAD
     79   *thread = CreateThread(NULL,   /* lpThreadAttributes */
     80                          0,      /* dwStackSize */
     81                          start,
     82                          arg,
     83                          0,      /* dwStackSize */
     84                          NULL);  /* lpThreadId */
     85 #else
     86   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
     87                                       0,      /* unsigned stack_size */
     88                                       start,
     89                                       arg,
     90                                       0,      /* unsigned initflag */
     91                                       NULL);  /* unsigned *thrdaddr */
     92 #endif
     93   if (*thread == NULL) return 1;
     94   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
     95   return 0;
     96 }
     97 
     98 static int pthread_join(pthread_t thread, void** value_ptr) {
     99   (void)value_ptr;
    100   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
    101           CloseHandle(thread) == 0);
    102 }
    103 
    104 // Mutex
    105 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
    106   (void)mutexattr;
    107 #if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
    108   InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);
    109 #else
    110   InitializeCriticalSection(mutex);
    111 #endif
    112   return 0;
    113 }
    114 
    115 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
    116   EnterCriticalSection(mutex);
    117   return 0;
    118 }
    119 
    120 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
    121   LeaveCriticalSection(mutex);
    122   return 0;
    123 }
    124 
    125 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
    126   DeleteCriticalSection(mutex);
    127   return 0;
    128 }
    129 
    130 // Condition
    131 static int pthread_cond_destroy(pthread_cond_t* const condition) {
    132   int ok = 1;
    133 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    134   (void)condition;
    135 #else
    136   ok &= (CloseHandle(condition->waiting_sem_) != 0);
    137   ok &= (CloseHandle(condition->received_sem_) != 0);
    138   ok &= (CloseHandle(condition->signal_event_) != 0);
    139 #endif
    140   return !ok;
    141 }
    142 
    143 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
    144   (void)cond_attr;
    145 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    146   InitializeConditionVariable(condition);
    147 #else
    148   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    149   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    150   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
    151   if (condition->waiting_sem_ == NULL ||
    152       condition->received_sem_ == NULL ||
    153       condition->signal_event_ == NULL) {
    154     pthread_cond_destroy(condition);
    155     return 1;
    156   }
    157 #endif
    158   return 0;
    159 }
    160 
    161 static int pthread_cond_signal(pthread_cond_t* const condition) {
    162   int ok = 1;
    163 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    164   WakeConditionVariable(condition);
    165 #else
    166   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    167     // a thread is waiting in pthread_cond_wait: allow it to be notified
    168     ok = SetEvent(condition->signal_event_);
    169     // wait until the event is consumed so the signaler cannot consume
    170     // the event via its own pthread_cond_wait.
    171     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
    172            WAIT_OBJECT_0);
    173   }
    174 #endif
    175   return !ok;
    176 }
    177 
    178 static int pthread_cond_wait(pthread_cond_t* const condition,
    179                              pthread_mutex_t* const mutex) {
    180   int ok;
    181 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    182   ok = SleepConditionVariableCS(condition, mutex, INFINITE);
    183 #else
    184   // note that there is a consumer available so the signal isn't dropped in
    185   // pthread_cond_signal
    186   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
    187     return 1;
    188   // now unlock the mutex so pthread_cond_signal may be issued
    189   pthread_mutex_unlock(mutex);
    190   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
    191         WAIT_OBJECT_0);
    192   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
    193   pthread_mutex_lock(mutex);
    194 #endif
    195   return !ok;
    196 }
    197 
    198 #else  // !_WIN32
// With pthreads, the thread entry point returns a void* and the value is
// passed through unchanged.
# define THREADFN void*
# define THREAD_RETURN(val) val
    201 #endif  // _WIN32
    202 
    203 //------------------------------------------------------------------------------
    204 
    205 static void Execute(WebPWorker* const worker);  // Forward declaration.
    206 
    207 static THREADFN ThreadLoop(void* ptr) {
    208   WebPWorker* const worker = (WebPWorker*)ptr;
    209   int done = 0;
    210   while (!done) {
    211     pthread_mutex_lock(&worker->impl_->mutex_);
    212     while (worker->status_ == OK) {   // wait in idling mode
    213       pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
    214     }
    215     if (worker->status_ == WORK) {
    216       Execute(worker);
    217       worker->status_ = OK;
    218     } else if (worker->status_ == NOT_OK) {   // finish the worker
    219       done = 1;
    220     }
    221     // signal to the main thread that we're done (for Sync())
    222     pthread_cond_signal(&worker->impl_->condition_);
    223     pthread_mutex_unlock(&worker->impl_->mutex_);
    224   }
    225   return THREAD_RETURN(NULL);    // Thread is finished
    226 }
    227 
    228 // main thread state control
    229 static void ChangeState(WebPWorker* const worker,
    230                         WebPWorkerStatus new_status) {
    231   // No-op when attempting to change state on a thread that didn't come up.
    232   // Checking status_ without acquiring the lock first would result in a data
    233   // race.
    234   if (worker->impl_ == NULL) return;
    235 
    236   pthread_mutex_lock(&worker->impl_->mutex_);
    237   if (worker->status_ >= OK) {
    238     // wait for the worker to finish
    239     while (worker->status_ != OK) {
    240       pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
    241     }
    242     // assign new status and release the working thread if needed
    243     if (new_status != OK) {
    244       worker->status_ = new_status;
    245       pthread_cond_signal(&worker->impl_->condition_);
    246     }
    247   }
    248   pthread_mutex_unlock(&worker->impl_->mutex_);
    249 }
    250 
    251 #endif  // WEBP_USE_THREAD
    252 
    253 //------------------------------------------------------------------------------
    254 
    255 static void Init(WebPWorker* const worker) {
    256   memset(worker, 0, sizeof(*worker));
    257   worker->status_ = NOT_OK;
    258 }
    259 
    260 static int Sync(WebPWorker* const worker) {
    261 #ifdef WEBP_USE_THREAD
    262   ChangeState(worker, OK);
    263 #endif
    264   assert(worker->status_ <= OK);
    265   return !worker->had_error;
    266 }
    267 
    268 static int Reset(WebPWorker* const worker) {
    269   int ok = 1;
    270   worker->had_error = 0;
    271   if (worker->status_ < OK) {
    272 #ifdef WEBP_USE_THREAD
    273     worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_));
    274     if (worker->impl_ == NULL) {
    275       return 0;
    276     }
    277     if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) {
    278       goto Error;
    279     }
    280     if (pthread_cond_init(&worker->impl_->condition_, NULL)) {
    281       pthread_mutex_destroy(&worker->impl_->mutex_);
    282       goto Error;
    283     }
    284     pthread_mutex_lock(&worker->impl_->mutex_);
    285     ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker);
    286     if (ok) worker->status_ = OK;
    287     pthread_mutex_unlock(&worker->impl_->mutex_);
    288     if (!ok) {
    289       pthread_mutex_destroy(&worker->impl_->mutex_);
    290       pthread_cond_destroy(&worker->impl_->condition_);
    291  Error:
    292       WebPSafeFree(worker->impl_);
    293       worker->impl_ = NULL;
    294       return 0;
    295     }
    296 #else
    297     worker->status_ = OK;
    298 #endif
    299   } else if (worker->status_ > OK) {
    300     ok = Sync(worker);
    301   }
    302   assert(!ok || (worker->status_ == OK));
    303   return ok;
    304 }
    305 
    306 static void Execute(WebPWorker* const worker) {
    307   if (worker->hook != NULL) {
    308     worker->had_error |= !worker->hook(worker->data1, worker->data2);
    309   }
    310 }
    311 
    312 static void Launch(WebPWorker* const worker) {
    313 #ifdef WEBP_USE_THREAD
    314   ChangeState(worker, WORK);
    315 #else
    316   Execute(worker);
    317 #endif
    318 }
    319 
    320 static void End(WebPWorker* const worker) {
    321 #ifdef WEBP_USE_THREAD
    322   if (worker->impl_ != NULL) {
    323     ChangeState(worker, NOT_OK);
    324     pthread_join(worker->impl_->thread_, NULL);
    325     pthread_mutex_destroy(&worker->impl_->mutex_);
    326     pthread_cond_destroy(&worker->impl_->condition_);
    327     WebPSafeFree(worker->impl_);
    328     worker->impl_ = NULL;
    329   }
    330 #else
    331   worker->status_ = NOT_OK;
    332   assert(worker->impl_ == NULL);
    333 #endif
    334   assert(worker->status_ == NOT_OK);
    335 }
    336 
    337 //------------------------------------------------------------------------------
    338 
    339 static WebPWorkerInterface g_worker_interface = {
    340   Init, Reset, Sync, Launch, Execute, End
    341 };
    342 
    343 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
    344   if (winterface == NULL ||
    345       winterface->Init == NULL || winterface->Reset == NULL ||
    346       winterface->Sync == NULL || winterface->Launch == NULL ||
    347       winterface->Execute == NULL || winterface->End == NULL) {
    348     return 0;
    349   }
    350   g_worker_interface = *winterface;
    351   return 1;
    352 }
    353 
    354 const WebPWorkerInterface* WebPGetWorkerInterface(void) {
    355   return &g_worker_interface;
    356 }
    357 
    358 //------------------------------------------------------------------------------
    359