1 // Copyright 2011 Google Inc. All Rights Reserved. 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the COPYING file in the root of the source 5 // tree. An additional intellectual property rights grant can be found 6 // in the file PATENTS. All contributing project authors may 7 // be found in the AUTHORS file in the root of the source tree. 8 // ----------------------------------------------------------------------------- 9 // 10 // Multi-threaded worker 11 // 12 // Author: Skal (pascal.massimino (at) gmail.com) 13 14 #include <assert.h> 15 #include <string.h> // for memset() 16 #include "./thread_utils.h" 17 #include "./utils.h" 18 19 #ifdef WEBP_USE_THREAD 20 21 #if defined(_WIN32) 22 23 #include <windows.h> 24 typedef HANDLE pthread_t; 25 typedef CRITICAL_SECTION pthread_mutex_t; 26 27 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 28 #define USE_WINDOWS_CONDITION_VARIABLE 29 typedef CONDITION_VARIABLE pthread_cond_t; 30 #else 31 typedef struct { 32 HANDLE waiting_sem_; 33 HANDLE received_sem_; 34 HANDLE signal_event_; 35 } pthread_cond_t; 36 #endif // _WIN32_WINNT >= 0x600 37 38 #ifndef WINAPI_FAMILY_PARTITION 39 #define WINAPI_PARTITION_DESKTOP 1 40 #define WINAPI_FAMILY_PARTITION(x) x 41 #endif 42 43 #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) 44 #define USE_CREATE_THREAD 45 #endif 46 47 #else // !_WIN32 48 49 #include <pthread.h> 50 51 #endif // _WIN32 52 53 struct WebPWorkerImpl { 54 pthread_mutex_t mutex_; 55 pthread_cond_t condition_; 56 pthread_t thread_; 57 }; 58 59 #if defined(_WIN32) 60 61 //------------------------------------------------------------------------------ 62 // simplistic pthread emulation layer 63 64 #include <process.h> 65 66 // _beginthreadex requires __stdcall 67 #define THREADFN unsigned int __stdcall 68 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 69 70 #if _WIN32_WINNT >= 0x0501 // Windows XP or greater 71 #define WaitForSingleObject(obj, timeout) \ 72 WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/) 73 #endif 74 75 static int pthread_create(pthread_t* const thread, const void* attr, 76 unsigned int (__stdcall *start)(void*), void* arg) { 77 (void)attr; 78 #ifdef USE_CREATE_THREAD 79 *thread = CreateThread(NULL, /* lpThreadAttributes */ 80 0, /* dwStackSize */ 81 start, 82 arg, 83 0, /* dwStackSize */ 84 NULL); /* lpThreadId */ 85 #else 86 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 87 0, /* unsigned stack_size */ 88 start, 89 arg, 90 0, /* unsigned initflag */ 91 NULL); /* unsigned *thrdaddr */ 92 #endif 93 if (*thread == NULL) return 1; 94 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 95 return 0; 96 } 97 98 static int pthread_join(pthread_t thread, void** value_ptr) { 99 (void)value_ptr; 100 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 101 CloseHandle(thread) == 0); 102 } 103 104 // Mutex 105 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 106 (void)mutexattr; 107 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 108 InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/); 109 #else 110 InitializeCriticalSection(mutex); 111 #endif 112 return 0; 113 } 114 115 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 116 EnterCriticalSection(mutex); 117 return 0; 118 } 119 120 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 121 LeaveCriticalSection(mutex); 122 return 0; 123 } 124 125 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 126 DeleteCriticalSection(mutex); 127 return 0; 128 } 129 130 // Condition 131 static int pthread_cond_destroy(pthread_cond_t* const condition) { 132 int ok = 1; 133 #ifdef USE_WINDOWS_CONDITION_VARIABLE 134 (void)condition; 135 #else 136 ok &= (CloseHandle(condition->waiting_sem_) != 0); 137 ok &= (CloseHandle(condition->received_sem_) != 0); 138 ok &= (CloseHandle(condition->signal_event_) != 0); 139 #endif 140 return !ok; 141 } 142 143 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 144 (void)cond_attr; 145 #ifdef USE_WINDOWS_CONDITION_VARIABLE 146 InitializeConditionVariable(condition); 147 #else 148 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 149 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 150 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 151 if (condition->waiting_sem_ == NULL || 152 condition->received_sem_ == NULL || 153 condition->signal_event_ == NULL) { 154 pthread_cond_destroy(condition); 155 return 1; 156 } 157 #endif 158 return 0; 159 } 160 161 static int pthread_cond_signal(pthread_cond_t* const condition) { 162 int ok = 1; 163 #ifdef USE_WINDOWS_CONDITION_VARIABLE 164 WakeConditionVariable(condition); 165 #else 166 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 167 // a thread is waiting in pthread_cond_wait: allow it to be notified 168 ok = SetEvent(condition->signal_event_); 169 // wait until the event is consumed so the signaler cannot consume 170 // the event via its own pthread_cond_wait. 171 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 172 WAIT_OBJECT_0); 173 } 174 #endif 175 return !ok; 176 } 177 178 static int pthread_cond_wait(pthread_cond_t* const condition, 179 pthread_mutex_t* const mutex) { 180 int ok; 181 #ifdef USE_WINDOWS_CONDITION_VARIABLE 182 ok = SleepConditionVariableCS(condition, mutex, INFINITE); 183 #else 184 // note that there is a consumer available so the signal isn't dropped in 185 // pthread_cond_signal 186 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1; 187 // now unlock the mutex so pthread_cond_signal may be issued 188 pthread_mutex_unlock(mutex); 189 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 190 WAIT_OBJECT_0); 191 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 192 pthread_mutex_lock(mutex); 193 #endif 194 return !ok; 195 } 196 197 #else // !_WIN32 198 # define THREADFN void* 199 # define THREAD_RETURN(val) val 200 #endif // _WIN32 201 202 //------------------------------------------------------------------------------ 203 204 static void Execute(WebPWorker* const worker); // Forward declaration. 205 206 static THREADFN ThreadLoop(void* ptr) { 207 WebPWorker* const worker = (WebPWorker*)ptr; 208 int done = 0; 209 while (!done) { 210 pthread_mutex_lock(&worker->impl_->mutex_); 211 while (worker->status_ == OK) { // wait in idling mode 212 pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); 213 } 214 if (worker->status_ == WORK) { 215 Execute(worker); 216 worker->status_ = OK; 217 } else if (worker->status_ == NOT_OK) { // finish the worker 218 done = 1; 219 } 220 // signal to the main thread that we're done (for Sync()) 221 pthread_cond_signal(&worker->impl_->condition_); 222 pthread_mutex_unlock(&worker->impl_->mutex_); 223 } 224 return THREAD_RETURN(NULL); // Thread is finished 225 } 226 227 // main thread state control 228 static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) { 229 // No-op when attempting to change state on a thread that didn't come up. 230 // Checking status_ without acquiring the lock first would result in a data 231 // race. 232 if (worker->impl_ == NULL) return; 233 234 pthread_mutex_lock(&worker->impl_->mutex_); 235 if (worker->status_ >= OK) { 236 // wait for the worker to finish 237 while (worker->status_ != OK) { 238 pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); 239 } 240 // assign new status and release the working thread if needed 241 if (new_status != OK) { 242 worker->status_ = new_status; 243 pthread_cond_signal(&worker->impl_->condition_); 244 } 245 } 246 pthread_mutex_unlock(&worker->impl_->mutex_); 247 } 248 249 #endif // WEBP_USE_THREAD 250 251 //------------------------------------------------------------------------------ 252 253 static void Init(WebPWorker* const worker) { 254 memset(worker, 0, sizeof(*worker)); 255 worker->status_ = NOT_OK; 256 } 257 258 static int Sync(WebPWorker* const worker) { 259 #ifdef WEBP_USE_THREAD 260 ChangeState(worker, OK); 261 #endif 262 assert(worker->status_ <= OK); 263 return !worker->had_error; 264 } 265 266 static int Reset(WebPWorker* const worker) { 267 int ok = 1; 268 worker->had_error = 0; 269 if (worker->status_ < OK) { 270 #ifdef WEBP_USE_THREAD 271 worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_)); 272 if (worker->impl_ == NULL) { 273 return 0; 274 } 275 if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) { 276 goto Error; 277 } 278 if (pthread_cond_init(&worker->impl_->condition_, NULL)) { 279 pthread_mutex_destroy(&worker->impl_->mutex_); 280 goto Error; 281 } 282 pthread_mutex_lock(&worker->impl_->mutex_); 283 ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker); 284 if (ok) worker->status_ = OK; 285 pthread_mutex_unlock(&worker->impl_->mutex_); 286 if (!ok) { 287 pthread_mutex_destroy(&worker->impl_->mutex_); 288 pthread_cond_destroy(&worker->impl_->condition_); 289 Error: 290 WebPSafeFree(worker->impl_); 291 worker->impl_ = NULL; 292 return 0; 293 } 294 #else 295 worker->status_ = OK; 296 #endif 297 } else if (worker->status_ > OK) { 298 ok = Sync(worker); 299 } 300 assert(!ok || (worker->status_ == OK)); 301 return ok; 302 } 303 304 static void Execute(WebPWorker* const worker) { 305 if (worker->hook != NULL) { 306 worker->had_error |= !worker->hook(worker->data1, worker->data2); 307 } 308 } 309 310 static void Launch(WebPWorker* const worker) { 311 #ifdef WEBP_USE_THREAD 312 ChangeState(worker, WORK); 313 #else 314 Execute(worker); 315 #endif 316 } 317 318 static void End(WebPWorker* const worker) { 319 #ifdef WEBP_USE_THREAD 320 if (worker->impl_ != NULL) { 321 ChangeState(worker, NOT_OK); 322 pthread_join(worker->impl_->thread_, NULL); 323 pthread_mutex_destroy(&worker->impl_->mutex_); 324 pthread_cond_destroy(&worker->impl_->condition_); 325 WebPSafeFree(worker->impl_); 326 worker->impl_ = NULL; 327 } 328 #else 329 worker->status_ = NOT_OK; 330 assert(worker->impl_ == NULL); 331 #endif 332 assert(worker->status_ == NOT_OK); 333 } 334 335 //------------------------------------------------------------------------------ 336 337 static WebPWorkerInterface g_worker_interface = { 338 Init, Reset, Sync, Launch, Execute, End 339 }; 340 341 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) { 342 if (winterface == NULL || 343 winterface->Init == NULL || winterface->Reset == NULL || 344 winterface->Sync == NULL || winterface->Launch == NULL || 345 winterface->Execute == NULL || winterface->End == NULL) { 346 return 0; 347 } 348 g_worker_interface = *winterface; 349 return 1; 350 } 351 352 const WebPWorkerInterface* WebPGetWorkerInterface(void) { 353 return &g_worker_interface; 354 } 355 356 //------------------------------------------------------------------------------ 357