Home | History | Annotate | Download | only in utils
      1 // Copyright 2011 Google Inc. All Rights Reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style license
      4 // that can be found in the COPYING file in the root of the source
      5 // tree. An additional intellectual property rights grant can be found
      6 // in the file PATENTS. All contributing project authors may
      7 // be found in the AUTHORS file in the root of the source tree.
      8 // -----------------------------------------------------------------------------
      9 //
     10 // Multi-threaded worker
     11 //
     12 // Author: Skal (pascal.massimino (at) gmail.com)
     13 
     14 #include <assert.h>
     15 #include <string.h>   // for memset()
     16 #include "./thread_utils.h"
     17 #include "./utils.h"
     18 
     19 #ifdef WEBP_USE_THREAD
     20 
     21 #if defined(_WIN32)
     22 
     23 #include <windows.h>
     24 typedef HANDLE pthread_t;
     25 typedef CRITICAL_SECTION pthread_mutex_t;
     26 
     27 #if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
     28 #define USE_WINDOWS_CONDITION_VARIABLE
     29 typedef CONDITION_VARIABLE pthread_cond_t;
     30 #else
     31 typedef struct {
     32   HANDLE waiting_sem_;
     33   HANDLE received_sem_;
     34   HANDLE signal_event_;
     35 } pthread_cond_t;
     36 #endif  // _WIN32_WINNT >= 0x600
     37 
     38 #ifndef WINAPI_FAMILY_PARTITION
     39 #define WINAPI_PARTITION_DESKTOP 1
     40 #define WINAPI_FAMILY_PARTITION(x) x
     41 #endif
     42 
     43 #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
     44 #define USE_CREATE_THREAD
     45 #endif
     46 
     47 #else  // !_WIN32
     48 
     49 #include <pthread.h>
     50 
     51 #endif  // _WIN32
     52 
     53 struct WebPWorkerImpl {
     54   pthread_mutex_t mutex_;
     55   pthread_cond_t  condition_;
     56   pthread_t       thread_;
     57 };
     58 
     59 #if defined(_WIN32)
     60 
     61 //------------------------------------------------------------------------------
     62 // simplistic pthread emulation layer
     63 
     64 #include <process.h>
     65 
     66 // _beginthreadex requires __stdcall
     67 #define THREADFN unsigned int __stdcall
     68 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
     69 
     70 #if _WIN32_WINNT >= 0x0501  // Windows XP or greater
     71 #define WaitForSingleObject(obj, timeout) \
     72   WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)
     73 #endif
     74 
     75 static int pthread_create(pthread_t* const thread, const void* attr,
     76                           unsigned int (__stdcall *start)(void*), void* arg) {
     77   (void)attr;
     78 #ifdef USE_CREATE_THREAD
     79   *thread = CreateThread(NULL,   /* lpThreadAttributes */
     80                          0,      /* dwStackSize */
     81                          start,
     82                          arg,
     83                          0,      /* dwStackSize */
     84                          NULL);  /* lpThreadId */
     85 #else
     86   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
     87                                       0,      /* unsigned stack_size */
     88                                       start,
     89                                       arg,
     90                                       0,      /* unsigned initflag */
     91                                       NULL);  /* unsigned *thrdaddr */
     92 #endif
     93   if (*thread == NULL) return 1;
     94   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
     95   return 0;
     96 }
     97 
     98 static int pthread_join(pthread_t thread, void** value_ptr) {
     99   (void)value_ptr;
    100   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
    101           CloseHandle(thread) == 0);
    102 }
    103 
    104 // Mutex
    105 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
    106   (void)mutexattr;
    107 #if _WIN32_WINNT >= 0x0600  // Windows Vista / Server 2008 or greater
    108   InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);
    109 #else
    110   InitializeCriticalSection(mutex);
    111 #endif
    112   return 0;
    113 }
    114 
    115 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
    116   EnterCriticalSection(mutex);
    117   return 0;
    118 }
    119 
    120 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
    121   LeaveCriticalSection(mutex);
    122   return 0;
    123 }
    124 
    125 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
    126   DeleteCriticalSection(mutex);
    127   return 0;
    128 }
    129 
    130 // Condition
    131 static int pthread_cond_destroy(pthread_cond_t* const condition) {
    132   int ok = 1;
    133 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    134   (void)condition;
    135 #else
    136   ok &= (CloseHandle(condition->waiting_sem_) != 0);
    137   ok &= (CloseHandle(condition->received_sem_) != 0);
    138   ok &= (CloseHandle(condition->signal_event_) != 0);
    139 #endif
    140   return !ok;
    141 }
    142 
    143 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
    144   (void)cond_attr;
    145 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    146   InitializeConditionVariable(condition);
    147 #else
    148   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    149   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    150   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
    151   if (condition->waiting_sem_ == NULL ||
    152       condition->received_sem_ == NULL ||
    153       condition->signal_event_ == NULL) {
    154     pthread_cond_destroy(condition);
    155     return 1;
    156   }
    157 #endif
    158   return 0;
    159 }
    160 
    161 static int pthread_cond_signal(pthread_cond_t* const condition) {
    162   int ok = 1;
    163 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    164   WakeConditionVariable(condition);
    165 #else
    166   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    167     // a thread is waiting in pthread_cond_wait: allow it to be notified
    168     ok = SetEvent(condition->signal_event_);
    169     // wait until the event is consumed so the signaler cannot consume
    170     // the event via its own pthread_cond_wait.
    171     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
    172            WAIT_OBJECT_0);
    173   }
    174 #endif
    175   return !ok;
    176 }
    177 
    178 static int pthread_cond_wait(pthread_cond_t* const condition,
    179                              pthread_mutex_t* const mutex) {
    180   int ok;
    181 #ifdef USE_WINDOWS_CONDITION_VARIABLE
    182   ok = SleepConditionVariableCS(condition, mutex, INFINITE);
    183 #else
    184   // note that there is a consumer available so the signal isn't dropped in
    185   // pthread_cond_signal
    186   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1;
    187   // now unlock the mutex so pthread_cond_signal may be issued
    188   pthread_mutex_unlock(mutex);
    189   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
    190         WAIT_OBJECT_0);
    191   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
    192   pthread_mutex_lock(mutex);
    193 #endif
    194   return !ok;
    195 }
    196 
    197 #else  // !_WIN32
    198 # define THREADFN void*
    199 # define THREAD_RETURN(val) val
    200 #endif  // _WIN32
    201 
    202 //------------------------------------------------------------------------------
    203 
    204 static void Execute(WebPWorker* const worker);  // Forward declaration.
    205 
    206 static THREADFN ThreadLoop(void* ptr) {
    207   WebPWorker* const worker = (WebPWorker*)ptr;
    208   int done = 0;
    209   while (!done) {
    210     pthread_mutex_lock(&worker->impl_->mutex_);
    211     while (worker->status_ == OK) {   // wait in idling mode
    212       pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
    213     }
    214     if (worker->status_ == WORK) {
    215       Execute(worker);
    216       worker->status_ = OK;
    217     } else if (worker->status_ == NOT_OK) {   // finish the worker
    218       done = 1;
    219     }
    220     // signal to the main thread that we're done (for Sync())
    221     pthread_cond_signal(&worker->impl_->condition_);
    222     pthread_mutex_unlock(&worker->impl_->mutex_);
    223   }
    224   return THREAD_RETURN(NULL);    // Thread is finished
    225 }
    226 
    227 // main thread state control
    228 static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) {
    229   // No-op when attempting to change state on a thread that didn't come up.
    230   // Checking status_ without acquiring the lock first would result in a data
    231   // race.
    232   if (worker->impl_ == NULL) return;
    233 
    234   pthread_mutex_lock(&worker->impl_->mutex_);
    235   if (worker->status_ >= OK) {
    236     // wait for the worker to finish
    237     while (worker->status_ != OK) {
    238       pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
    239     }
    240     // assign new status and release the working thread if needed
    241     if (new_status != OK) {
    242       worker->status_ = new_status;
    243       pthread_cond_signal(&worker->impl_->condition_);
    244     }
    245   }
    246   pthread_mutex_unlock(&worker->impl_->mutex_);
    247 }
    248 
    249 #endif  // WEBP_USE_THREAD
    250 
    251 //------------------------------------------------------------------------------
    252 
    253 static void Init(WebPWorker* const worker) {
    254   memset(worker, 0, sizeof(*worker));
    255   worker->status_ = NOT_OK;
    256 }
    257 
    258 static int Sync(WebPWorker* const worker) {
    259 #ifdef WEBP_USE_THREAD
    260   ChangeState(worker, OK);
    261 #endif
    262   assert(worker->status_ <= OK);
    263   return !worker->had_error;
    264 }
    265 
    266 static int Reset(WebPWorker* const worker) {
    267   int ok = 1;
    268   worker->had_error = 0;
    269   if (worker->status_ < OK) {
    270 #ifdef WEBP_USE_THREAD
    271     worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_));
    272     if (worker->impl_ == NULL) {
    273       return 0;
    274     }
    275     if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) {
    276       goto Error;
    277     }
    278     if (pthread_cond_init(&worker->impl_->condition_, NULL)) {
    279       pthread_mutex_destroy(&worker->impl_->mutex_);
    280       goto Error;
    281     }
    282     pthread_mutex_lock(&worker->impl_->mutex_);
    283     ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker);
    284     if (ok) worker->status_ = OK;
    285     pthread_mutex_unlock(&worker->impl_->mutex_);
    286     if (!ok) {
    287       pthread_mutex_destroy(&worker->impl_->mutex_);
    288       pthread_cond_destroy(&worker->impl_->condition_);
    289  Error:
    290       WebPSafeFree(worker->impl_);
    291       worker->impl_ = NULL;
    292       return 0;
    293     }
    294 #else
    295     worker->status_ = OK;
    296 #endif
    297   } else if (worker->status_ > OK) {
    298     ok = Sync(worker);
    299   }
    300   assert(!ok || (worker->status_ == OK));
    301   return ok;
    302 }
    303 
    304 static void Execute(WebPWorker* const worker) {
    305   if (worker->hook != NULL) {
    306     worker->had_error |= !worker->hook(worker->data1, worker->data2);
    307   }
    308 }
    309 
    310 static void Launch(WebPWorker* const worker) {
    311 #ifdef WEBP_USE_THREAD
    312   ChangeState(worker, WORK);
    313 #else
    314   Execute(worker);
    315 #endif
    316 }
    317 
    318 static void End(WebPWorker* const worker) {
    319 #ifdef WEBP_USE_THREAD
    320   if (worker->impl_ != NULL) {
    321     ChangeState(worker, NOT_OK);
    322     pthread_join(worker->impl_->thread_, NULL);
    323     pthread_mutex_destroy(&worker->impl_->mutex_);
    324     pthread_cond_destroy(&worker->impl_->condition_);
    325     WebPSafeFree(worker->impl_);
    326     worker->impl_ = NULL;
    327   }
    328 #else
    329   worker->status_ = NOT_OK;
    330   assert(worker->impl_ == NULL);
    331 #endif
    332   assert(worker->status_ == NOT_OK);
    333 }
    334 
    335 //------------------------------------------------------------------------------
    336 
    337 static WebPWorkerInterface g_worker_interface = {
    338   Init, Reset, Sync, Launch, Execute, End
    339 };
    340 
    341 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
    342   if (winterface == NULL ||
    343       winterface->Init == NULL || winterface->Reset == NULL ||
    344       winterface->Sync == NULL || winterface->Launch == NULL ||
    345       winterface->Execute == NULL || winterface->End == NULL) {
    346     return 0;
    347   }
    348   g_worker_interface = *winterface;
    349   return 1;
    350 }
    351 
    352 const WebPWorkerInterface* WebPGetWorkerInterface(void) {
    353   return &g_worker_interface;
    354 }
    355 
    356 //------------------------------------------------------------------------------
    357