1 /* 2 * Generic workqueue offload mechanism 3 * 4 * Copyright (C) 2015 Jens Axboe <axboe (at) kernel.dk> 5 * 6 */ 7 #include <unistd.h> 8 9 #include "fio.h" 10 #include "flist.h" 11 #include "workqueue.h" 12 #include "smalloc.h" 13 14 enum { 15 SW_F_IDLE = 1 << 0, 16 SW_F_RUNNING = 1 << 1, 17 SW_F_EXIT = 1 << 2, 18 SW_F_ACCOUNTED = 1 << 3, 19 SW_F_ERROR = 1 << 4, 20 }; 21 22 static struct submit_worker *__get_submit_worker(struct workqueue *wq, 23 unsigned int start, 24 unsigned int end, 25 struct submit_worker **best) 26 { 27 struct submit_worker *sw = NULL; 28 29 while (start <= end) { 30 sw = &wq->workers[start]; 31 if (sw->flags & SW_F_IDLE) 32 return sw; 33 if (!(*best) || sw->seq < (*best)->seq) 34 *best = sw; 35 start++; 36 } 37 38 return NULL; 39 } 40 41 static struct submit_worker *get_submit_worker(struct workqueue *wq) 42 { 43 unsigned int next = wq->next_free_worker; 44 struct submit_worker *sw, *best = NULL; 45 46 assert(next < wq->max_workers); 47 48 sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best); 49 if (!sw && next) 50 sw = __get_submit_worker(wq, 0, next - 1, &best); 51 52 /* 53 * No truly idle found, use best match 54 */ 55 if (!sw) 56 sw = best; 57 58 if (sw->index == wq->next_free_worker) { 59 if (sw->index + 1 < wq->max_workers) 60 wq->next_free_worker = sw->index + 1; 61 else 62 wq->next_free_worker = 0; 63 } 64 65 return sw; 66 } 67 68 static bool all_sw_idle(struct workqueue *wq) 69 { 70 int i; 71 72 for (i = 0; i < wq->max_workers; i++) { 73 struct submit_worker *sw = &wq->workers[i]; 74 75 if (!(sw->flags & SW_F_IDLE)) 76 return false; 77 } 78 79 return true; 80 } 81 82 /* 83 * Must be serialized wrt workqueue_enqueue() by caller 84 */ 85 void workqueue_flush(struct workqueue *wq) 86 { 87 wq->wake_idle = 1; 88 89 while (!all_sw_idle(wq)) { 90 pthread_mutex_lock(&wq->flush_lock); 91 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); 92 pthread_mutex_unlock(&wq->flush_lock); 93 } 94 95 wq->wake_idle = 0; 96 } 97 98 /* 99 * Must be serialized by caller. Returns true for queued, false for busy. 100 */ 101 void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work) 102 { 103 struct submit_worker *sw; 104 105 sw = get_submit_worker(wq); 106 assert(sw); 107 108 pthread_mutex_lock(&sw->lock); 109 flist_add_tail(&work->list, &sw->work_list); 110 sw->seq = ++wq->work_seq; 111 sw->flags &= ~SW_F_IDLE; 112 pthread_mutex_unlock(&sw->lock); 113 114 pthread_cond_signal(&sw->cond); 115 } 116 117 static void handle_list(struct submit_worker *sw, struct flist_head *list) 118 { 119 struct workqueue *wq = sw->wq; 120 struct workqueue_work *work; 121 122 while (!flist_empty(list)) { 123 work = flist_first_entry(list, struct workqueue_work, list); 124 flist_del_init(&work->list); 125 wq->ops.fn(sw, work); 126 } 127 } 128 129 static void *worker_thread(void *data) 130 { 131 struct submit_worker *sw = data; 132 struct workqueue *wq = sw->wq; 133 unsigned int ret = 0; 134 FLIST_HEAD(local_list); 135 136 sk_out_assign(sw->sk_out); 137 138 if (wq->ops.nice) { 139 if (nice(wq->ops.nice) < 0) { 140 log_err("workqueue: nice %s\n", strerror(errno)); 141 ret = 1; 142 } 143 } 144 145 if (!ret) 146 ret = workqueue_init_worker(sw); 147 148 pthread_mutex_lock(&sw->lock); 149 sw->flags |= SW_F_RUNNING; 150 if (ret) 151 sw->flags |= SW_F_ERROR; 152 pthread_mutex_unlock(&sw->lock); 153 154 pthread_mutex_lock(&wq->flush_lock); 155 pthread_cond_signal(&wq->flush_cond); 156 pthread_mutex_unlock(&wq->flush_lock); 157 158 if (sw->flags & SW_F_ERROR) 159 goto done; 160 161 while (1) { 162 pthread_mutex_lock(&sw->lock); 163 164 if (flist_empty(&sw->work_list)) { 165 if (sw->flags & SW_F_EXIT) { 166 pthread_mutex_unlock(&sw->lock); 167 break; 168 } 169 170 if (workqueue_pre_sleep_check(sw)) { 171 pthread_mutex_unlock(&sw->lock); 172 workqueue_pre_sleep(sw); 173 pthread_mutex_lock(&sw->lock); 174 } 175 176 /* 177 * We dropped and reaquired the lock, check 178 * state again. 179 */ 180 if (!flist_empty(&sw->work_list)) 181 goto handle_work; 182 183 if (sw->flags & SW_F_EXIT) { 184 pthread_mutex_unlock(&sw->lock); 185 break; 186 } else if (!(sw->flags & SW_F_IDLE)) { 187 sw->flags |= SW_F_IDLE; 188 wq->next_free_worker = sw->index; 189 if (wq->wake_idle) 190 pthread_cond_signal(&wq->flush_cond); 191 } 192 if (wq->ops.update_acct_fn) 193 wq->ops.update_acct_fn(sw); 194 195 pthread_cond_wait(&sw->cond, &sw->lock); 196 } else { 197 handle_work: 198 flist_splice_init(&sw->work_list, &local_list); 199 } 200 pthread_mutex_unlock(&sw->lock); 201 handle_list(sw, &local_list); 202 } 203 204 if (wq->ops.update_acct_fn) 205 wq->ops.update_acct_fn(sw); 206 207 done: 208 sk_out_drop(); 209 return NULL; 210 } 211 212 static void free_worker(struct submit_worker *sw, unsigned int *sum_cnt) 213 { 214 struct workqueue *wq = sw->wq; 215 216 workqueue_exit_worker(sw, sum_cnt); 217 218 pthread_cond_destroy(&sw->cond); 219 pthread_mutex_destroy(&sw->lock); 220 221 if (wq->ops.free_worker_fn) 222 wq->ops.free_worker_fn(sw); 223 } 224 225 static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt) 226 { 227 pthread_join(sw->thread, NULL); 228 free_worker(sw, sum_cnt); 229 } 230 231 void workqueue_exit(struct workqueue *wq) 232 { 233 unsigned int shutdown, sum_cnt = 0; 234 struct submit_worker *sw; 235 int i; 236 237 if (!wq->workers) 238 return; 239 240 for (i = 0; i < wq->max_workers; i++) { 241 sw = &wq->workers[i]; 242 243 pthread_mutex_lock(&sw->lock); 244 sw->flags |= SW_F_EXIT; 245 pthread_cond_signal(&sw->cond); 246 pthread_mutex_unlock(&sw->lock); 247 } 248 249 do { 250 shutdown = 0; 251 for (i = 0; i < wq->max_workers; i++) { 252 sw = &wq->workers[i]; 253 if (sw->flags & SW_F_ACCOUNTED) 254 continue; 255 pthread_mutex_lock(&sw->lock); 256 sw->flags |= SW_F_ACCOUNTED; 257 pthread_mutex_unlock(&sw->lock); 258 shutdown_worker(sw, &sum_cnt); 259 shutdown++; 260 } 261 } while (shutdown && shutdown != wq->max_workers); 262 263 sfree(wq->workers); 264 wq->workers = NULL; 265 pthread_mutex_destroy(&wq->flush_lock); 266 pthread_cond_destroy(&wq->flush_cond); 267 pthread_mutex_destroy(&wq->stat_lock); 268 } 269 270 static int start_worker(struct workqueue *wq, unsigned int index, 271 struct sk_out *sk_out) 272 { 273 struct submit_worker *sw = &wq->workers[index]; 274 int ret; 275 276 INIT_FLIST_HEAD(&sw->work_list); 277 278 ret = mutex_cond_init_pshared(&sw->lock, &sw->cond); 279 if (ret) 280 return ret; 281 282 sw->wq = wq; 283 sw->index = index; 284 sw->sk_out = sk_out; 285 286 if (wq->ops.alloc_worker_fn) { 287 ret = wq->ops.alloc_worker_fn(sw); 288 if (ret) 289 return ret; 290 } 291 292 ret = pthread_create(&sw->thread, NULL, worker_thread, sw); 293 if (!ret) { 294 pthread_mutex_lock(&sw->lock); 295 sw->flags = SW_F_IDLE; 296 pthread_mutex_unlock(&sw->lock); 297 return 0; 298 } 299 300 free_worker(sw, NULL); 301 return 1; 302 } 303 304 int workqueue_init(struct thread_data *td, struct workqueue *wq, 305 struct workqueue_ops *ops, unsigned int max_workers, 306 struct sk_out *sk_out) 307 { 308 unsigned int running; 309 int i, error; 310 int ret; 311 312 wq->max_workers = max_workers; 313 wq->td = td; 314 wq->ops = *ops; 315 wq->work_seq = 0; 316 wq->next_free_worker = 0; 317 318 ret = mutex_cond_init_pshared(&wq->flush_lock, &wq->flush_cond); 319 if (ret) 320 goto err; 321 ret = mutex_init_pshared(&wq->stat_lock); 322 if (ret) 323 goto err; 324 325 wq->workers = smalloc(wq->max_workers * sizeof(struct submit_worker)); 326 if (!wq->workers) 327 goto err; 328 329 for (i = 0; i < wq->max_workers; i++) 330 if (start_worker(wq, i, sk_out)) 331 break; 332 333 wq->max_workers = i; 334 if (!wq->max_workers) 335 goto err; 336 337 /* 338 * Wait for them all to be started and initialized 339 */ 340 error = 0; 341 do { 342 struct submit_worker *sw; 343 344 running = 0; 345 pthread_mutex_lock(&wq->flush_lock); 346 for (i = 0; i < wq->max_workers; i++) { 347 sw = &wq->workers[i]; 348 pthread_mutex_lock(&sw->lock); 349 if (sw->flags & SW_F_RUNNING) 350 running++; 351 if (sw->flags & SW_F_ERROR) 352 error++; 353 pthread_mutex_unlock(&sw->lock); 354 } 355 356 if (error || running == wq->max_workers) { 357 pthread_mutex_unlock(&wq->flush_lock); 358 break; 359 } 360 361 pthread_cond_wait(&wq->flush_cond, &wq->flush_lock); 362 pthread_mutex_unlock(&wq->flush_lock); 363 } while (1); 364 365 if (!error) 366 return 0; 367 368 err: 369 log_err("Can't create rate workqueue\n"); 370 td_verror(td, ESRCH, "workqueue_init"); 371 workqueue_exit(wq); 372 return 1; 373 } 374