/*
 * Copyright 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include <time.h>

#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"

static void util_queue_killall_and_wait(struct util_queue *queue);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_killall_and_wait(iter);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   LIST_INITHEAD(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   LIST_ADD(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         LIST_DEL(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = fence->val;
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = fence->val;
   }

   return true;
}
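/* The waiter above implies the following encoding of fence->val:
 *   0 = signalled, 1 = unsignalled, 2 = unsignalled with waiters present.
 * Waiters advertise themselves by moving 1 -> 2 before sleeping, so the
 * signalling side (which lives in u_queue.h, not in this file) only needs to
 * issue a futex_wake when it observes 2.  The block below is an illustrative,
 * non-compiled sketch of such a counterpart, inferred from the waiter; it is
 * not the actual implementation.
 */
#if 0
static inline void
example_futex_fence_signal(struct util_queue_fence *fence)
{
   /* Mark the fence signalled; wake sleepers only if a waiter stored 2. */
   if (p_atomic_xchg(&fence->val, 0) == 2)
      futex_wake(&fence->val, INT_MAX);
}
#endif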
void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      /* Convert the remaining (relative) time into an absolute TIME_UTC
       * deadline for cnd_timedwait.
       */
      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif
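/* Example: using a util_queue_fence as a one-shot event between two threads.
 * This is an illustrative, non-compiled sketch; the example_* names are
 * hypothetical and not part of this file.  It relies only on the public fence
 * API already used elsewhere here (util_queue_fence_init/reset/signal/wait/
 * destroy) plus u_thread_create/thrd_join.
 */
#if 0
static struct util_queue_fence example_ready;

static int
example_producer(void *arg)
{
   /* ... produce whatever the waiting thread needs ... */
   util_queue_fence_signal(&example_ready);
   return 0;
}

static void
example_wait_for_producer(void)
{
   thrd_t t;

   util_queue_fence_init(&example_ready);   /* starts out signalled */
   util_queue_fence_reset(&example_ready);  /* mark as unsignalled */

   t = u_thread_create(example_producer, NULL);

   util_queue_fence_wait(&example_ready);   /* blocks until signalled */

   thrd_join(t, NULL);
   util_queue_fence_destroy(&example_ready);
}
#endif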
/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->name) {
      char name[16];
      util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (!queue->kill_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      if (queue->kill_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs before terminating */
   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].job) {
         util_queue_fence_signal(queue->jobs[i].fence);
         queue->jobs[i].job = NULL;
      }
   }
   queue->read_idx = queue->write_idx;
   queue->num_queued = 0;
   mtx_unlock(&queue->lock);
   return 0;
}

bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags)
{
   unsigned i;

   memset(queue, 0, sizeof(*queue));
   queue->name = name;
   queue->flags = flags;
   queue->num_threads = num_threads;
   queue->max_jobs = max_jobs;

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < num_threads; i++) {
      struct thread_input *input =
         (struct thread_input *) malloc(sizeof(struct thread_input));
      input->queue = queue;
      input->thread_index = i;

      queue->threads[i] = u_thread_create(util_queue_thread_func, input);

      if (!queue->threads[i]) {
         free(input);

         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }

      if (flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_IDLE)
         struct sched_param sched_param = {0};

         /* The nice() function can only set a maximum of 19.
          * SCHED_IDLE is the same as nice = 20.
          *
          * Note that Linux only allows decreasing the priority. The original
          * priority can't be restored.
          */
         pthread_setschedparam(queue->threads[i], SCHED_IDLE, &sched_param);
#endif
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* This also lets util_queue_is_initialized be used to check for success. */
   memset(queue, 0, sizeof(*queue));
   return false;
}

static void
util_queue_killall_and_wait(struct util_queue *queue)
{
   unsigned i;

   /* Signal all threads to terminate. */
   mtx_lock(&queue->lock);
   queue->kill_threads = 1;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = 0; i < queue->num_threads; i++)
      thrd_join(queue->threads[i], NULL);
   queue->num_threads = 0;
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_killall_and_wait(queue);
   remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->kill_threads) {
      mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be short-lived,
       * since things are shutting down.
       */
      return;
   }

   util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * This can be used when destroying an object associated with the job and you
 * don't care about the job's completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

static void
util_queue_finish_execute(void *data, int num_thread)
{
   util_barrier *barrier = data;
   util_barrier_wait(barrier);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences = malloc(queue->num_threads * sizeof(*fences));

   util_barrier_init(&barrier, queue->num_threads);

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   mtx_lock(&queue->finish_lock);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   mtx_unlock(&queue->finish_lock);

   util_barrier_destroy(&barrier);

   free(fences);
}

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return u_thread_get_time_nano(queue->threads[thread_index]);
}
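/* Example: typical producer-side use of util_queue.  This is an illustrative,
 * non-compiled sketch; the example_* names and the payload struct are
 * hypothetical and not part of this file.
 */
#if 0
struct example_job {
   int input;
   int result;
};

static void
example_execute(void *job, int thread_index)
{
   struct example_job *ej = (struct example_job *)job;

   /* This runs on one of the queue's worker threads. */
   ej->result = ej->input * 2;
}

static void
example_run(void)
{
   struct util_queue queue;
   struct util_queue_fence fence;
   struct example_job job = { .input = 21, .result = 0 };

   /* 4 job slots, 1 worker thread; grow the ring instead of blocking the
    * producer when all slots are taken.
    */
   if (!util_queue_init(&queue, "example", 4, 1,
                        UTIL_QUEUE_INIT_RESIZE_IF_FULL))
      return;

   util_queue_fence_init(&fence);

   /* util_queue_add_job resets the fence itself before queueing. */
   util_queue_add_job(&queue, &job, &fence, example_execute, NULL);

   /* Either wait for this specific job ... */
   util_queue_fence_wait(&fence);
   /* ... or drain everything that was queued before this point. */
   util_queue_finish(&queue);

   util_queue_fence_destroy(&fence);
   util_queue_destroy(&queue);
}
#endif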