/* u_queue.c — thread queue implementation (util) */
      1 /*
      2  * Copyright  2016 Advanced Micro Devices, Inc.
      3  * All Rights Reserved.
      4  *
      5  * Permission is hereby granted, free of charge, to any person obtaining
      6  * a copy of this software and associated documentation files (the
      7  * "Software"), to deal in the Software without restriction, including
      8  * without limitation the rights to use, copy, modify, merge, publish,
      9  * distribute, sub license, and/or sell copies of the Software, and to
     10  * permit persons to whom the Software is furnished to do so, subject to
     11  * the following conditions:
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     14  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
     15  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     16  * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
     17  * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
     19  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
     20  * USE OR OTHER DEALINGS IN THE SOFTWARE.
     21  *
     22  * The above copyright notice and this permission notice (including the
     23  * next paragraph) shall be included in all copies or substantial portions
     24  * of the Software.
     25  */
     26 
     27 #include "u_queue.h"
     28 #include "u_memory.h"
     29 #include "u_string.h"
     30 #include "os/os_time.h"
     31 
     32 static void util_queue_killall_and_wait(struct util_queue *queue);
     33 
     34 /****************************************************************************
     35  * Wait for all queues to assert idle when exit() is called.
     36  *
     37  * Otherwise, C++ static variable destructors can be called while threads
     38  * are using the static variables.
     39  */
     40 
/* Guards one-time setup of queue_list and the atexit handler. */
static once_flag atexit_once_flag = ONCE_FLAG_INIT;
/* All live queues; walked by atexit_handler, guarded by exit_mutex. */
static struct list_head queue_list;
/* Protects queue_list against concurrent add/remove and the exit-time walk. */
pipe_static_mutex(exit_mutex);
     44 
     45 static void
     46 atexit_handler(void)
     47 {
     48    struct util_queue *iter;
     49 
     50    pipe_mutex_lock(exit_mutex);
     51    /* Wait for all queues to assert idle. */
     52    LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
     53       util_queue_killall_and_wait(iter);
     54    }
     55    pipe_mutex_unlock(exit_mutex);
     56 }
     57 
/* One-time process-wide setup: initialize the queue list, then register
 * the exit-time handler.  Runs under call_once from add_to_atexit_list,
 * so it completes before the first queue can be added. */
static void
global_init(void)
{
   LIST_INITHEAD(&queue_list);
   atexit(atexit_handler);
}
     64 
/* Register a queue so atexit_handler will wait for its threads at exit. */
static void
add_to_atexit_list(struct util_queue *queue)
{
   /* Lazily perform the process-wide setup exactly once. */
   call_once(&atexit_once_flag, global_init);

   pipe_mutex_lock(exit_mutex);
   LIST_ADD(&queue->head, &queue_list);
   pipe_mutex_unlock(exit_mutex);
}
     74 
     75 static void
     76 remove_from_atexit_list(struct util_queue *queue)
     77 {
     78    struct util_queue *iter, *tmp;
     79 
     80    pipe_mutex_lock(exit_mutex);
     81    LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
     82       if (iter == queue) {
     83          LIST_DEL(&iter->head);
     84          break;
     85       }
     86    }
     87    pipe_mutex_unlock(exit_mutex);
     88 }
     89 
     90 /****************************************************************************
     91  * util_queue implementation
     92  */
     93 
/* Mark a fence as signalled and wake all waiters blocked in
 * util_queue_job_wait.  The flag is set under the fence mutex so waiters
 * cannot miss the broadcast. */
static void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   pipe_mutex_lock(fence->mutex);
   fence->signalled = true;
   pipe_condvar_broadcast(fence->cond);
   pipe_mutex_unlock(fence->mutex);
}
    102 
/* Block until the fence is signalled (i.e. the associated job finished or
 * was flushed during shutdown).  The while loop guards against spurious
 * condvar wakeups. */
void
util_queue_job_wait(struct util_queue_fence *fence)
{
   pipe_mutex_lock(fence->mutex);
   while (!fence->signalled)
      pipe_condvar_wait(fence->cond, fence->mutex);
   pipe_mutex_unlock(fence->mutex);
}
    111 
/* Heap-allocated startup argument for each worker thread; ownership passes
 * to the thread, which frees it after copying out the fields. */
struct thread_input {
   struct util_queue *queue;
   int thread_index;    /* 0-based index of this worker within the queue */
};
    116 
/* Worker thread main loop: pop jobs from the ring buffer and execute them
 * until kill_threads is set.  "input" is a heap-allocated thread_input this
 * function takes ownership of and frees immediately. */
static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   FREE(input);

   if (queue->name) {
      /* Name the thread "<queue name>:<index>" for debuggers/profilers. */
      char name[16];
      util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
      pipe_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      pipe_mutex_lock(queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (!queue->kill_threads && queue->num_queued == 0)
         pipe_condvar_wait(queue->has_queued_cond, queue->lock);

      if (queue->kill_threads) {
         pipe_mutex_unlock(queue->lock);
         break;
      }

      /* Pop the oldest job and clear its slot, so the shutdown sweep below
       * can distinguish queued slots (non-NULL job) from empty ones. */
      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      /* Wake one producer that may be blocked on a full ring buffer. */
      pipe_condvar_signal(queue->has_space_cond);
      pipe_mutex_unlock(queue->lock);

      /* Execute outside the lock so other workers can run concurrently. */
      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs before terminating */
   pipe_mutex_lock(queue->lock);
   while (queue->jobs[queue->read_idx].job) {
      util_queue_fence_signal(queue->jobs[queue->read_idx].fence);

      queue->jobs[queue->read_idx].job = NULL;
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
   }
   queue->num_queued = 0; /* reset this when exiting the thread */
   pipe_mutex_unlock(queue->lock);
   return 0;
}
    173 
    174 bool
    175 util_queue_init(struct util_queue *queue,
    176                 const char *name,
    177                 unsigned max_jobs,
    178                 unsigned num_threads)
    179 {
    180    unsigned i;
    181 
    182    memset(queue, 0, sizeof(*queue));
    183    queue->name = name;
    184    queue->num_threads = num_threads;
    185    queue->max_jobs = max_jobs;
    186 
    187    queue->jobs = (struct util_queue_job*)
    188                  CALLOC(max_jobs, sizeof(struct util_queue_job));
    189    if (!queue->jobs)
    190       goto fail;
    191 
    192    pipe_mutex_init(queue->lock);
    193 
    194    queue->num_queued = 0;
    195    pipe_condvar_init(queue->has_queued_cond);
    196    pipe_condvar_init(queue->has_space_cond);
    197 
    198    queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
    199    if (!queue->threads)
    200       goto fail;
    201 
    202    /* start threads */
    203    for (i = 0; i < num_threads; i++) {
    204       struct thread_input *input = MALLOC_STRUCT(thread_input);
    205       input->queue = queue;
    206       input->thread_index = i;
    207 
    208       queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
    209 
    210       if (!queue->threads[i]) {
    211          FREE(input);
    212 
    213          if (i == 0) {
    214             /* no threads created, fail */
    215             goto fail;
    216          } else {
    217             /* at least one thread created, so use it */
    218             queue->num_threads = i;
    219             break;
    220          }
    221       }
    222    }
    223 
    224    add_to_atexit_list(queue);
    225    return true;
    226 
    227 fail:
    228    FREE(queue->threads);
    229 
    230    if (queue->jobs) {
    231       pipe_condvar_destroy(queue->has_space_cond);
    232       pipe_condvar_destroy(queue->has_queued_cond);
    233       pipe_mutex_destroy(queue->lock);
    234       FREE(queue->jobs);
    235    }
    236    /* also util_queue_is_initialized can be used to check for success */
    237    memset(queue, 0, sizeof(*queue));
    238    return false;
    239 }
    240 
/* Ask all worker threads to terminate and join them.  Pending jobs are not
 * executed; their fences are signalled by the exiting threads' flush loop.
 * Sets num_threads to 0 so a second call (e.g. destroy after the atexit
 * handler already ran) joins nothing. */
static void
util_queue_killall_and_wait(struct util_queue *queue)
{
   unsigned i;

   /* Signal all threads to terminate. */
   pipe_mutex_lock(queue->lock);
   queue->kill_threads = 1;
   /* Broadcast: every worker may be sleeping on the empty-queue condvar. */
   pipe_condvar_broadcast(queue->has_queued_cond);
   pipe_mutex_unlock(queue->lock);

   for (i = 0; i < queue->num_threads; i++)
      pipe_thread_wait(queue->threads[i]);
   queue->num_threads = 0;
}
    256 
/* Tear down a queue: stop and join the workers first, then unregister from
 * the atexit list, and only then release synchronization objects and
 * storage (no thread can still touch them at that point). */
void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_killall_and_wait(queue);
   remove_from_atexit_list(queue);

   pipe_condvar_destroy(queue->has_space_cond);
   pipe_condvar_destroy(queue->has_queued_cond);
   pipe_mutex_destroy(queue->lock);
   FREE(queue->jobs);
   FREE(queue->threads);
}
    269 
/* Initialize a fence in the signalled state, satisfying the asserts in
 * util_queue_add_job and util_queue_fence_destroy for a fence that has
 * never been attached to a job. */
void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   pipe_mutex_init(fence->mutex);
   pipe_condvar_init(fence->cond);
   fence->signalled = true;
}
    278 
/* Destroy a fence.  The caller must ensure no job still references it;
 * the assert catches destruction while a job is in flight. */
void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);
   pipe_condvar_destroy(fence->cond);
   pipe_mutex_destroy(fence->mutex);
}
    286 
/**
 * Enqueue a job for asynchronous execution; blocks while the ring buffer
 * is full.
 *
 * \param queue    destination queue
 * \param job      opaque payload passed to execute/cleanup
 * \param fence    completion fence; must currently be signalled (i.e. not
 *                 attached to another in-flight job) and is reset here
 * \param execute  called on a worker thread to run the job
 * \param cleanup  optional; called on the worker after the fence is
 *                 signalled (may be NULL)
 */
void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup)
{
   struct util_queue_job *ptr;

   /* Reset the fence before publishing the job so a fast worker cannot
    * signal it first and have the reset overwrite the signal. */
   assert(fence->signalled);
   fence->signalled = false;

   pipe_mutex_lock(queue->lock);
   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* if the queue is full, wait until there is space */
   while (queue->num_queued == queue->max_jobs)
      pipe_condvar_wait(queue->has_space_cond, queue->lock);

   /* Write into the next free slot of the ring buffer. */
   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;

   queue->num_queued++;
   /* Wake one worker sleeping on the empty-queue condvar. */
   pipe_condvar_signal(queue->has_queued_cond);
   pipe_mutex_unlock(queue->lock);
}
    318