1 /* 2 * Copyright 2016 Advanced Micro Devices, Inc. 3 * All Rights Reserved. 4 * 5 * Permission is hereby granted, free of charge, to any person obtaining 6 * a copy of this software and associated documentation files (the 7 * "Software"), to deal in the Software without restriction, including 8 * without limitation the rights to use, copy, modify, merge, publish, 9 * distribute, sub license, and/or sell copies of the Software, and to 10 * permit persons to whom the Software is furnished to do so, subject to 11 * the following conditions: 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS 17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20 * USE OR OTHER DEALINGS IN THE SOFTWARE. 21 * 22 * The above copyright notice and this permission notice (including the 23 * next paragraph) shall be included in all copies or substantial portions 24 * of the Software. 25 */ 26 27 #include "u_queue.h" 28 #include "u_memory.h" 29 #include "u_string.h" 30 #include "os/os_time.h" 31 32 static void util_queue_killall_and_wait(struct util_queue *queue); 33 34 /**************************************************************************** 35 * Wait for all queues to assert idle when exit() is called. 36 * 37 * Otherwise, C++ static variable destructors can be called while threads 38 * are using the static variables. 39 */ 40 41 static once_flag atexit_once_flag = ONCE_FLAG_INIT; 42 static struct list_head queue_list; 43 pipe_static_mutex(exit_mutex); 44 45 static void 46 atexit_handler(void) 47 { 48 struct util_queue *iter; 49 50 pipe_mutex_lock(exit_mutex); 51 /* Wait for all queues to assert idle. */ 52 LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { 53 util_queue_killall_and_wait(iter); 54 } 55 pipe_mutex_unlock(exit_mutex); 56 } 57 58 static void 59 global_init(void) 60 { 61 LIST_INITHEAD(&queue_list); 62 atexit(atexit_handler); 63 } 64 65 static void 66 add_to_atexit_list(struct util_queue *queue) 67 { 68 call_once(&atexit_once_flag, global_init); 69 70 pipe_mutex_lock(exit_mutex); 71 LIST_ADD(&queue->head, &queue_list); 72 pipe_mutex_unlock(exit_mutex); 73 } 74 75 static void 76 remove_from_atexit_list(struct util_queue *queue) 77 { 78 struct util_queue *iter, *tmp; 79 80 pipe_mutex_lock(exit_mutex); 81 LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) { 82 if (iter == queue) { 83 LIST_DEL(&iter->head); 84 break; 85 } 86 } 87 pipe_mutex_unlock(exit_mutex); 88 } 89 90 /**************************************************************************** 91 * util_queue implementation 92 */ 93 94 static void 95 util_queue_fence_signal(struct util_queue_fence *fence) 96 { 97 pipe_mutex_lock(fence->mutex); 98 fence->signalled = true; 99 pipe_condvar_broadcast(fence->cond); 100 pipe_mutex_unlock(fence->mutex); 101 } 102 103 void 104 util_queue_job_wait(struct util_queue_fence *fence) 105 { 106 pipe_mutex_lock(fence->mutex); 107 while (!fence->signalled) 108 pipe_condvar_wait(fence->cond, fence->mutex); 109 pipe_mutex_unlock(fence->mutex); 110 } 111 112 struct thread_input { 113 struct util_queue *queue; 114 int thread_index; 115 }; 116 117 static PIPE_THREAD_ROUTINE(util_queue_thread_func, input) 118 { 119 struct util_queue *queue = ((struct thread_input*)input)->queue; 120 int thread_index = ((struct thread_input*)input)->thread_index; 121 122 FREE(input); 123 124 if (queue->name) { 125 char name[16]; 126 util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index); 127 pipe_thread_setname(name); 128 } 129 130 while (1) { 131 struct util_queue_job job; 132 133 pipe_mutex_lock(queue->lock); 134 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 135 136 /* wait if the queue is empty */ 137 while (!queue->kill_threads && queue->num_queued == 0) 138 pipe_condvar_wait(queue->has_queued_cond, queue->lock); 139 140 if (queue->kill_threads) { 141 pipe_mutex_unlock(queue->lock); 142 break; 143 } 144 145 job = queue->jobs[queue->read_idx]; 146 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); 147 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; 148 149 queue->num_queued--; 150 pipe_condvar_signal(queue->has_space_cond); 151 pipe_mutex_unlock(queue->lock); 152 153 if (job.job) { 154 job.execute(job.job, thread_index); 155 util_queue_fence_signal(job.fence); 156 if (job.cleanup) 157 job.cleanup(job.job, thread_index); 158 } 159 } 160 161 /* signal remaining jobs before terminating */ 162 pipe_mutex_lock(queue->lock); 163 while (queue->jobs[queue->read_idx].job) { 164 util_queue_fence_signal(queue->jobs[queue->read_idx].fence); 165 166 queue->jobs[queue->read_idx].job = NULL; 167 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; 168 } 169 queue->num_queued = 0; /* reset this when exiting the thread */ 170 pipe_mutex_unlock(queue->lock); 171 return 0; 172 } 173 174 bool 175 util_queue_init(struct util_queue *queue, 176 const char *name, 177 unsigned max_jobs, 178 unsigned num_threads) 179 { 180 unsigned i; 181 182 memset(queue, 0, sizeof(*queue)); 183 queue->name = name; 184 queue->num_threads = num_threads; 185 queue->max_jobs = max_jobs; 186 187 queue->jobs = (struct util_queue_job*) 188 CALLOC(max_jobs, sizeof(struct util_queue_job)); 189 if (!queue->jobs) 190 goto fail; 191 192 pipe_mutex_init(queue->lock); 193 194 queue->num_queued = 0; 195 pipe_condvar_init(queue->has_queued_cond); 196 pipe_condvar_init(queue->has_space_cond); 197 198 queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread)); 199 if (!queue->threads) 200 goto fail; 201 202 /* start threads */ 203 for (i = 0; i < num_threads; i++) { 204 struct thread_input *input = MALLOC_STRUCT(thread_input); 205 input->queue = queue; 206 input->thread_index = i; 207 208 queue->threads[i] = pipe_thread_create(util_queue_thread_func, input); 209 210 if (!queue->threads[i]) { 211 FREE(input); 212 213 if (i == 0) { 214 /* no threads created, fail */ 215 goto fail; 216 } else { 217 /* at least one thread created, so use it */ 218 queue->num_threads = i; 219 break; 220 } 221 } 222 } 223 224 add_to_atexit_list(queue); 225 return true; 226 227 fail: 228 FREE(queue->threads); 229 230 if (queue->jobs) { 231 pipe_condvar_destroy(queue->has_space_cond); 232 pipe_condvar_destroy(queue->has_queued_cond); 233 pipe_mutex_destroy(queue->lock); 234 FREE(queue->jobs); 235 } 236 /* also util_queue_is_initialized can be used to check for success */ 237 memset(queue, 0, sizeof(*queue)); 238 return false; 239 } 240 241 static void 242 util_queue_killall_and_wait(struct util_queue *queue) 243 { 244 unsigned i; 245 246 /* Signal all threads to terminate. */ 247 pipe_mutex_lock(queue->lock); 248 queue->kill_threads = 1; 249 pipe_condvar_broadcast(queue->has_queued_cond); 250 pipe_mutex_unlock(queue->lock); 251 252 for (i = 0; i < queue->num_threads; i++) 253 pipe_thread_wait(queue->threads[i]); 254 queue->num_threads = 0; 255 } 256 257 void 258 util_queue_destroy(struct util_queue *queue) 259 { 260 util_queue_killall_and_wait(queue); 261 remove_from_atexit_list(queue); 262 263 pipe_condvar_destroy(queue->has_space_cond); 264 pipe_condvar_destroy(queue->has_queued_cond); 265 pipe_mutex_destroy(queue->lock); 266 FREE(queue->jobs); 267 FREE(queue->threads); 268 } 269 270 void 271 util_queue_fence_init(struct util_queue_fence *fence) 272 { 273 memset(fence, 0, sizeof(*fence)); 274 pipe_mutex_init(fence->mutex); 275 pipe_condvar_init(fence->cond); 276 fence->signalled = true; 277 } 278 279 void 280 util_queue_fence_destroy(struct util_queue_fence *fence) 281 { 282 assert(fence->signalled); 283 pipe_condvar_destroy(fence->cond); 284 pipe_mutex_destroy(fence->mutex); 285 } 286 287 void 288 util_queue_add_job(struct util_queue *queue, 289 void *job, 290 struct util_queue_fence *fence, 291 util_queue_execute_func execute, 292 util_queue_execute_func cleanup) 293 { 294 struct util_queue_job *ptr; 295 296 assert(fence->signalled); 297 fence->signalled = false; 298 299 pipe_mutex_lock(queue->lock); 300 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); 301 302 /* if the queue is full, wait until there is space */ 303 while (queue->num_queued == queue->max_jobs) 304 pipe_condvar_wait(queue->has_space_cond, queue->lock); 305 306 ptr = &queue->jobs[queue->write_idx]; 307 assert(ptr->job == NULL); 308 ptr->job = job; 309 ptr->fence = fence; 310 ptr->execute = execute; 311 ptr->cleanup = cleanup; 312 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; 313 314 queue->num_queued++; 315 pipe_condvar_signal(queue->has_queued_cond); 316 pipe_mutex_unlock(queue->lock); 317 } 318