1 // Copyright 2011 Google Inc. All Rights Reserved. 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the COPYING file in the root of the source 5 // tree. An additional intellectual property rights grant can be found 6 // in the file PATENTS. All contributing project authors may 7 // be found in the AUTHORS file in the root of the source tree. 8 // ----------------------------------------------------------------------------- 9 // 10 // Multi-threaded worker 11 // 12 // Author: Skal (pascal.massimino (at) gmail.com) 13 14 #include <assert.h> 15 #include <string.h> // for memset() 16 #include "src/utils/thread_utils.h" 17 #include "src/utils/utils.h" 18 19 #ifdef WEBP_USE_THREAD 20 21 #if defined(_WIN32) 22 23 #include <windows.h> 24 typedef HANDLE pthread_t; 25 typedef CRITICAL_SECTION pthread_mutex_t; 26 27 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 28 #define USE_WINDOWS_CONDITION_VARIABLE 29 typedef CONDITION_VARIABLE pthread_cond_t; 30 #else 31 typedef struct { 32 HANDLE waiting_sem_; 33 HANDLE received_sem_; 34 HANDLE signal_event_; 35 } pthread_cond_t; 36 #endif // _WIN32_WINNT >= 0x600 37 38 #ifndef WINAPI_FAMILY_PARTITION 39 #define WINAPI_PARTITION_DESKTOP 1 40 #define WINAPI_FAMILY_PARTITION(x) x 41 #endif 42 43 #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) 44 #define USE_CREATE_THREAD 45 #endif 46 47 #else // !_WIN32 48 49 #include <pthread.h> 50 51 #endif // _WIN32 52 53 typedef struct { 54 pthread_mutex_t mutex_; 55 pthread_cond_t condition_; 56 pthread_t thread_; 57 } WebPWorkerImpl; 58 59 #if defined(_WIN32) 60 61 //------------------------------------------------------------------------------ 62 // simplistic pthread emulation layer 63 64 #include <process.h> 65 66 // _beginthreadex requires __stdcall 67 #define THREADFN unsigned int __stdcall 68 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 69 70 #if _WIN32_WINNT >= 0x0501 // Windows XP or greater 71 #define WaitForSingleObject(obj, timeout) \ 72 WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/) 73 #endif 74 75 static int pthread_create(pthread_t* const thread, const void* attr, 76 unsigned int (__stdcall *start)(void*), void* arg) { 77 (void)attr; 78 #ifdef USE_CREATE_THREAD 79 *thread = CreateThread(NULL, /* lpThreadAttributes */ 80 0, /* dwStackSize */ 81 start, 82 arg, 83 0, /* dwStackSize */ 84 NULL); /* lpThreadId */ 85 #else 86 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 87 0, /* unsigned stack_size */ 88 start, 89 arg, 90 0, /* unsigned initflag */ 91 NULL); /* unsigned *thrdaddr */ 92 #endif 93 if (*thread == NULL) return 1; 94 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 95 return 0; 96 } 97 98 static int pthread_join(pthread_t thread, void** value_ptr) { 99 (void)value_ptr; 100 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 101 CloseHandle(thread) == 0); 102 } 103 104 // Mutex 105 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 106 (void)mutexattr; 107 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 108 InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/); 109 #else 110 InitializeCriticalSection(mutex); 111 #endif 112 return 0; 113 } 114 115 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 116 EnterCriticalSection(mutex); 117 return 0; 118 } 119 120 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 121 LeaveCriticalSection(mutex); 122 return 0; 123 } 124 125 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 126 DeleteCriticalSection(mutex); 127 return 0; 128 } 129 130 // Condition 131 static int pthread_cond_destroy(pthread_cond_t* const condition) { 132 int ok = 1; 133 #ifdef USE_WINDOWS_CONDITION_VARIABLE 134 (void)condition; 135 #else 136 ok &= (CloseHandle(condition->waiting_sem_) != 0); 137 ok &= (CloseHandle(condition->received_sem_) != 0); 138 ok &= (CloseHandle(condition->signal_event_) != 0); 139 #endif 140 return !ok; 141 } 142 143 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 144 (void)cond_attr; 145 #ifdef USE_WINDOWS_CONDITION_VARIABLE 146 InitializeConditionVariable(condition); 147 #else 148 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 149 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 150 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 151 if (condition->waiting_sem_ == NULL || 152 condition->received_sem_ == NULL || 153 condition->signal_event_ == NULL) { 154 pthread_cond_destroy(condition); 155 return 1; 156 } 157 #endif 158 return 0; 159 } 160 161 static int pthread_cond_signal(pthread_cond_t* const condition) { 162 int ok = 1; 163 #ifdef USE_WINDOWS_CONDITION_VARIABLE 164 WakeConditionVariable(condition); 165 #else 166 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 167 // a thread is waiting in pthread_cond_wait: allow it to be notified 168 ok = SetEvent(condition->signal_event_); 169 // wait until the event is consumed so the signaler cannot consume 170 // the event via its own pthread_cond_wait. 171 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 172 WAIT_OBJECT_0); 173 } 174 #endif 175 return !ok; 176 } 177 178 static int pthread_cond_wait(pthread_cond_t* const condition, 179 pthread_mutex_t* const mutex) { 180 int ok; 181 #ifdef USE_WINDOWS_CONDITION_VARIABLE 182 ok = SleepConditionVariableCS(condition, mutex, INFINITE); 183 #else 184 // note that there is a consumer available so the signal isn't dropped in 185 // pthread_cond_signal 186 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1; 187 // now unlock the mutex so pthread_cond_signal may be issued 188 pthread_mutex_unlock(mutex); 189 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 190 WAIT_OBJECT_0); 191 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 192 pthread_mutex_lock(mutex); 193 #endif 194 return !ok; 195 } 196 197 #else // !_WIN32 198 # define THREADFN void* 199 # define THREAD_RETURN(val) val 200 #endif // _WIN32 201 202 //------------------------------------------------------------------------------ 203 204 static THREADFN ThreadLoop(void* ptr) { 205 WebPWorker* const worker = (WebPWorker*)ptr; 206 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_; 207 int done = 0; 208 while (!done) { 209 pthread_mutex_lock(&impl->mutex_); 210 while (worker->status_ == OK) { // wait in idling mode 211 pthread_cond_wait(&impl->condition_, &impl->mutex_); 212 } 213 if (worker->status_ == WORK) { 214 WebPGetWorkerInterface()->Execute(worker); 215 worker->status_ = OK; 216 } else if (worker->status_ == NOT_OK) { // finish the worker 217 done = 1; 218 } 219 // signal to the main thread that we're done (for Sync()) 220 pthread_cond_signal(&impl->condition_); 221 pthread_mutex_unlock(&impl->mutex_); 222 } 223 return THREAD_RETURN(NULL); // Thread is finished 224 } 225 226 // main thread state control 227 static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) { 228 // No-op when attempting to change state on a thread that didn't come up. 229 // Checking status_ without acquiring the lock first would result in a data 230 // race. 231 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_; 232 if (impl == NULL) return; 233 234 pthread_mutex_lock(&impl->mutex_); 235 if (worker->status_ >= OK) { 236 // wait for the worker to finish 237 while (worker->status_ != OK) { 238 pthread_cond_wait(&impl->condition_, &impl->mutex_); 239 } 240 // assign new status and release the working thread if needed 241 if (new_status != OK) { 242 worker->status_ = new_status; 243 pthread_cond_signal(&impl->condition_); 244 } 245 } 246 pthread_mutex_unlock(&impl->mutex_); 247 } 248 249 #endif // WEBP_USE_THREAD 250 251 //------------------------------------------------------------------------------ 252 253 static void Init(WebPWorker* const worker) { 254 memset(worker, 0, sizeof(*worker)); 255 worker->status_ = NOT_OK; 256 } 257 258 static int Sync(WebPWorker* const worker) { 259 #ifdef WEBP_USE_THREAD 260 ChangeState(worker, OK); 261 #endif 262 assert(worker->status_ <= OK); 263 return !worker->had_error; 264 } 265 266 static int Reset(WebPWorker* const worker) { 267 int ok = 1; 268 worker->had_error = 0; 269 if (worker->status_ < OK) { 270 #ifdef WEBP_USE_THREAD 271 WebPWorkerImpl* const impl = 272 (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl)); 273 worker->impl_ = (void*)impl; 274 if (worker->impl_ == NULL) { 275 return 0; 276 } 277 if (pthread_mutex_init(&impl->mutex_, NULL)) { 278 goto Error; 279 } 280 if (pthread_cond_init(&impl->condition_, NULL)) { 281 pthread_mutex_destroy(&impl->mutex_); 282 goto Error; 283 } 284 pthread_mutex_lock(&impl->mutex_); 285 ok = !pthread_create(&impl->thread_, NULL, ThreadLoop, worker); 286 if (ok) worker->status_ = OK; 287 pthread_mutex_unlock(&impl->mutex_); 288 if (!ok) { 289 pthread_mutex_destroy(&impl->mutex_); 290 pthread_cond_destroy(&impl->condition_); 291 Error: 292 WebPSafeFree(impl); 293 worker->impl_ = NULL; 294 return 0; 295 } 296 #else 297 worker->status_ = OK; 298 #endif 299 } else if (worker->status_ > OK) { 300 ok = Sync(worker); 301 } 302 assert(!ok || (worker->status_ == OK)); 303 return ok; 304 } 305 306 static void Execute(WebPWorker* const worker) { 307 if (worker->hook != NULL) { 308 worker->had_error |= !worker->hook(worker->data1, worker->data2); 309 } 310 } 311 312 static void Launch(WebPWorker* const worker) { 313 #ifdef WEBP_USE_THREAD 314 ChangeState(worker, WORK); 315 #else 316 Execute(worker); 317 #endif 318 } 319 320 static void End(WebPWorker* const worker) { 321 #ifdef WEBP_USE_THREAD 322 if (worker->impl_ != NULL) { 323 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_; 324 ChangeState(worker, NOT_OK); 325 pthread_join(impl->thread_, NULL); 326 pthread_mutex_destroy(&impl->mutex_); 327 pthread_cond_destroy(&impl->condition_); 328 WebPSafeFree(impl); 329 worker->impl_ = NULL; 330 } 331 #else 332 worker->status_ = NOT_OK; 333 assert(worker->impl_ == NULL); 334 #endif 335 assert(worker->status_ == NOT_OK); 336 } 337 338 //------------------------------------------------------------------------------ 339 340 static WebPWorkerInterface g_worker_interface = { 341 Init, Reset, Sync, Launch, Execute, End 342 }; 343 344 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) { 345 if (winterface == NULL || 346 winterface->Init == NULL || winterface->Reset == NULL || 347 winterface->Sync == NULL || winterface->Launch == NULL || 348 winterface->Execute == NULL || winterface->End == NULL) { 349 return 0; 350 } 351 g_worker_interface = *winterface; 352 return 1; 353 } 354 355 const WebPWorkerInterface* WebPGetWorkerInterface(void) { 356 return &g_worker_interface; 357 } 358 359 //------------------------------------------------------------------------------ 360