1 /* 2 * The copyright in this software is being made available under the 2-clauses 3 * BSD License, included below. This software may be subject to other third 4 * party and contributor rights, including patent rights, and no such rights 5 * are granted under this license. 6 * 7 * Copyright (c) 2016, Even Rouault 8 * All rights reserved. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS' 20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 * POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32 #include <assert.h> 33 34 #ifdef MUTEX_win32 35 36 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */ 37 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */ 38 /* a rather unlikely race, skip it */ 39 #if !(defined(__MINGW32__) && defined(__i386__)) 40 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1 41 #endif 42 43 #include <windows.h> 44 #include <process.h> 45 46 #include "opj_includes.h" 47 48 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void) 49 { 50 return OPJ_TRUE; 51 } 52 53 int OPJ_CALLCONV opj_get_num_cpus(void) 54 { 55 SYSTEM_INFO info; 56 DWORD dwNum; 57 GetSystemInfo(&info); 58 dwNum = info.dwNumberOfProcessors; 59 if (dwNum < 1) { 60 return 1; 61 } 62 return (int)dwNum; 63 } 64 65 struct opj_mutex_t { 66 CRITICAL_SECTION cs; 67 }; 68 69 opj_mutex_t* opj_mutex_create(void) 70 { 71 opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t)); 72 if (!mutex) { 73 return NULL; 74 } 75 InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000); 76 return mutex; 77 } 78 79 void opj_mutex_lock(opj_mutex_t* mutex) 80 { 81 EnterCriticalSection(&(mutex->cs)); 82 } 83 84 void opj_mutex_unlock(opj_mutex_t* mutex) 85 { 86 LeaveCriticalSection(&(mutex->cs)); 87 } 88 89 void opj_mutex_destroy(opj_mutex_t* mutex) 90 { 91 if (!mutex) { 92 return; 93 } 94 DeleteCriticalSection(&(mutex->cs)); 95 opj_free(mutex); 96 } 97 98 struct opj_cond_waiter_list_t { 99 HANDLE hEvent; 100 struct opj_cond_waiter_list_t* next; 101 }; 102 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t; 103 104 struct opj_cond_t { 105 opj_mutex_t *internal_mutex; 106 opj_cond_waiter_list_t *waiter_list; 107 }; 108 109 static DWORD TLSKey = 0; 110 static volatile LONG inTLSLockedSection = 0; 111 static volatile int TLSKeyInit = OPJ_FALSE; 112 113 opj_cond_t* opj_cond_create(void) 114 { 115 opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t)); 116 if (!cond) { 117 return NULL; 118 } 119 120 /* Make sure that the TLS key is allocated in a thread-safe way */ 121 /* We cannot use a global mutex/critical section since its creation itself would not be */ 122 /* thread-safe, so use InterlockedCompareExchange trick */ 123 while (OPJ_TRUE) { 124 125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE 126 if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0) 127 #endif 128 { 129 if (!TLSKeyInit) { 130 TLSKey = TlsAlloc(); 131 TLSKeyInit = OPJ_TRUE; 132 } 133 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE 134 InterlockedCompareExchange(&inTLSLockedSection, 0, 1); 135 #endif 136 break; 137 } 138 } 139 140 if (TLSKey == TLS_OUT_OF_INDEXES) { 141 opj_free(cond); 142 return NULL; 143 } 144 cond->internal_mutex = opj_mutex_create(); 145 if (cond->internal_mutex == NULL) { 146 opj_free(cond); 147 return NULL; 148 } 149 cond->waiter_list = NULL; 150 return cond; 151 } 152 153 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex) 154 { 155 opj_cond_waiter_list_t* item; 156 HANDLE hEvent = (HANDLE) TlsGetValue(TLSKey); 157 if (hEvent == NULL) { 158 hEvent = CreateEvent(NULL, /* security attributes */ 159 0, /* manual reset = no */ 160 0, /* initial state = unsignaled */ 161 NULL /* no name */); 162 assert(hEvent); 163 164 TlsSetValue(TLSKey, hEvent); 165 } 166 167 /* Insert the waiter into the waiter list of the condition */ 168 opj_mutex_lock(cond->internal_mutex); 169 170 item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t)); 171 assert(item != NULL); 172 173 item->hEvent = hEvent; 174 item->next = cond->waiter_list; 175 176 cond->waiter_list = item; 177 178 opj_mutex_unlock(cond->internal_mutex); 179 180 /* Release the client mutex before waiting for the event being signaled */ 181 opj_mutex_unlock(mutex); 182 183 /* Ideally we would check that we do not get WAIT_FAILED but it is hard */ 184 /* to report a failure. */ 185 WaitForSingleObject(hEvent, INFINITE); 186 187 /* Reacquire the client mutex */ 188 opj_mutex_lock(mutex); 189 } 190 191 void opj_cond_signal(opj_cond_t* cond) 192 { 193 opj_cond_waiter_list_t* psIter; 194 195 /* Signal the first registered event, and remove it from the list */ 196 opj_mutex_lock(cond->internal_mutex); 197 198 psIter = cond->waiter_list; 199 if (psIter != NULL) { 200 SetEvent(psIter->hEvent); 201 cond->waiter_list = psIter->next; 202 opj_free(psIter); 203 } 204 205 opj_mutex_unlock(cond->internal_mutex); 206 } 207 208 void opj_cond_destroy(opj_cond_t* cond) 209 { 210 if (!cond) { 211 return; 212 } 213 opj_mutex_destroy(cond->internal_mutex); 214 assert(cond->waiter_list == NULL); 215 opj_free(cond); 216 } 217 218 struct opj_thread_t { 219 opj_thread_fn thread_fn; 220 void* user_data; 221 HANDLE hThread; 222 }; 223 224 unsigned int __stdcall opj_thread_callback_adapter(void *info) 225 { 226 opj_thread_t* thread = (opj_thread_t*) info; 227 HANDLE hEvent = NULL; 228 229 thread->thread_fn(thread->user_data); 230 231 /* Free the handle possible allocated by a cond */ 232 while (OPJ_TRUE) { 233 /* Make sure TLSKey is not being created just at that moment... */ 234 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE 235 if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0) 236 #endif 237 { 238 if (TLSKeyInit) { 239 hEvent = (HANDLE) TlsGetValue(TLSKey); 240 } 241 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE 242 InterlockedCompareExchange(&inTLSLockedSection, 0, 1); 243 #endif 244 break; 245 } 246 } 247 if (hEvent) { 248 CloseHandle(hEvent); 249 } 250 251 return 0; 252 } 253 254 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data) 255 { 256 opj_thread_t* thread; 257 258 assert(thread_fn); 259 260 thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t)); 261 if (!thread) { 262 return NULL; 263 } 264 thread->thread_fn = thread_fn; 265 thread->user_data = user_data; 266 267 thread->hThread = (HANDLE)_beginthreadex(NULL, 0, 268 opj_thread_callback_adapter, thread, 0, NULL); 269 270 if (thread->hThread == NULL) { 271 opj_free(thread); 272 return NULL; 273 } 274 return thread; 275 } 276 277 void opj_thread_join(opj_thread_t* thread) 278 { 279 WaitForSingleObject(thread->hThread, INFINITE); 280 CloseHandle(thread->hThread); 281 282 opj_free(thread); 283 } 284 285 #elif MUTEX_pthread 286 287 #include <pthread.h> 288 #include <stdlib.h> 289 #include <unistd.h> 290 291 /* Moved after all system includes, and in particular pthread.h, so as to */ 292 /* avoid poisoning issuing with malloc() use in pthread.h with ulibc (#1013) */ 293 #include "opj_includes.h" 294 295 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void) 296 { 297 return OPJ_TRUE; 298 } 299 300 int OPJ_CALLCONV opj_get_num_cpus(void) 301 { 302 #ifdef _SC_NPROCESSORS_ONLN 303 return (int)sysconf(_SC_NPROCESSORS_ONLN); 304 #else 305 return 1; 306 #endif 307 } 308 309 struct opj_mutex_t { 310 pthread_mutex_t mutex; 311 }; 312 313 opj_mutex_t* opj_mutex_create(void) 314 { 315 opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t)); 316 if (mutex != NULL) { 317 if (pthread_mutex_init(&mutex->mutex, NULL) != 0) { 318 opj_free(mutex); 319 mutex = NULL; 320 } 321 } 322 return mutex; 323 } 324 325 void opj_mutex_lock(opj_mutex_t* mutex) 326 { 327 pthread_mutex_lock(&(mutex->mutex)); 328 } 329 330 void opj_mutex_unlock(opj_mutex_t* mutex) 331 { 332 pthread_mutex_unlock(&(mutex->mutex)); 333 } 334 335 void opj_mutex_destroy(opj_mutex_t* mutex) 336 { 337 if (!mutex) { 338 return; 339 } 340 pthread_mutex_destroy(&(mutex->mutex)); 341 opj_free(mutex); 342 } 343 344 struct opj_cond_t { 345 pthread_cond_t cond; 346 }; 347 348 opj_cond_t* opj_cond_create(void) 349 { 350 opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t)); 351 if (!cond) { 352 return NULL; 353 } 354 if (pthread_cond_init(&(cond->cond), NULL) != 0) { 355 opj_free(cond); 356 return NULL; 357 } 358 return cond; 359 } 360 361 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex) 362 { 363 pthread_cond_wait(&(cond->cond), &(mutex->mutex)); 364 } 365 366 void opj_cond_signal(opj_cond_t* cond) 367 { 368 int ret = pthread_cond_signal(&(cond->cond)); 369 (void)ret; 370 assert(ret == 0); 371 } 372 373 void opj_cond_destroy(opj_cond_t* cond) 374 { 375 if (!cond) { 376 return; 377 } 378 pthread_cond_destroy(&(cond->cond)); 379 opj_free(cond); 380 } 381 382 383 struct opj_thread_t { 384 opj_thread_fn thread_fn; 385 void* user_data; 386 pthread_t thread; 387 }; 388 389 static void* opj_thread_callback_adapter(void* info) 390 { 391 opj_thread_t* thread = (opj_thread_t*) info; 392 thread->thread_fn(thread->user_data); 393 return NULL; 394 } 395 396 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data) 397 { 398 pthread_attr_t attr; 399 opj_thread_t* thread; 400 401 assert(thread_fn); 402 403 thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t)); 404 if (!thread) { 405 return NULL; 406 } 407 thread->thread_fn = thread_fn; 408 thread->user_data = user_data; 409 410 pthread_attr_init(&attr); 411 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 412 if (pthread_create(&(thread->thread), &attr, 413 opj_thread_callback_adapter, (void *) thread) != 0) { 414 opj_free(thread); 415 return NULL; 416 } 417 return thread; 418 } 419 420 void opj_thread_join(opj_thread_t* thread) 421 { 422 void* status; 423 pthread_join(thread->thread, &status); 424 425 opj_free(thread); 426 } 427 428 #else 429 /* Stub implementation */ 430 431 #include "opj_includes.h" 432 433 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void) 434 { 435 return OPJ_FALSE; 436 } 437 438 int OPJ_CALLCONV opj_get_num_cpus(void) 439 { 440 return 1; 441 } 442 443 opj_mutex_t* opj_mutex_create(void) 444 { 445 return NULL; 446 } 447 448 void opj_mutex_lock(opj_mutex_t* mutex) 449 { 450 (void) mutex; 451 } 452 453 void opj_mutex_unlock(opj_mutex_t* mutex) 454 { 455 (void) mutex; 456 } 457 458 void opj_mutex_destroy(opj_mutex_t* mutex) 459 { 460 (void) mutex; 461 } 462 463 opj_cond_t* opj_cond_create(void) 464 { 465 return NULL; 466 } 467 468 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex) 469 { 470 (void) cond; 471 (void) mutex; 472 } 473 474 void opj_cond_signal(opj_cond_t* cond) 475 { 476 (void) cond; 477 } 478 479 void opj_cond_destroy(opj_cond_t* cond) 480 { 481 (void) cond; 482 } 483 484 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data) 485 { 486 (void) thread_fn; 487 (void) user_data; 488 return NULL; 489 } 490 491 void opj_thread_join(opj_thread_t* thread) 492 { 493 (void) thread; 494 } 495 496 #endif 497 498 typedef struct { 499 int key; 500 void* value; 501 opj_tls_free_func opj_free_func; 502 } opj_tls_key_val_t; 503 504 struct opj_tls_t { 505 opj_tls_key_val_t* key_val; 506 int key_val_count; 507 }; 508 509 static opj_tls_t* opj_tls_new(void) 510 { 511 return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t)); 512 } 513 514 static void opj_tls_destroy(opj_tls_t* tls) 515 { 516 int i; 517 if (!tls) { 518 return; 519 } 520 for (i = 0; i < tls->key_val_count; i++) { 521 if (tls->key_val[i].opj_free_func) { 522 tls->key_val[i].opj_free_func(tls->key_val[i].value); 523 } 524 } 525 opj_free(tls->key_val); 526 opj_free(tls); 527 } 528 529 void* opj_tls_get(opj_tls_t* tls, int key) 530 { 531 int i; 532 for (i = 0; i < tls->key_val_count; i++) { 533 if (tls->key_val[i].key == key) { 534 return tls->key_val[i].value; 535 } 536 } 537 return NULL; 538 } 539 540 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, 541 opj_tls_free_func opj_free_func) 542 { 543 opj_tls_key_val_t* new_key_val; 544 int i; 545 546 if (tls->key_val_count == INT_MAX) { 547 return OPJ_FALSE; 548 } 549 for (i = 0; i < tls->key_val_count; i++) { 550 if (tls->key_val[i].key == key) { 551 if (tls->key_val[i].opj_free_func) { 552 tls->key_val[i].opj_free_func(tls->key_val[i].value); 553 } 554 tls->key_val[i].value = value; 555 tls->key_val[i].opj_free_func = opj_free_func; 556 return OPJ_TRUE; 557 } 558 } 559 new_key_val = (opj_tls_key_val_t*) opj_realloc(tls->key_val, 560 ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t)); 561 if (!new_key_val) { 562 return OPJ_FALSE; 563 } 564 tls->key_val = new_key_val; 565 new_key_val[tls->key_val_count].key = key; 566 new_key_val[tls->key_val_count].value = value; 567 new_key_val[tls->key_val_count].opj_free_func = opj_free_func; 568 tls->key_val_count ++; 569 return OPJ_TRUE; 570 } 571 572 573 typedef struct { 574 opj_job_fn job_fn; 575 void *user_data; 576 } opj_worker_thread_job_t; 577 578 typedef struct { 579 opj_thread_pool_t *tp; 580 opj_thread_t *thread; 581 int marked_as_waiting; 582 583 opj_mutex_t *mutex; 584 opj_cond_t *cond; 585 } opj_worker_thread_t; 586 587 typedef enum { 588 OPJWTS_OK, 589 OPJWTS_STOP, 590 OPJWTS_ERROR 591 } opj_worker_thread_state; 592 593 struct opj_job_list_t { 594 opj_worker_thread_job_t* job; 595 struct opj_job_list_t* next; 596 }; 597 typedef struct opj_job_list_t opj_job_list_t; 598 599 struct opj_worker_thread_list_t { 600 opj_worker_thread_t* worker_thread; 601 struct opj_worker_thread_list_t* next; 602 }; 603 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t; 604 605 struct opj_thread_pool_t { 606 opj_worker_thread_t* worker_threads; 607 int worker_threads_count; 608 opj_cond_t* cond; 609 opj_mutex_t* mutex; 610 volatile opj_worker_thread_state state; 611 opj_job_list_t* job_queue; 612 volatile int pending_jobs_count; 613 opj_worker_thread_list_t* waiting_worker_thread_list; 614 int waiting_worker_thread_count; 615 opj_tls_t* tls; 616 int signaling_threshold; 617 }; 618 619 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads); 620 static opj_worker_thread_job_t* opj_thread_pool_get_next_job( 621 opj_thread_pool_t* tp, 622 opj_worker_thread_t* worker_thread, 623 OPJ_BOOL signal_job_finished); 624 625 opj_thread_pool_t* opj_thread_pool_create(int num_threads) 626 { 627 opj_thread_pool_t* tp; 628 629 tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t)); 630 if (!tp) { 631 return NULL; 632 } 633 tp->state = OPJWTS_OK; 634 635 if (num_threads <= 0) { 636 tp->tls = opj_tls_new(); 637 if (!tp->tls) { 638 opj_free(tp); 639 tp = NULL; 640 } 641 return tp; 642 } 643 644 tp->mutex = opj_mutex_create(); 645 if (!tp->mutex) { 646 opj_free(tp); 647 return NULL; 648 } 649 if (!opj_thread_pool_setup(tp, num_threads)) { 650 opj_thread_pool_destroy(tp); 651 return NULL; 652 } 653 return tp; 654 } 655 656 static void opj_worker_thread_function(void* user_data) 657 { 658 opj_worker_thread_t* worker_thread; 659 opj_thread_pool_t* tp; 660 opj_tls_t* tls; 661 OPJ_BOOL job_finished = OPJ_FALSE; 662 663 worker_thread = (opj_worker_thread_t*) user_data; 664 tp = worker_thread->tp; 665 tls = opj_tls_new(); 666 667 while (OPJ_TRUE) { 668 opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, 669 job_finished); 670 if (job == NULL) { 671 break; 672 } 673 674 if (job->job_fn) { 675 job->job_fn(job->user_data, tls); 676 } 677 opj_free(job); 678 job_finished = OPJ_TRUE; 679 } 680 681 opj_tls_destroy(tls); 682 } 683 684 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads) 685 { 686 int i; 687 OPJ_BOOL bRet = OPJ_TRUE; 688 689 assert(num_threads > 0); 690 691 tp->cond = opj_cond_create(); 692 if (tp->cond == NULL) { 693 return OPJ_FALSE; 694 } 695 696 tp->worker_threads = (opj_worker_thread_t*) opj_calloc((size_t)num_threads, 697 sizeof(opj_worker_thread_t)); 698 if (tp->worker_threads == NULL) { 699 return OPJ_FALSE; 700 } 701 tp->worker_threads_count = num_threads; 702 703 for (i = 0; i < num_threads; i++) { 704 tp->worker_threads[i].tp = tp; 705 706 tp->worker_threads[i].mutex = opj_mutex_create(); 707 if (tp->worker_threads[i].mutex == NULL) { 708 tp->worker_threads_count = i; 709 bRet = OPJ_FALSE; 710 break; 711 } 712 713 tp->worker_threads[i].cond = opj_cond_create(); 714 if (tp->worker_threads[i].cond == NULL) { 715 opj_mutex_destroy(tp->worker_threads[i].mutex); 716 tp->worker_threads_count = i; 717 bRet = OPJ_FALSE; 718 break; 719 } 720 721 tp->worker_threads[i].marked_as_waiting = OPJ_FALSE; 722 723 tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function, 724 &(tp->worker_threads[i])); 725 if (tp->worker_threads[i].thread == NULL) { 726 tp->worker_threads_count = i; 727 bRet = OPJ_FALSE; 728 break; 729 } 730 } 731 732 /* Wait all threads to be started */ 733 /* printf("waiting for all threads to be started\n"); */ 734 opj_mutex_lock(tp->mutex); 735 while (tp->waiting_worker_thread_count < num_threads) { 736 opj_cond_wait(tp->cond, tp->mutex); 737 } 738 opj_mutex_unlock(tp->mutex); 739 /* printf("all threads started\n"); */ 740 741 if (tp->state == OPJWTS_ERROR) { 742 bRet = OPJ_FALSE; 743 } 744 745 return bRet; 746 } 747 748 /* 749 void opj_waiting() 750 { 751 printf("waiting!\n"); 752 } 753 */ 754 755 static opj_worker_thread_job_t* opj_thread_pool_get_next_job( 756 opj_thread_pool_t* tp, 757 opj_worker_thread_t* worker_thread, 758 OPJ_BOOL signal_job_finished) 759 { 760 while (OPJ_TRUE) { 761 opj_job_list_t* top_job_iter; 762 763 opj_mutex_lock(tp->mutex); 764 765 if (signal_job_finished) { 766 signal_job_finished = OPJ_FALSE; 767 tp->pending_jobs_count --; 768 /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/ 769 if (tp->pending_jobs_count <= tp->signaling_threshold) { 770 opj_cond_signal(tp->cond); 771 } 772 } 773 774 if (tp->state == OPJWTS_STOP) { 775 opj_mutex_unlock(tp->mutex); 776 return NULL; 777 } 778 top_job_iter = tp->job_queue; 779 if (top_job_iter) { 780 opj_worker_thread_job_t* job; 781 tp->job_queue = top_job_iter->next; 782 783 job = top_job_iter->job; 784 opj_mutex_unlock(tp->mutex); 785 opj_free(top_job_iter); 786 return job; 787 } 788 789 /* opj_waiting(); */ 790 if (!worker_thread->marked_as_waiting) { 791 opj_worker_thread_list_t* item; 792 793 worker_thread->marked_as_waiting = OPJ_TRUE; 794 tp->waiting_worker_thread_count ++; 795 assert(tp->waiting_worker_thread_count <= tp->worker_threads_count); 796 797 item = (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t)); 798 if (item == NULL) { 799 tp->state = OPJWTS_ERROR; 800 opj_cond_signal(tp->cond); 801 802 opj_mutex_unlock(tp->mutex); 803 return NULL; 804 } 805 806 item->worker_thread = worker_thread; 807 item->next = tp->waiting_worker_thread_list; 808 tp->waiting_worker_thread_list = item; 809 } 810 811 /* printf("signaling that worker thread is ready\n"); */ 812 opj_cond_signal(tp->cond); 813 814 opj_mutex_lock(worker_thread->mutex); 815 opj_mutex_unlock(tp->mutex); 816 817 /* printf("waiting for job\n"); */ 818 opj_cond_wait(worker_thread->cond, worker_thread->mutex); 819 820 opj_mutex_unlock(worker_thread->mutex); 821 /* printf("got job\n"); */ 822 } 823 } 824 825 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp, 826 opj_job_fn job_fn, 827 void* user_data) 828 { 829 opj_worker_thread_job_t* job; 830 opj_job_list_t* item; 831 832 if (tp->mutex == NULL) { 833 job_fn(user_data, tp->tls); 834 return OPJ_TRUE; 835 } 836 837 job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t)); 838 if (job == NULL) { 839 return OPJ_FALSE; 840 } 841 job->job_fn = job_fn; 842 job->user_data = user_data; 843 844 item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t)); 845 if (item == NULL) { 846 opj_free(job); 847 return OPJ_FALSE; 848 } 849 item->job = job; 850 851 opj_mutex_lock(tp->mutex); 852 853 tp->signaling_threshold = 100 * tp->worker_threads_count; 854 while (tp->pending_jobs_count > tp->signaling_threshold) { 855 /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */ 856 opj_cond_wait(tp->cond, tp->mutex); 857 /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */ 858 } 859 860 item->next = tp->job_queue; 861 tp->job_queue = item; 862 tp->pending_jobs_count ++; 863 864 if (tp->waiting_worker_thread_list) { 865 opj_worker_thread_t* worker_thread; 866 opj_worker_thread_list_t* next; 867 opj_worker_thread_list_t* to_opj_free; 868 869 worker_thread = tp->waiting_worker_thread_list->worker_thread; 870 871 assert(worker_thread->marked_as_waiting); 872 worker_thread->marked_as_waiting = OPJ_FALSE; 873 874 next = tp->waiting_worker_thread_list->next; 875 to_opj_free = tp->waiting_worker_thread_list; 876 tp->waiting_worker_thread_list = next; 877 tp->waiting_worker_thread_count --; 878 879 opj_mutex_lock(worker_thread->mutex); 880 opj_mutex_unlock(tp->mutex); 881 opj_cond_signal(worker_thread->cond); 882 opj_mutex_unlock(worker_thread->mutex); 883 884 opj_free(to_opj_free); 885 } else { 886 opj_mutex_unlock(tp->mutex); 887 } 888 889 return OPJ_TRUE; 890 } 891 892 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, 893 int max_remaining_jobs) 894 { 895 if (tp->mutex == NULL) { 896 return; 897 } 898 899 if (max_remaining_jobs < 0) { 900 max_remaining_jobs = 0; 901 } 902 opj_mutex_lock(tp->mutex); 903 tp->signaling_threshold = max_remaining_jobs; 904 while (tp->pending_jobs_count > max_remaining_jobs) { 905 /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/ 906 opj_cond_wait(tp->cond, tp->mutex); 907 /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/ 908 } 909 opj_mutex_unlock(tp->mutex); 910 } 911 912 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp) 913 { 914 return tp->worker_threads_count; 915 } 916 917 void opj_thread_pool_destroy(opj_thread_pool_t* tp) 918 { 919 if (!tp) { 920 return; 921 } 922 if (tp->cond) { 923 int i; 924 opj_thread_pool_wait_completion(tp, 0); 925 926 opj_mutex_lock(tp->mutex); 927 tp->state = OPJWTS_STOP; 928 opj_mutex_unlock(tp->mutex); 929 930 for (i = 0; i < tp->worker_threads_count; i++) { 931 opj_mutex_lock(tp->worker_threads[i].mutex); 932 opj_cond_signal(tp->worker_threads[i].cond); 933 opj_mutex_unlock(tp->worker_threads[i].mutex); 934 opj_thread_join(tp->worker_threads[i].thread); 935 opj_cond_destroy(tp->worker_threads[i].cond); 936 opj_mutex_destroy(tp->worker_threads[i].mutex); 937 } 938 939 opj_free(tp->worker_threads); 940 941 while (tp->waiting_worker_thread_list != NULL) { 942 opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next; 943 opj_free(tp->waiting_worker_thread_list); 944 tp->waiting_worker_thread_list = next; 945 } 946 947 opj_cond_destroy(tp->cond); 948 } 949 opj_mutex_destroy(tp->mutex); 950 opj_tls_destroy(tp->tls); 951 opj_free(tp); 952 } 953