1 // Copyright 2013 Google Inc. All Rights Reserved. 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the COPYING file in the root of the source 5 // tree. An additional intellectual property rights grant can be found 6 // in the file PATENTS. All contributing project authors may 7 // be found in the AUTHORS file in the root of the source tree. 8 // ----------------------------------------------------------------------------- 9 // 10 // Multi-threaded worker 11 // 12 // Original source: 13 // http://git.chromium.org/webm/libwebp.git 14 // 100644 blob eff8f2a8c20095aade3c292b0e9292dac6cb3587 src/utils/thread.c 15 16 17 #include <assert.h> 18 #include <string.h> // for memset() 19 #include "./vp9_thread.h" 20 21 #if defined(__cplusplus) || defined(c_plusplus) 22 extern "C" { 23 #endif 24 25 #if CONFIG_MULTITHREAD 26 27 #if defined(_WIN32) 28 29 //------------------------------------------------------------------------------ 30 // simplistic pthread emulation layer 31 32 #include <process.h> // NOLINT 33 34 // _beginthreadex requires __stdcall 35 #define THREADFN unsigned int __stdcall 36 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 37 38 static int pthread_create(pthread_t* const thread, const void* attr, 39 unsigned int (__stdcall *start)(void*), void* arg) { 40 (void)attr; 41 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 42 0, /* unsigned stack_size */ 43 start, 44 arg, 45 0, /* unsigned initflag */ 46 NULL); /* unsigned *thrdaddr */ 47 if (*thread == NULL) return 1; 48 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 49 return 0; 50 } 51 52 static int pthread_join(pthread_t thread, void** value_ptr) { 53 (void)value_ptr; 54 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 55 CloseHandle(thread) == 0); 56 } 57 58 // Mutex 59 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 60 (void)mutexattr; 61 InitializeCriticalSection(mutex); 62 return 0; 63 } 64 65 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 66 EnterCriticalSection(mutex); 67 return 0; 68 } 69 70 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 71 LeaveCriticalSection(mutex); 72 return 0; 73 } 74 75 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 76 DeleteCriticalSection(mutex); 77 return 0; 78 } 79 80 // Condition 81 static int pthread_cond_destroy(pthread_cond_t* const condition) { 82 int ok = 1; 83 ok &= (CloseHandle(condition->waiting_sem_) != 0); 84 ok &= (CloseHandle(condition->received_sem_) != 0); 85 ok &= (CloseHandle(condition->signal_event_) != 0); 86 return !ok; 87 } 88 89 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 90 (void)cond_attr; 91 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 92 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 93 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 94 if (condition->waiting_sem_ == NULL || 95 condition->received_sem_ == NULL || 96 condition->signal_event_ == NULL) { 97 pthread_cond_destroy(condition); 98 return 1; 99 } 100 return 0; 101 } 102 103 static int pthread_cond_signal(pthread_cond_t* const condition) { 104 int ok = 1; 105 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 106 // a thread is waiting in pthread_cond_wait: allow it to be notified 107 ok = SetEvent(condition->signal_event_); 108 // wait until the event is consumed so the signaler cannot consume 109 // the event via its own pthread_cond_wait. 110 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 111 WAIT_OBJECT_0); 112 } 113 return !ok; 114 } 115 116 static int pthread_cond_wait(pthread_cond_t* const condition, 117 pthread_mutex_t* const mutex) { 118 int ok; 119 // note that there is a consumer available so the signal isn't dropped in 120 // pthread_cond_signal 121 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) 122 return 1; 123 // now unlock the mutex so pthread_cond_signal may be issued 124 pthread_mutex_unlock(mutex); 125 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 126 WAIT_OBJECT_0); 127 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 128 pthread_mutex_lock(mutex); 129 return !ok; 130 } 131 132 #else // _WIN32 133 # define THREADFN void* 134 # define THREAD_RETURN(val) val 135 #endif 136 137 //------------------------------------------------------------------------------ 138 139 static THREADFN thread_loop(void *ptr) { // thread loop 140 VP9Worker* const worker = (VP9Worker*)ptr; 141 int done = 0; 142 while (!done) { 143 pthread_mutex_lock(&worker->mutex_); 144 while (worker->status_ == OK) { // wait in idling mode 145 pthread_cond_wait(&worker->condition_, &worker->mutex_); 146 } 147 if (worker->status_ == WORK) { 148 vp9_worker_execute(worker); 149 worker->status_ = OK; 150 } else if (worker->status_ == NOT_OK) { // finish the worker 151 done = 1; 152 } 153 // signal to the main thread that we're done (for Sync()) 154 pthread_cond_signal(&worker->condition_); 155 pthread_mutex_unlock(&worker->mutex_); 156 } 157 return THREAD_RETURN(NULL); // Thread is finished 158 } 159 160 // main thread state control 161 static void change_state(VP9Worker* const worker, 162 VP9WorkerStatus new_status) { 163 // no-op when attempting to change state on a thread that didn't come up 164 if (worker->status_ < OK) return; 165 166 pthread_mutex_lock(&worker->mutex_); 167 // wait for the worker to finish 168 while (worker->status_ != OK) { 169 pthread_cond_wait(&worker->condition_, &worker->mutex_); 170 } 171 // assign new status and release the working thread if needed 172 if (new_status != OK) { 173 worker->status_ = new_status; 174 pthread_cond_signal(&worker->condition_); 175 } 176 pthread_mutex_unlock(&worker->mutex_); 177 } 178 179 #endif // CONFIG_MULTITHREAD 180 181 //------------------------------------------------------------------------------ 182 183 void vp9_worker_init(VP9Worker* const worker) { 184 memset(worker, 0, sizeof(*worker)); 185 worker->status_ = NOT_OK; 186 } 187 188 int vp9_worker_sync(VP9Worker* const worker) { 189 #if CONFIG_MULTITHREAD 190 change_state(worker, OK); 191 #endif 192 assert(worker->status_ <= OK); 193 return !worker->had_error; 194 } 195 196 int vp9_worker_reset(VP9Worker* const worker) { 197 int ok = 1; 198 worker->had_error = 0; 199 if (worker->status_ < OK) { 200 #if CONFIG_MULTITHREAD 201 if (pthread_mutex_init(&worker->mutex_, NULL) || 202 pthread_cond_init(&worker->condition_, NULL)) { 203 return 0; 204 } 205 pthread_mutex_lock(&worker->mutex_); 206 ok = !pthread_create(&worker->thread_, NULL, thread_loop, worker); 207 if (ok) worker->status_ = OK; 208 pthread_mutex_unlock(&worker->mutex_); 209 #else 210 worker->status_ = OK; 211 #endif 212 } else if (worker->status_ > OK) { 213 ok = vp9_worker_sync(worker); 214 } 215 assert(!ok || (worker->status_ == OK)); 216 return ok; 217 } 218 219 void vp9_worker_execute(VP9Worker* const worker) { 220 if (worker->hook != NULL) { 221 worker->had_error |= !worker->hook(worker->data1, worker->data2); 222 } 223 } 224 225 void vp9_worker_launch(VP9Worker* const worker) { 226 #if CONFIG_MULTITHREAD 227 change_state(worker, WORK); 228 #else 229 vp9_worker_execute(worker); 230 #endif 231 } 232 233 void vp9_worker_end(VP9Worker* const worker) { 234 if (worker->status_ >= OK) { 235 #if CONFIG_MULTITHREAD 236 change_state(worker, NOT_OK); 237 pthread_join(worker->thread_, NULL); 238 pthread_mutex_destroy(&worker->mutex_); 239 pthread_cond_destroy(&worker->condition_); 240 #else 241 worker->status_ = NOT_OK; 242 #endif 243 } 244 assert(worker->status_ == NOT_OK); 245 } 246 247 //------------------------------------------------------------------------------ 248 249 #if defined(__cplusplus) || defined(c_plusplus) 250 } // extern "C" 251 #endif 252