/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include <time.h>

#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"

static void util_queue_killall_and_wait(struct util_queue *queue);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

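/* queue_list tracks every live queue so that atexit_handler can drain them
 * all; exit_mutex guards the list and serializes the atexit drain against
 * concurrent queue creation and destruction. */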
static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_killall_and_wait(iter);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   LIST_INITHEAD(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   LIST_ADD(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         LIST_DEL(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
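/* The futex-based fence packs its whole state into a single 32-bit value
 * (fence->val, defined in u_queue.h). The states implied by the code below:
 *   0 - signalled
 *   1 - unsignalled, no waiters
 *   2 - unsignalled, at least one waiter (a futex wake is needed on signal)
 */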
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = fence->val;
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = fence->val;
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif

/****************************************************************************
 * util_queue implementation
 */
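
/* Jobs are stored in a fixed-size ring buffer (queue->jobs): producers write
 * at write_idx, worker threads consume at read_idx, and num_queued tracks the
 * occupancy used with has_space_cond and has_queued_cond. All of these fields
 * are protected by queue->lock. */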

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->name) {
      char name[16];
      util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (!queue->kill_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      if (queue->kill_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs before terminating */
   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].job) {
         util_queue_fence_signal(queue->jobs[i].fence);
         queue->jobs[i].job = NULL;
      }
   }
   queue->read_idx = queue->write_idx;
   queue->num_queued = 0;
   mtx_unlock(&queue->lock);
   return 0;
}

bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags)
{
   unsigned i;

   memset(queue, 0, sizeof(*queue));
   queue->name = name;
   queue->flags = flags;
   queue->num_threads = num_threads;
   queue->max_jobs = max_jobs;

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < num_threads; i++) {
      struct thread_input *input =
         (struct thread_input *) malloc(sizeof(struct thread_input));
      input->queue = queue;
      input->thread_index = i;

      queue->threads[i] = u_thread_create(util_queue_thread_func, input);

      if (!queue->threads[i]) {
         free(input);

         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }

      if (flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
   #if defined(__linux__) && defined(SCHED_IDLE)
         struct sched_param sched_param = {0};

         /* The nice() function can only set a maximum of 19.
          * SCHED_IDLE is the same as nice = 20.
          *
          * Note that Linux only allows decreasing the priority. The original
          * priority can't be restored.
          */
         pthread_setschedparam(queue->threads[i], SCHED_IDLE, &sched_param);
   #endif
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* util_queue_is_initialized() can also be used to check for success. */
   memset(queue, 0, sizeof(*queue));
   return false;
}

static void
util_queue_killall_and_wait(struct util_queue *queue)
{
   unsigned i;

   /* Signal all threads to terminate. */
   mtx_lock(&queue->lock);
   queue->kill_threads = 1;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = 0; i < queue->num_threads; i++)
      thrd_join(queue->threads[i], NULL);
   queue->num_threads = 0;
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_killall_and_wait(queue);
   remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->kill_threads) {
      mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be
       * short-lived as things are shutting down.
       */
      return;
   }

   util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}
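
/* Example: typical producer-side use of this API (a minimal sketch; the names
 * compute_queue, compute_work, compute_execute and work are hypothetical and
 * only illustrate the calling convention):
 *
 *    static void compute_execute(void *job, int thread_index)
 *    {
 *       struct compute_work *work = job;
 *       // ... process 'work' on one of the queue's worker threads ...
 *    }
 *
 *    struct util_queue compute_queue;
 *    struct util_queue_fence fence;
 *
 *    util_queue_init(&compute_queue, "compute", 32, 4, 0);
 *    util_queue_fence_init(&fence);
 *    util_queue_add_job(&compute_queue, work, &fence, compute_execute, NULL);
 *    ...
 *    util_queue_fence_wait(&fence);      // blocks until compute_execute is done
 *    util_queue_fence_destroy(&fence);
 *    util_queue_destroy(&compute_queue);
 */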

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

static void
util_queue_finish_execute(void *data, int num_thread)
{
   util_barrier *barrier = data;
   util_barrier_wait(barrier);
}
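
/* How util_queue_finish drains the queue: it adds one barrier job per worker
 * thread, and each thread blocks in util_queue_finish_execute until every
 * worker thread has reached the barrier. Because jobs are consumed in FIFO
 * order, all jobs added before the barrier jobs have completed by that point;
 * the caller then waits on the barrier jobs' fences. */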

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences = malloc(queue->num_threads * sizeof(*fences));

   util_barrier_init(&barrier, queue->num_threads);

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   mtx_lock(&queue->finish_lock);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   mtx_unlock(&queue->finish_lock);

   util_barrier_destroy(&barrier);

   free(fences);
}

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return u_thread_get_time_nano(queue->threads[thread_index]);
}
    565