1 // Copyright 2011 Google Inc. All Rights Reserved. 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the COPYING file in the root of the source 5 // tree. An additional intellectual property rights grant can be found 6 // in the file PATENTS. All contributing project authors may 7 // be found in the AUTHORS file in the root of the source tree. 8 // ----------------------------------------------------------------------------- 9 // 10 // Multi-threaded worker 11 // 12 // Author: Skal (pascal.massimino (at) gmail.com) 13 14 #include <assert.h> 15 #include <string.h> // for memset() 16 #include "./thread.h" 17 18 #if defined(__cplusplus) || defined(c_plusplus) 19 extern "C" { 20 #endif 21 22 #ifdef WEBP_USE_THREAD 23 24 #if defined(_WIN32) 25 26 //------------------------------------------------------------------------------ 27 // simplistic pthread emulation layer 28 29 #include <process.h> 30 31 // _beginthreadex requires __stdcall 32 #define THREADFN unsigned int __stdcall 33 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 34 35 static int pthread_create(pthread_t* const thread, const void* attr, 36 unsigned int (__stdcall *start)(void*), void* arg) { 37 (void)attr; 38 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 39 0, /* unsigned stack_size */ 40 start, 41 arg, 42 0, /* unsigned initflag */ 43 NULL); /* unsigned *thrdaddr */ 44 if (*thread == NULL) return 1; 45 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 46 return 0; 47 } 48 49 static int pthread_join(pthread_t thread, void** value_ptr) { 50 (void)value_ptr; 51 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 52 CloseHandle(thread) == 0); 53 } 54 55 // Mutex 56 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 57 (void)mutexattr; 58 InitializeCriticalSection(mutex); 59 return 0; 60 } 61 62 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 63 EnterCriticalSection(mutex); 64 return 0; 65 } 66 67 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 68 LeaveCriticalSection(mutex); 69 return 0; 70 } 71 72 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 73 DeleteCriticalSection(mutex); 74 return 0; 75 } 76 77 // Condition 78 static int pthread_cond_destroy(pthread_cond_t* const condition) { 79 int ok = 1; 80 ok &= (CloseHandle(condition->waiting_sem_) != 0); 81 ok &= (CloseHandle(condition->received_sem_) != 0); 82 ok &= (CloseHandle(condition->signal_event_) != 0); 83 return !ok; 84 } 85 86 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 87 (void)cond_attr; 88 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 89 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 90 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 91 if (condition->waiting_sem_ == NULL || 92 condition->received_sem_ == NULL || 93 condition->signal_event_ == NULL) { 94 pthread_cond_destroy(condition); 95 return 1; 96 } 97 return 0; 98 } 99 100 static int pthread_cond_signal(pthread_cond_t* const condition) { 101 int ok = 1; 102 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 103 // a thread is waiting in pthread_cond_wait: allow it to be notified 104 ok = SetEvent(condition->signal_event_); 105 // wait until the event is consumed so the signaler cannot consume 106 // the event via its own pthread_cond_wait. 107 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 108 WAIT_OBJECT_0); 109 } 110 return !ok; 111 } 112 113 static int pthread_cond_wait(pthread_cond_t* const condition, 114 pthread_mutex_t* const mutex) { 115 int ok; 116 // note that there is a consumer available so the signal isn't dropped in 117 // pthread_cond_signal 118 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) 119 return 1; 120 // now unlock the mutex so pthread_cond_signal may be issued 121 pthread_mutex_unlock(mutex); 122 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 123 WAIT_OBJECT_0); 124 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 125 pthread_mutex_lock(mutex); 126 return !ok; 127 } 128 129 #else // _WIN32 130 # define THREADFN void* 131 # define THREAD_RETURN(val) val 132 #endif 133 134 //------------------------------------------------------------------------------ 135 136 static THREADFN WebPWorkerThreadLoop(void *ptr) { // thread loop 137 WebPWorker* const worker = (WebPWorker*)ptr; 138 int done = 0; 139 while (!done) { 140 pthread_mutex_lock(&worker->mutex_); 141 while (worker->status_ == OK) { // wait in idling mode 142 pthread_cond_wait(&worker->condition_, &worker->mutex_); 143 } 144 if (worker->status_ == WORK) { 145 if (worker->hook) { 146 worker->had_error |= !worker->hook(worker->data1, worker->data2); 147 } 148 worker->status_ = OK; 149 } else if (worker->status_ == NOT_OK) { // finish the worker 150 done = 1; 151 } 152 // signal to the main thread that we're done (for Sync()) 153 pthread_cond_signal(&worker->condition_); 154 pthread_mutex_unlock(&worker->mutex_); 155 } 156 return THREAD_RETURN(NULL); // Thread is finished 157 } 158 159 // main thread state control 160 static void WebPWorkerChangeState(WebPWorker* const worker, 161 WebPWorkerStatus new_status) { 162 // no-op when attempting to change state on a thread that didn't come up 163 if (worker->status_ < OK) return; 164 165 pthread_mutex_lock(&worker->mutex_); 166 // wait for the worker to finish 167 while (worker->status_ != OK) { 168 pthread_cond_wait(&worker->condition_, &worker->mutex_); 169 } 170 // assign new status and release the working thread if needed 171 if (new_status != OK) { 172 worker->status_ = new_status; 173 pthread_cond_signal(&worker->condition_); 174 } 175 pthread_mutex_unlock(&worker->mutex_); 176 } 177 178 #endif 179 180 //------------------------------------------------------------------------------ 181 182 void WebPWorkerInit(WebPWorker* const worker) { 183 memset(worker, 0, sizeof(*worker)); 184 worker->status_ = NOT_OK; 185 } 186 187 int WebPWorkerSync(WebPWorker* const worker) { 188 #ifdef WEBP_USE_THREAD 189 WebPWorkerChangeState(worker, OK); 190 #endif 191 assert(worker->status_ <= OK); 192 return !worker->had_error; 193 } 194 195 int WebPWorkerReset(WebPWorker* const worker) { 196 int ok = 1; 197 worker->had_error = 0; 198 if (worker->status_ < OK) { 199 #ifdef WEBP_USE_THREAD 200 if (pthread_mutex_init(&worker->mutex_, NULL) || 201 pthread_cond_init(&worker->condition_, NULL)) { 202 return 0; 203 } 204 pthread_mutex_lock(&worker->mutex_); 205 ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker); 206 if (ok) worker->status_ = OK; 207 pthread_mutex_unlock(&worker->mutex_); 208 #else 209 worker->status_ = OK; 210 #endif 211 } else if (worker->status_ > OK) { 212 ok = WebPWorkerSync(worker); 213 } 214 assert(!ok || (worker->status_ == OK)); 215 return ok; 216 } 217 218 void WebPWorkerLaunch(WebPWorker* const worker) { 219 #ifdef WEBP_USE_THREAD 220 WebPWorkerChangeState(worker, WORK); 221 #else 222 if (worker->hook) 223 worker->had_error |= !worker->hook(worker->data1, worker->data2); 224 #endif 225 } 226 227 void WebPWorkerEnd(WebPWorker* const worker) { 228 if (worker->status_ >= OK) { 229 #ifdef WEBP_USE_THREAD 230 WebPWorkerChangeState(worker, NOT_OK); 231 pthread_join(worker->thread_, NULL); 232 pthread_mutex_destroy(&worker->mutex_); 233 pthread_cond_destroy(&worker->condition_); 234 #else 235 worker->status_ = NOT_OK; 236 #endif 237 } 238 assert(worker->status_ == NOT_OK); 239 } 240 241 //------------------------------------------------------------------------------ 242 243 #if defined(__cplusplus) || defined(c_plusplus) 244 } // extern "C" 245 #endif 246