1 // Copyright 2011 Google Inc. All Rights Reserved. 2 // 3 // This code is licensed under the same terms as WebM: 4 // Software License Agreement: http://www.webmproject.org/license/software/ 5 // Additional IP Rights Grant: http://www.webmproject.org/license/additional/ 6 // ----------------------------------------------------------------------------- 7 // 8 // Multi-threaded worker 9 // 10 // Author: Skal (pascal.massimino (at) gmail.com) 11 12 #include <assert.h> 13 #include <string.h> // for memset() 14 #include "./thread.h" 15 16 #if defined(__cplusplus) || defined(c_plusplus) 17 extern "C" { 18 #endif 19 20 #ifdef WEBP_USE_THREAD 21 22 #if defined(_WIN32) 23 24 //------------------------------------------------------------------------------ 25 // simplistic pthread emulation layer 26 27 #include <process.h> 28 29 // _beginthreadex requires __stdcall 30 #define THREADFN unsigned int __stdcall 31 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 32 33 static int pthread_create(pthread_t* const thread, const void* attr, 34 unsigned int (__stdcall *start)(void*), void* arg) { 35 (void)attr; 36 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 37 0, /* unsigned stack_size */ 38 start, 39 arg, 40 0, /* unsigned initflag */ 41 NULL); /* unsigned *thrdaddr */ 42 if (*thread == NULL) return 1; 43 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 44 return 0; 45 } 46 47 static int pthread_join(pthread_t thread, void** value_ptr) { 48 (void)value_ptr; 49 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 50 CloseHandle(thread) == 0); 51 } 52 53 // Mutex 54 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 55 (void)mutexattr; 56 InitializeCriticalSection(mutex); 57 return 0; 58 } 59 60 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 61 EnterCriticalSection(mutex); 62 return 0; 63 } 64 65 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 66 LeaveCriticalSection(mutex); 67 return 0; 68 } 69 70 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 71 DeleteCriticalSection(mutex); 72 return 0; 73 } 74 75 // Condition 76 static int pthread_cond_destroy(pthread_cond_t* const condition) { 77 int ok = 1; 78 ok &= (CloseHandle(condition->waiting_sem_) != 0); 79 ok &= (CloseHandle(condition->received_sem_) != 0); 80 ok &= (CloseHandle(condition->signal_event_) != 0); 81 return !ok; 82 } 83 84 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 85 (void)cond_attr; 86 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 87 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 88 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 89 if (condition->waiting_sem_ == NULL || 90 condition->received_sem_ == NULL || 91 condition->signal_event_ == NULL) { 92 pthread_cond_destroy(condition); 93 return 1; 94 } 95 return 0; 96 } 97 98 static int pthread_cond_signal(pthread_cond_t* const condition) { 99 int ok = 1; 100 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 101 // a thread is waiting in pthread_cond_wait: allow it to be notified 102 ok = SetEvent(condition->signal_event_); 103 // wait until the event is consumed so the signaler cannot consume 104 // the event via its own pthread_cond_wait. 105 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 106 WAIT_OBJECT_0); 107 } 108 return !ok; 109 } 110 111 static int pthread_cond_wait(pthread_cond_t* const condition, 112 pthread_mutex_t* const mutex) { 113 int ok; 114 // note that there is a consumer available so the signal isn't dropped in 115 // pthread_cond_signal 116 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) 117 return 1; 118 // now unlock the mutex so pthread_cond_signal may be issued 119 pthread_mutex_unlock(mutex); 120 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 121 WAIT_OBJECT_0); 122 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 123 pthread_mutex_lock(mutex); 124 return !ok; 125 } 126 127 #else // _WIN32 128 # define THREADFN void* 129 # define THREAD_RETURN(val) val 130 #endif 131 132 //------------------------------------------------------------------------------ 133 134 static THREADFN WebPWorkerThreadLoop(void *ptr) { // thread loop 135 WebPWorker* const worker = (WebPWorker*)ptr; 136 int done = 0; 137 while (!done) { 138 pthread_mutex_lock(&worker->mutex_); 139 while (worker->status_ == OK) { // wait in idling mode 140 pthread_cond_wait(&worker->condition_, &worker->mutex_); 141 } 142 if (worker->status_ == WORK) { 143 if (worker->hook) { 144 worker->had_error |= !worker->hook(worker->data1, worker->data2); 145 } 146 worker->status_ = OK; 147 } else if (worker->status_ == NOT_OK) { // finish the worker 148 done = 1; 149 } 150 // signal to the main thread that we're done (for Sync()) 151 pthread_cond_signal(&worker->condition_); 152 pthread_mutex_unlock(&worker->mutex_); 153 } 154 return THREAD_RETURN(NULL); // Thread is finished 155 } 156 157 // main thread state control 158 static void WebPWorkerChangeState(WebPWorker* const worker, 159 WebPWorkerStatus new_status) { 160 // no-op when attempting to change state on a thread that didn't come up 161 if (worker->status_ < OK) return; 162 163 pthread_mutex_lock(&worker->mutex_); 164 // wait for the worker to finish 165 while (worker->status_ != OK) { 166 pthread_cond_wait(&worker->condition_, &worker->mutex_); 167 } 168 // assign new status and release the working thread if needed 169 if (new_status != OK) { 170 worker->status_ = new_status; 171 pthread_cond_signal(&worker->condition_); 172 } 173 pthread_mutex_unlock(&worker->mutex_); 174 } 175 176 #endif 177 178 //------------------------------------------------------------------------------ 179 180 void WebPWorkerInit(WebPWorker* const worker) { 181 memset(worker, 0, sizeof(*worker)); 182 worker->status_ = NOT_OK; 183 } 184 185 int WebPWorkerSync(WebPWorker* const worker) { 186 #ifdef WEBP_USE_THREAD 187 WebPWorkerChangeState(worker, OK); 188 #endif 189 assert(worker->status_ <= OK); 190 return !worker->had_error; 191 } 192 193 int WebPWorkerReset(WebPWorker* const worker) { 194 int ok = 1; 195 worker->had_error = 0; 196 if (worker->status_ < OK) { 197 #ifdef WEBP_USE_THREAD 198 if (pthread_mutex_init(&worker->mutex_, NULL) || 199 pthread_cond_init(&worker->condition_, NULL)) { 200 return 0; 201 } 202 pthread_mutex_lock(&worker->mutex_); 203 ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker); 204 if (ok) worker->status_ = OK; 205 pthread_mutex_unlock(&worker->mutex_); 206 #else 207 worker->status_ = OK; 208 #endif 209 } else if (worker->status_ > OK) { 210 ok = WebPWorkerSync(worker); 211 } 212 assert(!ok || (worker->status_ == OK)); 213 return ok; 214 } 215 216 void WebPWorkerLaunch(WebPWorker* const worker) { 217 #ifdef WEBP_USE_THREAD 218 WebPWorkerChangeState(worker, WORK); 219 #else 220 if (worker->hook) 221 worker->had_error |= !worker->hook(worker->data1, worker->data2); 222 #endif 223 } 224 225 void WebPWorkerEnd(WebPWorker* const worker) { 226 if (worker->status_ >= OK) { 227 #ifdef WEBP_USE_THREAD 228 WebPWorkerChangeState(worker, NOT_OK); 229 pthread_join(worker->thread_, NULL); 230 pthread_mutex_destroy(&worker->mutex_); 231 pthread_cond_destroy(&worker->condition_); 232 #else 233 worker->status_ = NOT_OK; 234 #endif 235 } 236 assert(worker->status_ == NOT_OK); 237 } 238 239 //------------------------------------------------------------------------------ 240 241 #if defined(__cplusplus) || defined(c_plusplus) 242 } // extern "C" 243 #endif 244