Home | History | Annotate | Download | only in libopenjpeg20
      1 /*
      2  * The copyright in this software is being made available under the 2-clauses
      3  * BSD License, included below. This software may be subject to other third
      4  * party and contributor rights, including patent rights, and no such rights
      5  * are granted under this license.
      6  *
      7  * Copyright (c) 2016, Even Rouault
      8  * All rights reserved.
      9  *
     10  * Redistribution and use in source and binary forms, with or without
     11  * modification, are permitted provided that the following conditions
     12  * are met:
     13  * 1. Redistributions of source code must retain the above copyright
     14  *    notice, this list of conditions and the following disclaimer.
     15  * 2. Redistributions in binary form must reproduce the above copyright
     16  *    notice, this list of conditions and the following disclaimer in the
     17  *    documentation and/or other materials provided with the distribution.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
     20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     22  * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
     23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     29  * POSSIBILITY OF SUCH DAMAGE.
     30  */
     31 
     32 #include <assert.h>
     33 
     34 #ifdef MUTEX_win32
     35 
     36 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
     37 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
     38 /* a rather unlikely race, skip it */
     39 #if !(defined(__MINGW32__) && defined(__i386__))
     40 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
     41 #endif
     42 
     43 #include <windows.h>
     44 #include <process.h>
     45 
     46 #include "opj_includes.h"
     47 
     48 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
     49 {
     50     return OPJ_TRUE;
     51 }
     52 
     53 int OPJ_CALLCONV opj_get_num_cpus(void)
     54 {
     55     SYSTEM_INFO info;
     56     DWORD dwNum;
     57     GetSystemInfo(&info);
     58     dwNum = info.dwNumberOfProcessors;
     59     if (dwNum < 1) {
     60         return 1;
     61     }
     62     return (int)dwNum;
     63 }
     64 
     65 struct opj_mutex_t {
     66     CRITICAL_SECTION cs;
     67 };
     68 
     69 opj_mutex_t* opj_mutex_create(void)
     70 {
     71     opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
     72     if (!mutex) {
     73         return NULL;
     74     }
     75     InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
     76     return mutex;
     77 }
     78 
     79 void opj_mutex_lock(opj_mutex_t* mutex)
     80 {
     81     EnterCriticalSection(&(mutex->cs));
     82 }
     83 
     84 void opj_mutex_unlock(opj_mutex_t* mutex)
     85 {
     86     LeaveCriticalSection(&(mutex->cs));
     87 }
     88 
     89 void opj_mutex_destroy(opj_mutex_t* mutex)
     90 {
     91     if (!mutex) {
     92         return;
     93     }
     94     DeleteCriticalSection(&(mutex->cs));
     95     opj_free(mutex);
     96 }
     97 
     98 struct opj_cond_waiter_list_t {
     99     HANDLE hEvent;
    100     struct opj_cond_waiter_list_t* next;
    101 };
    102 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
    103 
    104 struct opj_cond_t {
    105     opj_mutex_t             *internal_mutex;
    106     opj_cond_waiter_list_t  *waiter_list;
    107 };
    108 
    109 static DWORD TLSKey = 0;
    110 static volatile LONG inTLSLockedSection = 0;
    111 static volatile int TLSKeyInit = OPJ_FALSE;
    112 
    113 opj_cond_t* opj_cond_create(void)
    114 {
    115     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
    116     if (!cond) {
    117         return NULL;
    118     }
    119 
    120     /* Make sure that the TLS key is allocated in a thread-safe way */
    121     /* We cannot use a global mutex/critical section since its creation itself would not be */
    122     /* thread-safe, so use InterlockedCompareExchange trick */
    123     while (OPJ_TRUE) {
    124 
    125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
    126         if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
    127 #endif
    128         {
    129             if (!TLSKeyInit) {
    130                 TLSKey = TlsAlloc();
    131                 TLSKeyInit = OPJ_TRUE;
    132             }
    133 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
    134             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
    135 #endif
    136             break;
    137         }
    138     }
    139 
    140     if (TLSKey == TLS_OUT_OF_INDEXES) {
    141         opj_free(cond);
    142         return NULL;
    143     }
    144     cond->internal_mutex = opj_mutex_create();
    145     if (cond->internal_mutex == NULL) {
    146         opj_free(cond);
    147         return NULL;
    148     }
    149     cond->waiter_list = NULL;
    150     return cond;
    151 }
    152 
    153 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
    154 {
    155     opj_cond_waiter_list_t* item;
    156     HANDLE hEvent = (HANDLE) TlsGetValue(TLSKey);
    157     if (hEvent == NULL) {
    158         hEvent = CreateEvent(NULL, /* security attributes */
    159                              0,    /* manual reset = no */
    160                              0,    /* initial state = unsignaled */
    161                              NULL  /* no name */);
    162         assert(hEvent);
    163 
    164         TlsSetValue(TLSKey, hEvent);
    165     }
    166 
    167     /* Insert the waiter into the waiter list of the condition */
    168     opj_mutex_lock(cond->internal_mutex);
    169 
    170     item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
    171     assert(item != NULL);
    172 
    173     item->hEvent = hEvent;
    174     item->next = cond->waiter_list;
    175 
    176     cond->waiter_list = item;
    177 
    178     opj_mutex_unlock(cond->internal_mutex);
    179 
    180     /* Release the client mutex before waiting for the event being signaled */
    181     opj_mutex_unlock(mutex);
    182 
    183     /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
    184     /* to report a failure. */
    185     WaitForSingleObject(hEvent, INFINITE);
    186 
    187     /* Reacquire the client mutex */
    188     opj_mutex_lock(mutex);
    189 }
    190 
    191 void opj_cond_signal(opj_cond_t* cond)
    192 {
    193     opj_cond_waiter_list_t* psIter;
    194 
    195     /* Signal the first registered event, and remove it from the list */
    196     opj_mutex_lock(cond->internal_mutex);
    197 
    198     psIter = cond->waiter_list;
    199     if (psIter != NULL) {
    200         SetEvent(psIter->hEvent);
    201         cond->waiter_list = psIter->next;
    202         opj_free(psIter);
    203     }
    204 
    205     opj_mutex_unlock(cond->internal_mutex);
    206 }
    207 
    208 void opj_cond_destroy(opj_cond_t* cond)
    209 {
    210     if (!cond) {
    211         return;
    212     }
    213     opj_mutex_destroy(cond->internal_mutex);
    214     assert(cond->waiter_list == NULL);
    215     opj_free(cond);
    216 }
    217 
    218 struct opj_thread_t {
    219     opj_thread_fn thread_fn;
    220     void* user_data;
    221     HANDLE hThread;
    222 };
    223 
    224 unsigned int __stdcall opj_thread_callback_adapter(void *info)
    225 {
    226     opj_thread_t* thread = (opj_thread_t*) info;
    227     HANDLE hEvent = NULL;
    228 
    229     thread->thread_fn(thread->user_data);
    230 
    231     /* Free the handle possible allocated by a cond */
    232     while (OPJ_TRUE) {
    233         /* Make sure TLSKey is not being created just at that moment... */
    234 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
    235         if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
    236 #endif
    237         {
    238             if (TLSKeyInit) {
    239                 hEvent = (HANDLE) TlsGetValue(TLSKey);
    240             }
    241 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
    242             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
    243 #endif
    244             break;
    245         }
    246     }
    247     if (hEvent) {
    248         CloseHandle(hEvent);
    249     }
    250 
    251     return 0;
    252 }
    253 
    254 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
    255 {
    256     opj_thread_t* thread;
    257 
    258     assert(thread_fn);
    259 
    260     thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
    261     if (!thread) {
    262         return NULL;
    263     }
    264     thread->thread_fn = thread_fn;
    265     thread->user_data = user_data;
    266 
    267     thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
    268                       opj_thread_callback_adapter, thread, 0, NULL);
    269 
    270     if (thread->hThread == NULL) {
    271         opj_free(thread);
    272         return NULL;
    273     }
    274     return thread;
    275 }
    276 
    277 void opj_thread_join(opj_thread_t* thread)
    278 {
    279     WaitForSingleObject(thread->hThread, INFINITE);
    280     CloseHandle(thread->hThread);
    281 
    282     opj_free(thread);
    283 }
    284 
    285 #elif MUTEX_pthread
    286 
    287 #include <pthread.h>
    288 #include <stdlib.h>
    289 #include <unistd.h>
    290 
    291 /* Moved after all system includes, and in particular pthread.h, so as to */
    292 /* avoid poisoning issuing with malloc() use in pthread.h with ulibc (#1013) */
    293 #include "opj_includes.h"
    294 
    295 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
    296 {
    297     return OPJ_TRUE;
    298 }
    299 
    300 int OPJ_CALLCONV opj_get_num_cpus(void)
    301 {
    302 #ifdef _SC_NPROCESSORS_ONLN
    303     return (int)sysconf(_SC_NPROCESSORS_ONLN);
    304 #else
    305     return 1;
    306 #endif
    307 }
    308 
    309 struct opj_mutex_t {
    310     pthread_mutex_t mutex;
    311 };
    312 
    313 opj_mutex_t* opj_mutex_create(void)
    314 {
    315     opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t));
    316     if (mutex != NULL) {
    317         if (pthread_mutex_init(&mutex->mutex, NULL) != 0) {
    318             opj_free(mutex);
    319             mutex = NULL;
    320         }
    321     }
    322     return mutex;
    323 }
    324 
    325 void opj_mutex_lock(opj_mutex_t* mutex)
    326 {
    327     pthread_mutex_lock(&(mutex->mutex));
    328 }
    329 
    330 void opj_mutex_unlock(opj_mutex_t* mutex)
    331 {
    332     pthread_mutex_unlock(&(mutex->mutex));
    333 }
    334 
    335 void opj_mutex_destroy(opj_mutex_t* mutex)
    336 {
    337     if (!mutex) {
    338         return;
    339     }
    340     pthread_mutex_destroy(&(mutex->mutex));
    341     opj_free(mutex);
    342 }
    343 
    344 struct opj_cond_t {
    345     pthread_cond_t cond;
    346 };
    347 
    348 opj_cond_t* opj_cond_create(void)
    349 {
    350     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
    351     if (!cond) {
    352         return NULL;
    353     }
    354     if (pthread_cond_init(&(cond->cond), NULL) != 0) {
    355         opj_free(cond);
    356         return NULL;
    357     }
    358     return cond;
    359 }
    360 
    361 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
    362 {
    363     pthread_cond_wait(&(cond->cond), &(mutex->mutex));
    364 }
    365 
    366 void opj_cond_signal(opj_cond_t* cond)
    367 {
    368     int ret = pthread_cond_signal(&(cond->cond));
    369     (void)ret;
    370     assert(ret == 0);
    371 }
    372 
    373 void opj_cond_destroy(opj_cond_t* cond)
    374 {
    375     if (!cond) {
    376         return;
    377     }
    378     pthread_cond_destroy(&(cond->cond));
    379     opj_free(cond);
    380 }
    381 
    382 
    383 struct opj_thread_t {
    384     opj_thread_fn thread_fn;
    385     void* user_data;
    386     pthread_t thread;
    387 };
    388 
    389 static void* opj_thread_callback_adapter(void* info)
    390 {
    391     opj_thread_t* thread = (opj_thread_t*) info;
    392     thread->thread_fn(thread->user_data);
    393     return NULL;
    394 }
    395 
    396 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
    397 {
    398     pthread_attr_t attr;
    399     opj_thread_t* thread;
    400 
    401     assert(thread_fn);
    402 
    403     thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
    404     if (!thread) {
    405         return NULL;
    406     }
    407     thread->thread_fn = thread_fn;
    408     thread->user_data = user_data;
    409 
    410     pthread_attr_init(&attr);
    411     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    412     if (pthread_create(&(thread->thread), &attr,
    413                        opj_thread_callback_adapter, (void *) thread) != 0) {
    414         opj_free(thread);
    415         return NULL;
    416     }
    417     return thread;
    418 }
    419 
    420 void opj_thread_join(opj_thread_t* thread)
    421 {
    422     void* status;
    423     pthread_join(thread->thread, &status);
    424 
    425     opj_free(thread);
    426 }
    427 
    428 #else
    429 /* Stub implementation */
    430 
    431 #include "opj_includes.h"
    432 
    433 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
    434 {
    435     return OPJ_FALSE;
    436 }
    437 
    438 int OPJ_CALLCONV opj_get_num_cpus(void)
    439 {
    440     return 1;
    441 }
    442 
    443 opj_mutex_t* opj_mutex_create(void)
    444 {
    445     return NULL;
    446 }
    447 
    448 void opj_mutex_lock(opj_mutex_t* mutex)
    449 {
    450     (void) mutex;
    451 }
    452 
    453 void opj_mutex_unlock(opj_mutex_t* mutex)
    454 {
    455     (void) mutex;
    456 }
    457 
    458 void opj_mutex_destroy(opj_mutex_t* mutex)
    459 {
    460     (void) mutex;
    461 }
    462 
    463 opj_cond_t* opj_cond_create(void)
    464 {
    465     return NULL;
    466 }
    467 
    468 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
    469 {
    470     (void) cond;
    471     (void) mutex;
    472 }
    473 
    474 void opj_cond_signal(opj_cond_t* cond)
    475 {
    476     (void) cond;
    477 }
    478 
    479 void opj_cond_destroy(opj_cond_t* cond)
    480 {
    481     (void) cond;
    482 }
    483 
    484 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
    485 {
    486     (void) thread_fn;
    487     (void) user_data;
    488     return NULL;
    489 }
    490 
    491 void opj_thread_join(opj_thread_t* thread)
    492 {
    493     (void) thread;
    494 }
    495 
    496 #endif
    497 
    498 typedef struct {
    499     int key;
    500     void* value;
    501     opj_tls_free_func opj_free_func;
    502 } opj_tls_key_val_t;
    503 
    504 struct opj_tls_t {
    505     opj_tls_key_val_t* key_val;
    506     int                key_val_count;
    507 };
    508 
    509 static opj_tls_t* opj_tls_new(void)
    510 {
    511     return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
    512 }
    513 
    514 static void opj_tls_destroy(opj_tls_t* tls)
    515 {
    516     int i;
    517     if (!tls) {
    518         return;
    519     }
    520     for (i = 0; i < tls->key_val_count; i++) {
    521         if (tls->key_val[i].opj_free_func) {
    522             tls->key_val[i].opj_free_func(tls->key_val[i].value);
    523         }
    524     }
    525     opj_free(tls->key_val);
    526     opj_free(tls);
    527 }
    528 
    529 void* opj_tls_get(opj_tls_t* tls, int key)
    530 {
    531     int i;
    532     for (i = 0; i < tls->key_val_count; i++) {
    533         if (tls->key_val[i].key == key) {
    534             return tls->key_val[i].value;
    535         }
    536     }
    537     return NULL;
    538 }
    539 
    540 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value,
    541                      opj_tls_free_func opj_free_func)
    542 {
    543     opj_tls_key_val_t* new_key_val;
    544     int i;
    545 
    546     if (tls->key_val_count == INT_MAX) {
    547         return OPJ_FALSE;
    548     }
    549     for (i = 0; i < tls->key_val_count; i++) {
    550         if (tls->key_val[i].key == key) {
    551             if (tls->key_val[i].opj_free_func) {
    552                 tls->key_val[i].opj_free_func(tls->key_val[i].value);
    553             }
    554             tls->key_val[i].value = value;
    555             tls->key_val[i].opj_free_func = opj_free_func;
    556             return OPJ_TRUE;
    557         }
    558     }
    559     new_key_val = (opj_tls_key_val_t*) opj_realloc(tls->key_val,
    560                   ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t));
    561     if (!new_key_val) {
    562         return OPJ_FALSE;
    563     }
    564     tls->key_val = new_key_val;
    565     new_key_val[tls->key_val_count].key = key;
    566     new_key_val[tls->key_val_count].value = value;
    567     new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
    568     tls->key_val_count ++;
    569     return OPJ_TRUE;
    570 }
    571 
    572 
    573 typedef struct {
    574     opj_job_fn          job_fn;
    575     void               *user_data;
    576 } opj_worker_thread_job_t;
    577 
    578 typedef struct {
    579     opj_thread_pool_t   *tp;
    580     opj_thread_t        *thread;
    581     int                  marked_as_waiting;
    582 
    583     opj_mutex_t         *mutex;
    584     opj_cond_t          *cond;
    585 } opj_worker_thread_t;
    586 
    587 typedef enum {
    588     OPJWTS_OK,
    589     OPJWTS_STOP,
    590     OPJWTS_ERROR
    591 } opj_worker_thread_state;
    592 
    593 struct opj_job_list_t {
    594     opj_worker_thread_job_t* job;
    595     struct opj_job_list_t* next;
    596 };
    597 typedef struct opj_job_list_t opj_job_list_t;
    598 
    599 struct opj_worker_thread_list_t {
    600     opj_worker_thread_t* worker_thread;
    601     struct opj_worker_thread_list_t* next;
    602 };
    603 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
    604 
    605 struct opj_thread_pool_t {
    606     opj_worker_thread_t*             worker_threads;
    607     int                              worker_threads_count;
    608     opj_cond_t*                      cond;
    609     opj_mutex_t*                     mutex;
    610     volatile opj_worker_thread_state state;
    611     opj_job_list_t*                  job_queue;
    612     volatile int                     pending_jobs_count;
    613     opj_worker_thread_list_t*        waiting_worker_thread_list;
    614     int                              waiting_worker_thread_count;
    615     opj_tls_t*                       tls;
    616     int                              signaling_threshold;
    617 };
    618 
    619 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
    620 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
    621     opj_thread_pool_t* tp,
    622     opj_worker_thread_t* worker_thread,
    623     OPJ_BOOL signal_job_finished);
    624 
    625 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
    626 {
    627     opj_thread_pool_t* tp;
    628 
    629     tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
    630     if (!tp) {
    631         return NULL;
    632     }
    633     tp->state = OPJWTS_OK;
    634 
    635     if (num_threads <= 0) {
    636         tp->tls = opj_tls_new();
    637         if (!tp->tls) {
    638             opj_free(tp);
    639             tp = NULL;
    640         }
    641         return tp;
    642     }
    643 
    644     tp->mutex = opj_mutex_create();
    645     if (!tp->mutex) {
    646         opj_free(tp);
    647         return NULL;
    648     }
    649     if (!opj_thread_pool_setup(tp, num_threads)) {
    650         opj_thread_pool_destroy(tp);
    651         return NULL;
    652     }
    653     return tp;
    654 }
    655 
    656 static void opj_worker_thread_function(void* user_data)
    657 {
    658     opj_worker_thread_t* worker_thread;
    659     opj_thread_pool_t* tp;
    660     opj_tls_t* tls;
    661     OPJ_BOOL job_finished = OPJ_FALSE;
    662 
    663     worker_thread = (opj_worker_thread_t*) user_data;
    664     tp = worker_thread->tp;
    665     tls = opj_tls_new();
    666 
    667     while (OPJ_TRUE) {
    668         opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread,
    669                                        job_finished);
    670         if (job == NULL) {
    671             break;
    672         }
    673 
    674         if (job->job_fn) {
    675             job->job_fn(job->user_data, tls);
    676         }
    677         opj_free(job);
    678         job_finished = OPJ_TRUE;
    679     }
    680 
    681     opj_tls_destroy(tls);
    682 }
    683 
    684 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
    685 {
    686     int i;
    687     OPJ_BOOL bRet = OPJ_TRUE;
    688 
    689     assert(num_threads > 0);
    690 
    691     tp->cond = opj_cond_create();
    692     if (tp->cond == NULL) {
    693         return OPJ_FALSE;
    694     }
    695 
    696     tp->worker_threads = (opj_worker_thread_t*) opj_calloc((size_t)num_threads,
    697                          sizeof(opj_worker_thread_t));
    698     if (tp->worker_threads == NULL) {
    699         return OPJ_FALSE;
    700     }
    701     tp->worker_threads_count = num_threads;
    702 
    703     for (i = 0; i < num_threads; i++) {
    704         tp->worker_threads[i].tp = tp;
    705 
    706         tp->worker_threads[i].mutex = opj_mutex_create();
    707         if (tp->worker_threads[i].mutex == NULL) {
    708             tp->worker_threads_count = i;
    709             bRet = OPJ_FALSE;
    710             break;
    711         }
    712 
    713         tp->worker_threads[i].cond = opj_cond_create();
    714         if (tp->worker_threads[i].cond == NULL) {
    715             opj_mutex_destroy(tp->worker_threads[i].mutex);
    716             tp->worker_threads_count = i;
    717             bRet = OPJ_FALSE;
    718             break;
    719         }
    720 
    721         tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
    722 
    723         tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
    724                                        &(tp->worker_threads[i]));
    725         if (tp->worker_threads[i].thread == NULL) {
    726             tp->worker_threads_count = i;
    727             bRet = OPJ_FALSE;
    728             break;
    729         }
    730     }
    731 
    732     /* Wait all threads to be started */
    733     /* printf("waiting for all threads to be started\n"); */
    734     opj_mutex_lock(tp->mutex);
    735     while (tp->waiting_worker_thread_count < num_threads) {
    736         opj_cond_wait(tp->cond, tp->mutex);
    737     }
    738     opj_mutex_unlock(tp->mutex);
    739     /* printf("all threads started\n"); */
    740 
    741     if (tp->state == OPJWTS_ERROR) {
    742         bRet = OPJ_FALSE;
    743     }
    744 
    745     return bRet;
    746 }
    747 
    748 /*
    749 void opj_waiting()
    750 {
    751     printf("waiting!\n");
    752 }
    753 */
    754 
    755 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
    756     opj_thread_pool_t* tp,
    757     opj_worker_thread_t* worker_thread,
    758     OPJ_BOOL signal_job_finished)
    759 {
    760     while (OPJ_TRUE) {
    761         opj_job_list_t* top_job_iter;
    762 
    763         opj_mutex_lock(tp->mutex);
    764 
    765         if (signal_job_finished) {
    766             signal_job_finished = OPJ_FALSE;
    767             tp->pending_jobs_count --;
    768             /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
    769             if (tp->pending_jobs_count <= tp->signaling_threshold) {
    770                 opj_cond_signal(tp->cond);
    771             }
    772         }
    773 
    774         if (tp->state == OPJWTS_STOP) {
    775             opj_mutex_unlock(tp->mutex);
    776             return NULL;
    777         }
    778         top_job_iter = tp->job_queue;
    779         if (top_job_iter) {
    780             opj_worker_thread_job_t* job;
    781             tp->job_queue = top_job_iter->next;
    782 
    783             job = top_job_iter->job;
    784             opj_mutex_unlock(tp->mutex);
    785             opj_free(top_job_iter);
    786             return job;
    787         }
    788 
    789         /* opj_waiting(); */
    790         if (!worker_thread->marked_as_waiting) {
    791             opj_worker_thread_list_t* item;
    792 
    793             worker_thread->marked_as_waiting = OPJ_TRUE;
    794             tp->waiting_worker_thread_count ++;
    795             assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
    796 
    797             item = (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
    798             if (item == NULL) {
    799                 tp->state = OPJWTS_ERROR;
    800                 opj_cond_signal(tp->cond);
    801 
    802                 opj_mutex_unlock(tp->mutex);
    803                 return NULL;
    804             }
    805 
    806             item->worker_thread = worker_thread;
    807             item->next = tp->waiting_worker_thread_list;
    808             tp->waiting_worker_thread_list = item;
    809         }
    810 
    811         /* printf("signaling that worker thread is ready\n"); */
    812         opj_cond_signal(tp->cond);
    813 
    814         opj_mutex_lock(worker_thread->mutex);
    815         opj_mutex_unlock(tp->mutex);
    816 
    817         /* printf("waiting for job\n"); */
    818         opj_cond_wait(worker_thread->cond, worker_thread->mutex);
    819 
    820         opj_mutex_unlock(worker_thread->mutex);
    821         /* printf("got job\n"); */
    822     }
    823 }
    824 
    825 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
    826                                     opj_job_fn job_fn,
    827                                     void* user_data)
    828 {
    829     opj_worker_thread_job_t* job;
    830     opj_job_list_t* item;
    831 
    832     if (tp->mutex == NULL) {
    833         job_fn(user_data, tp->tls);
    834         return OPJ_TRUE;
    835     }
    836 
    837     job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
    838     if (job == NULL) {
    839         return OPJ_FALSE;
    840     }
    841     job->job_fn = job_fn;
    842     job->user_data = user_data;
    843 
    844     item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
    845     if (item == NULL) {
    846         opj_free(job);
    847         return OPJ_FALSE;
    848     }
    849     item->job = job;
    850 
    851     opj_mutex_lock(tp->mutex);
    852 
    853     tp->signaling_threshold = 100 * tp->worker_threads_count;
    854     while (tp->pending_jobs_count > tp->signaling_threshold) {
    855         /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
    856         opj_cond_wait(tp->cond, tp->mutex);
    857         /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
    858     }
    859 
    860     item->next = tp->job_queue;
    861     tp->job_queue = item;
    862     tp->pending_jobs_count ++;
    863 
    864     if (tp->waiting_worker_thread_list) {
    865         opj_worker_thread_t* worker_thread;
    866         opj_worker_thread_list_t* next;
    867         opj_worker_thread_list_t* to_opj_free;
    868 
    869         worker_thread = tp->waiting_worker_thread_list->worker_thread;
    870 
    871         assert(worker_thread->marked_as_waiting);
    872         worker_thread->marked_as_waiting = OPJ_FALSE;
    873 
    874         next = tp->waiting_worker_thread_list->next;
    875         to_opj_free = tp->waiting_worker_thread_list;
    876         tp->waiting_worker_thread_list = next;
    877         tp->waiting_worker_thread_count --;
    878 
    879         opj_mutex_lock(worker_thread->mutex);
    880         opj_mutex_unlock(tp->mutex);
    881         opj_cond_signal(worker_thread->cond);
    882         opj_mutex_unlock(worker_thread->mutex);
    883 
    884         opj_free(to_opj_free);
    885     } else {
    886         opj_mutex_unlock(tp->mutex);
    887     }
    888 
    889     return OPJ_TRUE;
    890 }
    891 
    892 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp,
    893                                      int max_remaining_jobs)
    894 {
    895     if (tp->mutex == NULL) {
    896         return;
    897     }
    898 
    899     if (max_remaining_jobs < 0) {
    900         max_remaining_jobs = 0;
    901     }
    902     opj_mutex_lock(tp->mutex);
    903     tp->signaling_threshold = max_remaining_jobs;
    904     while (tp->pending_jobs_count > max_remaining_jobs) {
    905         /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
    906         opj_cond_wait(tp->cond, tp->mutex);
    907         /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
    908     }
    909     opj_mutex_unlock(tp->mutex);
    910 }
    911 
    912 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
    913 {
    914     return tp->worker_threads_count;
    915 }
    916 
    917 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
    918 {
    919     if (!tp) {
    920         return;
    921     }
    922     if (tp->cond) {
    923         int i;
    924         opj_thread_pool_wait_completion(tp, 0);
    925 
    926         opj_mutex_lock(tp->mutex);
    927         tp->state = OPJWTS_STOP;
    928         opj_mutex_unlock(tp->mutex);
    929 
    930         for (i = 0; i < tp->worker_threads_count; i++) {
    931             opj_mutex_lock(tp->worker_threads[i].mutex);
    932             opj_cond_signal(tp->worker_threads[i].cond);
    933             opj_mutex_unlock(tp->worker_threads[i].mutex);
    934             opj_thread_join(tp->worker_threads[i].thread);
    935             opj_cond_destroy(tp->worker_threads[i].cond);
    936             opj_mutex_destroy(tp->worker_threads[i].mutex);
    937         }
    938 
    939         opj_free(tp->worker_threads);
    940 
    941         while (tp->waiting_worker_thread_list != NULL) {
    942             opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
    943             opj_free(tp->waiting_worker_thread_list);
    944             tp->waiting_worker_thread_list = next;
    945         }
    946 
    947         opj_cond_destroy(tp->cond);
    948     }
    949     opj_mutex_destroy(tp->mutex);
    950     opj_tls_destroy(tp->tls);
    951     opj_free(tp);
    952 }
    953