Home | History | Annotate | Download | only in utils
      1 // Copyright 2011 Google Inc. All Rights Reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style license
      4 // that can be found in the COPYING file in the root of the source
      5 // tree. An additional intellectual property rights grant can be found
      6 // in the file PATENTS. All contributing project authors may
      7 // be found in the AUTHORS file in the root of the source tree.
      8 // -----------------------------------------------------------------------------
      9 //
     10 // Multi-threaded worker
     11 //
     12 // Author: Skal (pascal.massimino (at) gmail.com)
     13 
     14 #include <assert.h>
     15 #include <string.h>   // for memset()
     16 #include "src/utils/thread_utils.h"
     17 #include "src/utils/utils.h"
     18 
     19 #ifdef WEBP_USE_THREAD
     20 
     21 #if defined(_WIN32)
     22 
     23 #include <windows.h>
     24 typedef HANDLE pthread_t;
     25 typedef CRITICAL_SECTION pthread_mutex_t;
     26 
     27 #if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
     28 #define USE_WINDOWS_CONDITION_VARIABLE
     29 typedef CONDITION_VARIABLE pthread_cond_t;
     30 #else
     31 typedef struct {
     32   HANDLE waiting_sem_;
     33   HANDLE received_sem_;
     34   HANDLE signal_event_;
     35 } pthread_cond_t;
     36 #endif  // _WIN32_WINNT >= 0x600
     37 
     38 #ifndef WINAPI_FAMILY_PARTITION
     39 #define WINAPI_PARTITION_DESKTOP 1
     40 #define WINAPI_FAMILY_PARTITION(x) x
     41 #endif
     42 
     43 #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
     44 #define USE_CREATE_THREAD
     45 #endif
     46 
     47 #else  // !_WIN32
     48 
     49 #include <pthread.h>
     50 
     51 #endif  // _WIN32
     52 
     53 typedef struct {
     54   pthread_mutex_t mutex_;
     55   pthread_cond_t  condition_;
     56   pthread_t       thread_;
     57 } WebPWorkerImpl;
     58 
     59 #if defined(_WIN32)
     60 
     61 //------------------------------------------------------------------------------
     62 // simplistic pthread emulation layer
     63 
     64 #include <process.h>
     65 
     66 // _beginthreadex requires __stdcall
     67 #define THREADFN unsigned int __stdcall
     68 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
     69 
     70 #if _WIN32_WINNT >= 0x0501  // Windows XP or greater
     71 #define WaitForSingleObject(obj, timeout) \
     72   WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)
     73 #endif
     74 
     75 static int pthread_create(pthread_t* const thread, const void* attr,
     76                           unsigned int (__stdcall *start)(void*), void* arg) {
     77   (void)attr;
     78 #ifdef USE_CREATE_THREAD
     79   *thread = CreateThread(NULL,   /* lpThreadAttributes */
     80                          0,      /* dwStackSize */
     81                          start,
     82                          arg,
     83                          0,      /* dwStackSize */
     84                          NULL);  /* lpThreadId */
     85 #else
     86   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
     87                                       0,      /* unsigned stack_size */
     88                                       start,
     89                                       arg,
     90                                       0,      /* unsigned initflag */
     91                                       NULL);  /* unsigned *thrdaddr */
     92 #endif
     93   if (*thread == NULL) return 1;
     94   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
     95   return 0;
     96 }
     97 
     98 static int pthread_join(pthread_t thread, void** value_ptr) {
     99   (void)value_ptr;
    100   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
    101           CloseHandle(thread) == 0);
    102 }
    103 
    104 // Mutex
    105 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
    106   (void)mutexattr;
    107 #if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
    108   InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);
    109 #else
    110   InitializeCriticalSection(mutex);
    111 #endif
    112   return 0;
    113 }
    114 
    115 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
    116   EnterCriticalSection(mutex);
    117   return 0;
    118 }
    119 
    120 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
    121   LeaveCriticalSection(mutex);
    122   return 0;
    123 }
    124 
    125 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
    126   DeleteCriticalSection(mutex);
    127   return 0;
    128 }
    129 
    130 // Condition
    131 static int pthread_cond_destroy(pthread_cond_t* const condition) {
    132   int ok = 1;
    133 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    134   (void)condition;
    135 #else
    136   ok &= (CloseHandle(condition->waiting_sem_) != 0);
    137   ok &= (CloseHandle(condition->received_sem_) != 0);
    138   ok &= (CloseHandle(condition->signal_event_) != 0);
    139 #endif
    140   return !ok;
    141 }
    142 
    143 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
    144   (void)cond_attr;
    145 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    146   InitializeConditionVariable(condition);
    147 #else
    148   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    149   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    150   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
    151   if (condition->waiting_sem_ == NULL ||
    152       condition->received_sem_ == NULL ||
    153       condition->signal_event_ == NULL) {
    154     pthread_cond_destroy(condition);
    155     return 1;
    156   }
    157 #endif
    158   return 0;
    159 }
    160 
    161 static int pthread_cond_signal(pthread_cond_t* const condition) {
    162   int ok = 1;
    163 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    164   WakeConditionVariable(condition);
    165 #else
    166   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    167     // a thread is waiting in pthread_cond_wait: allow it to be notified
    168     ok = SetEvent(condition->signal_event_);
    169     // wait until the event is consumed so the signaler cannot consume
    170     // the event via its own pthread_cond_wait.
    171     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
    172            WAIT_OBJECT_0);
    173   }
    174 #endif
    175   return !ok;
    176 }
    177 
    178 static int pthread_cond_wait(pthread_cond_t* const condition,
    179                              pthread_mutex_t* const mutex) {
    180   int ok;
    181 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    182   ok = SleepConditionVariableCS(condition, mutex, INFINITE);
    183 #else
    184   // note that there is a consumer available so the signal isn't dropped in
    185   // pthread_cond_signal
    186   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1;
    187   // now unlock the mutex so pthread_cond_signal may be issued
    188   pthread_mutex_unlock(mutex);
    189   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
    190         WAIT_OBJECT_0);
    191   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
    192   pthread_mutex_lock(mutex);
    193 #endif
    194   return !ok;
    195 }
    196 
    197 #else  // !_WIN32
    198 # define THREADFN void*
    199 # define THREAD_RETURN(val) val
    200 #endif  // _WIN32
    201 
    202 //------------------------------------------------------------------------------
    203 
    204 static THREADFN ThreadLoop(void* ptr) {
    205   WebPWorker* const worker = (WebPWorker*)ptr;
    206   WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
    207   int done = 0;
    208   while (!done) {
    209     pthread_mutex_lock(&impl->mutex_);
    210     while (worker->status_ == OK) {   // wait in idling mode
    211       pthread_cond_wait(&impl->condition_, &impl->mutex_);
    212     }
    213     if (worker->status_ == WORK) {
    214       WebPGetWorkerInterface()->Execute(worker);
    215       worker->status_ = OK;
    216     } else if (worker->status_ == NOT_OK) {   // finish the worker
    217       done = 1;
    218     }
    219     // signal to the main thread that we're done (for Sync())
    220     pthread_cond_signal(&impl->condition_);
    221     pthread_mutex_unlock(&impl->mutex_);
    222   }
    223   return THREAD_RETURN(NULL);    // Thread is finished
    224 }
    225 
    226 // main thread state control
    227 static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) {
    228   // No-op when attempting to change state on a thread that didn't come up.
    229   // Checking status_ without acquiring the lock first would result in a data
    230   // race.
    231   WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
    232   if (impl == NULL) return;
    233 
    234   pthread_mutex_lock(&impl->mutex_);
    235   if (worker->status_ >= OK) {
    236     // wait for the worker to finish
    237     while (worker->status_ != OK) {
    238       pthread_cond_wait(&impl->condition_, &impl->mutex_);
    239     }
    240     // assign new status and release the working thread if needed
    241     if (new_status != OK) {
    242       worker->status_ = new_status;
    243       pthread_cond_signal(&impl->condition_);
    244     }
    245   }
    246   pthread_mutex_unlock(&impl->mutex_);
    247 }
    248 
    249 #endif  // WEBP_USE_THREAD
    250 
    251 //------------------------------------------------------------------------------
    252 
    253 static void Init(WebPWorker* const worker) {
    254   memset(worker, 0, sizeof(*worker));
    255   worker->status_ = NOT_OK;
    256 }
    257 
    258 static int Sync(WebPWorker* const worker) {
    259 #ifdef WEBP_USE_THREAD
    260   ChangeState(worker, OK);
    261 #endif
    262   assert(worker->status_ <= OK);
    263   return !worker->had_error;
    264 }
    265 
    266 static int Reset(WebPWorker* const worker) {
    267   int ok = 1;
    268   worker->had_error = 0;
    269   if (worker->status_ < OK) {
    270 #ifdef WEBP_USE_THREAD
    271     WebPWorkerImpl* const impl =
    272         (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl));
    273     worker->impl_ = (void*)impl;
    274     if (worker->impl_ == NULL) {
    275       return 0;
    276     }
    277     if (pthread_mutex_init(&impl->mutex_, NULL)) {
    278       goto Error;
    279     }
    280     if (pthread_cond_init(&impl->condition_, NULL)) {
    281       pthread_mutex_destroy(&impl->mutex_);
    282       goto Error;
    283     }
    284     pthread_mutex_lock(&impl->mutex_);
    285     ok = !pthread_create(&impl->thread_, NULL, ThreadLoop, worker);
    286     if (ok) worker->status_ = OK;
    287     pthread_mutex_unlock(&impl->mutex_);
    288     if (!ok) {
    289       pthread_mutex_destroy(&impl->mutex_);
    290       pthread_cond_destroy(&impl->condition_);
    291  Error:
    292       WebPSafeFree(impl);
    293       worker->impl_ = NULL;
    294       return 0;
    295     }
    296 #else
    297     worker->status_ = OK;
    298 #endif
    299   } else if (worker->status_ > OK) {
    300     ok = Sync(worker);
    301   }
    302   assert(!ok || (worker->status_ == OK));
    303   return ok;
    304 }
    305 
    306 static void Execute(WebPWorker* const worker) {
    307   if (worker->hook != NULL) {
    308     worker->had_error |= !worker->hook(worker->data1, worker->data2);
    309   }
    310 }
    311 
    312 static void Launch(WebPWorker* const worker) {
    313 #ifdef WEBP_USE_THREAD
    314   ChangeState(worker, WORK);
    315 #else
    316   Execute(worker);
    317 #endif
    318 }
    319 
    320 static void End(WebPWorker* const worker) {
    321 #ifdef WEBP_USE_THREAD
    322   if (worker->impl_ != NULL) {
    323     WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;
    324     ChangeState(worker, NOT_OK);
    325     pthread_join(impl->thread_, NULL);
    326     pthread_mutex_destroy(&impl->mutex_);
    327     pthread_cond_destroy(&impl->condition_);
    328     WebPSafeFree(impl);
    329     worker->impl_ = NULL;
    330   }
    331 #else
    332   worker->status_ = NOT_OK;
    333   assert(worker->impl_ == NULL);
    334 #endif
    335   assert(worker->status_ == NOT_OK);
    336 }
    337 
    338 //------------------------------------------------------------------------------
    339 
    340 static WebPWorkerInterface g_worker_interface = {
    341   Init, Reset, Sync, Launch, Execute, End
    342 };
    343 
    344 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
    345   if (winterface == NULL ||
    346       winterface->Init == NULL || winterface->Reset == NULL ||
    347       winterface->Sync == NULL || winterface->Launch == NULL ||
    348       winterface->Execute == NULL || winterface->End == NULL) {
    349     return 0;
    350   }
    351   g_worker_interface = *winterface;
    352   return 1;
    353 }
    354 
    355 const WebPWorkerInterface* WebPGetWorkerInterface(void) {
    356   return &g_worker_interface;
    357 }
    358 
    359 //------------------------------------------------------------------------------
    360