Home | History | Annotate | Download | only in utils
      1 // Copyright 2011 Google Inc. All Rights Reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style license
      4 // that can be found in the COPYING file in the root of the source
      5 // tree. An additional intellectual property rights grant can be found
      6 // in the file PATENTS. All contributing project authors may
      7 // be found in the AUTHORS file in the root of the source tree.
      8 // -----------------------------------------------------------------------------
      9 //
     10 // Multi-threaded worker
     11 //
     12 // Author: Skal (pascal.massimino (at) gmail.com)
     13 
     14 #include <assert.h>
     15 #include <string.h>   // for memset()
     16 #include "./thread.h"
     17 #include "./utils.h"
     18 
     19 #ifdef WEBP_USE_THREAD
     20 
     21 #if defined(_WIN32)
     22 
     23 #include <windows.h>
// Map the pthread type names used in this file onto Win32 primitives.
typedef HANDLE pthread_t;   // thread handle, as returned by _beginthreadex()
typedef CRITICAL_SECTION pthread_mutex_t;
// Emulated condition variable, built from two semaphores and one event.
typedef struct {
  HANDLE waiting_sem_;    // released by a thread entering pthread_cond_wait()
  HANDLE received_sem_;   // released by the waiter after it saw the event
  HANDLE signal_event_;   // set by pthread_cond_signal() to wake the waiter
} pthread_cond_t;
     31 
     32 #else  // !_WIN32
     33 
     34 #include <pthread.h>
     35 
     36 #endif  // _WIN32
     37 
// Synchronization state of a worker, reached through worker->impl_.
struct WebPWorkerImpl {
  pthread_mutex_t mutex_;      // guards status_ in ThreadLoop()/ChangeState()
  pthread_cond_t  condition_;  // signaled by both sides on status_ changes
  pthread_t       thread_;     // thread created in Reset(), joined in End()
};
     43 
     44 #if defined(_WIN32)
     45 
     46 //------------------------------------------------------------------------------
     47 // simplistic pthread emulation layer
     48 
     49 #include <process.h>
     50 
     51 // _beginthreadex requires __stdcall
     52 #define THREADFN unsigned int __stdcall
     53 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
     54 
     55 static int pthread_create(pthread_t* const thread, const void* attr,
     56                           unsigned int (__stdcall *start)(void*), void* arg) {
     57   (void)attr;
     58   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
     59                                       0,      /* unsigned stack_size */
     60                                       start,
     61                                       arg,
     62                                       0,      /* unsigned initflag */
     63                                       NULL);  /* unsigned *thrdaddr */
     64   if (*thread == NULL) return 1;
     65   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
     66   return 0;
     67 }
     68 
     69 static int pthread_join(pthread_t thread, void** value_ptr) {
     70   (void)value_ptr;
     71   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
     72           CloseHandle(thread) == 0);
     73 }
     74 
     75 // Mutex
     76 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
     77   (void)mutexattr;
     78   InitializeCriticalSection(mutex);
     79   return 0;
     80 }
     81 
     82 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
     83   EnterCriticalSection(mutex);
     84   return 0;
     85 }
     86 
     87 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
     88   LeaveCriticalSection(mutex);
     89   return 0;
     90 }
     91 
     92 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
     93   DeleteCriticalSection(mutex);
     94   return 0;
     95 }
     96 
     97 // Condition
     98 static int pthread_cond_destroy(pthread_cond_t* const condition) {
     99   int ok = 1;
    100   ok &= (CloseHandle(condition->waiting_sem_) != 0);
    101   ok &= (CloseHandle(condition->received_sem_) != 0);
    102   ok &= (CloseHandle(condition->signal_event_) != 0);
    103   return !ok;
    104 }
    105 
    106 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
    107   (void)cond_attr;
    108   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    109   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
    110   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
    111   if (condition->waiting_sem_ == NULL ||
    112       condition->received_sem_ == NULL ||
    113       condition->signal_event_ == NULL) {
    114     pthread_cond_destroy(condition);
    115     return 1;
    116   }
    117   return 0;
    118 }
    119 
    120 static int pthread_cond_signal(pthread_cond_t* const condition) {
    121   int ok = 1;
    122   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    123     // a thread is waiting in pthread_cond_wait: allow it to be notified
    124     ok = SetEvent(condition->signal_event_);
    125     // wait until the event is consumed so the signaler cannot consume
    126     // the event via its own pthread_cond_wait.
    127     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
    128            WAIT_OBJECT_0);
    129   }
    130   return !ok;
    131 }
    132 
    133 static int pthread_cond_wait(pthread_cond_t* const condition,
    134                              pthread_mutex_t* const mutex) {
    135   int ok;
    136   // note that there is a consumer available so the signal isn't dropped in
    137   // pthread_cond_signal
    138   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
    139     return 1;
    140   // now unlock the mutex so pthread_cond_signal may be issued
    141   pthread_mutex_unlock(mutex);
    142   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
    143         WAIT_OBJECT_0);
    144   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
    145   pthread_mutex_lock(mutex);
    146   return !ok;
    147 }
    148 
    149 #else  // !_WIN32
    150 # define THREADFN void*
    151 # define THREAD_RETURN(val) val
    152 #endif  // _WIN32
    153 
    154 //------------------------------------------------------------------------------
    155 
    156 static void Execute(WebPWorker* const worker);  // Forward declaration.
    157 
    158 static THREADFN ThreadLoop(void* ptr) {
    159   WebPWorker* const worker = (WebPWorker*)ptr;
    160   int done = 0;
    161   while (!done) {
    162     pthread_mutex_lock(&worker->impl_->mutex_);
    163     while (worker->status_ == OK) {   // wait in idling mode
    164       pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
    165     }
    166     if (worker->status_ == WORK) {
    167       Execute(worker);
    168       worker->status_ = OK;
    169     } else if (worker->status_ == NOT_OK) {   // finish the worker
    170       done = 1;
    171     }
    172     // signal to the main thread that we're done (for Sync())
    173     pthread_cond_signal(&worker->impl_->condition_);
    174     pthread_mutex_unlock(&worker->impl_->mutex_);
    175   }
    176   return THREAD_RETURN(NULL);    // Thread is finished
    177 }
    178 
    179 // main thread state control
    180 static void ChangeState(WebPWorker* const worker,
    181                         WebPWorkerStatus new_status) {
    182   // No-op when attempting to change state on a thread that didn't come up.
    183   // Checking status_ without acquiring the lock first would result in a data
    184   // race.
    185   if (worker->impl_ == NULL) return;
    186 
    187   pthread_mutex_lock(&worker->impl_->mutex_);
    188   if (worker->status_ >= OK) {
    189     // wait for the worker to finish
    190     while (worker->status_ != OK) {
    191       pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
    192     }
    193     // assign new status and release the working thread if needed
    194     if (new_status != OK) {
    195       worker->status_ = new_status;
    196       pthread_cond_signal(&worker->impl_->condition_);
    197     }
    198   }
    199   pthread_mutex_unlock(&worker->impl_->mutex_);
    200 }
    201 
    202 #endif  // WEBP_USE_THREAD
    203 
    204 //------------------------------------------------------------------------------
    205 
    206 static void Init(WebPWorker* const worker) {
    207   memset(worker, 0, sizeof(*worker));
    208   worker->status_ = NOT_OK;
    209 }
    210 
    211 static int Sync(WebPWorker* const worker) {
    212 #ifdef WEBP_USE_THREAD
    213   ChangeState(worker, OK);
    214 #endif
    215   assert(worker->status_ <= OK);
    216   return !worker->had_error;
    217 }
    218 
    219 static int Reset(WebPWorker* const worker) {
    220   int ok = 1;
    221   worker->had_error = 0;
    222   if (worker->status_ < OK) {
    223 #ifdef WEBP_USE_THREAD
    224     worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_));
    225     if (worker->impl_ == NULL) {
    226       return 0;
    227     }
    228     if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) {
    229       goto Error;
    230     }
    231     if (pthread_cond_init(&worker->impl_->condition_, NULL)) {
    232       pthread_mutex_destroy(&worker->impl_->mutex_);
    233       goto Error;
    234     }
    235     pthread_mutex_lock(&worker->impl_->mutex_);
    236     ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker);
    237     if (ok) worker->status_ = OK;
    238     pthread_mutex_unlock(&worker->impl_->mutex_);
    239     if (!ok) {
    240       pthread_mutex_destroy(&worker->impl_->mutex_);
    241       pthread_cond_destroy(&worker->impl_->condition_);
    242  Error:
    243       WebPSafeFree(worker->impl_);
    244       worker->impl_ = NULL;
    245       return 0;
    246     }
    247 #else
    248     worker->status_ = OK;
    249 #endif
    250   } else if (worker->status_ > OK) {
    251     ok = Sync(worker);
    252   }
    253   assert(!ok || (worker->status_ == OK));
    254   return ok;
    255 }
    256 
    257 static void Execute(WebPWorker* const worker) {
    258   if (worker->hook != NULL) {
    259     worker->had_error |= !worker->hook(worker->data1, worker->data2);
    260   }
    261 }
    262 
    263 static void Launch(WebPWorker* const worker) {
    264 #ifdef WEBP_USE_THREAD
    265   ChangeState(worker, WORK);
    266 #else
    267   Execute(worker);
    268 #endif
    269 }
    270 
    271 static void End(WebPWorker* const worker) {
    272 #ifdef WEBP_USE_THREAD
    273   if (worker->impl_ != NULL) {
    274     ChangeState(worker, NOT_OK);
    275     pthread_join(worker->impl_->thread_, NULL);
    276     pthread_mutex_destroy(&worker->impl_->mutex_);
    277     pthread_cond_destroy(&worker->impl_->condition_);
    278     WebPSafeFree(worker->impl_);
    279     worker->impl_ = NULL;
    280   }
    281 #else
    282   worker->status_ = NOT_OK;
    283   assert(worker->impl_ == NULL);
    284 #endif
    285   assert(worker->status_ == NOT_OK);
    286 }
    287 
    288 //------------------------------------------------------------------------------
    289 
// Worker interface currently in use. It starts out pointing at the functions
// defined in this file and is overwritten by WebPSetWorkerInterface().
static WebPWorkerInterface g_worker_interface = {
  Init, Reset, Sync, Launch, Execute, End
};
    293 
    294 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
    295   if (winterface == NULL ||
    296       winterface->Init == NULL || winterface->Reset == NULL ||
    297       winterface->Sync == NULL || winterface->Launch == NULL ||
    298       winterface->Execute == NULL || winterface->End == NULL) {
    299     return 0;
    300   }
    301   g_worker_interface = *winterface;
    302   return 1;
    303 }
    304 
    305 const WebPWorkerInterface* WebPGetWorkerInterface(void) {
    306   return &g_worker_interface;
    307 }
    308 
    309 //------------------------------------------------------------------------------
    310