/*
 * Generic workqueue offload mechanism
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
#include <unistd.h>

#include "fio.h"
#include "flist.h"
#include "workqueue.h"
#include "smalloc.h"
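/*
 * Rough usage sketch (illustrative only; my_work_fn is a placeholder and
 * struct workqueue_ops may have more members than this file touches):
 *
 *	struct workqueue wq;
 *	struct workqueue_ops ops = { .fn = my_work_fn };
 *
 *	workqueue_init(td, &wq, &ops, nr_workers, sk_out);
 *	workqueue_enqueue(&wq, work);	caller serializes enqueue/flush
 *	workqueue_flush(&wq);
 *	workqueue_exit(&wq);
 */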

enum {
	SW_F_IDLE	= 1 << 0,
	SW_F_RUNNING	= 1 << 1,
	SW_F_EXIT	= 1 << 2,
	SW_F_ACCOUNTED	= 1 << 3,
	SW_F_ERROR	= 1 << 4,
};

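/*
 * Scan workers in [start, end] for an idle one. If none is idle, remember
 * the one with the lowest submit sequence (least recently loaded) in *best.
 */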
static struct submit_worker *__get_submit_worker(struct workqueue *wq,
						 unsigned int start,
						 unsigned int end,
						 struct submit_worker **best)
{
	struct submit_worker *sw = NULL;

	while (start <= end) {
		sw = &wq->workers[start];
		if (sw->flags & SW_F_IDLE)
			return sw;
		if (!(*best) || sw->seq < (*best)->seq)
			*best = sw;
		start++;
	}

	return NULL;
}

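/*
 * Pick a worker to receive new work: prefer an idle worker at or after the
 * next_free_worker hint, wrapping around once, and fall back to the least
 * recently loaded worker if none is idle.
 */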
static struct submit_worker *get_submit_worker(struct workqueue *wq)
{
	unsigned int next = wq->next_free_worker;
	struct submit_worker *sw, *best = NULL;

	assert(next < wq->max_workers);

	sw = __get_submit_worker(wq, next, wq->max_workers - 1, &best);
	if (!sw && next)
		sw = __get_submit_worker(wq, 0, next - 1, &best);

	/*
	 * No truly idle found, use best match
	 */
	if (!sw)
		sw = best;

	if (sw->index == wq->next_free_worker) {
		if (sw->index + 1 < wq->max_workers)
			wq->next_free_worker = sw->index + 1;
		else
			wq->next_free_worker = 0;
	}

	return sw;
}

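/*
 * Returns true if every worker has flagged itself idle
 */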
static bool all_sw_idle(struct workqueue *wq)
{
	int i;

	for (i = 0; i < wq->max_workers; i++) {
		struct submit_worker *sw = &wq->workers[i];

		if (!(sw->flags & SW_F_IDLE))
			return false;
	}

	return true;
}

/*
 * Must be serialized wrt workqueue_enqueue() by caller
 */
void workqueue_flush(struct workqueue *wq)
{
	wq->wake_idle = 1;

	while (!all_sw_idle(wq)) {
		pthread_mutex_lock(&wq->flush_lock);
		pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
		pthread_mutex_unlock(&wq->flush_lock);
	}

	wq->wake_idle = 0;
}

/*
 * Must be serialized by caller.
 */
void workqueue_enqueue(struct workqueue *wq, struct workqueue_work *work)
{
	struct submit_worker *sw;

	sw = get_submit_worker(wq);
	assert(sw);

	pthread_mutex_lock(&sw->lock);
	flist_add_tail(&work->list, &sw->work_list);
	sw->seq = ++wq->work_seq;
	sw->flags &= ~SW_F_IDLE;
	pthread_mutex_unlock(&sw->lock);

	pthread_cond_signal(&sw->cond);
}

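/*
 * Run each queued work item through the workqueue's ->fn handler. The list
 * passed in is private to the calling worker, so no locking is needed here.
 */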
static void handle_list(struct submit_worker *sw, struct flist_head *list)
{
	struct workqueue *wq = sw->wq;
	struct workqueue_work *work;

	while (!flist_empty(list)) {
		work = flist_first_entry(list, struct workqueue_work, list);
		flist_del_init(&work->list);
		wq->ops.fn(sw, work);
	}
}

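/*
 * Main loop for a submit worker thread: apply the requested nice level,
 * initialize the worker, signal flush_cond so workqueue_init() sees it
 * running, then repeatedly splice queued work off sw->work_list and process
 * it, going idle (and waking any flusher) when the list runs dry, until
 * SW_F_EXIT is set.
 */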
static void *worker_thread(void *data)
{
	struct submit_worker *sw = data;
	struct workqueue *wq = sw->wq;
	unsigned int ret = 0;
	FLIST_HEAD(local_list);

	sk_out_assign(sw->sk_out);

	if (wq->ops.nice) {
		if (nice(wq->ops.nice) < 0) {
			log_err("workqueue: nice %s\n", strerror(errno));
			ret = 1;
		}
	}

	if (!ret)
		ret = workqueue_init_worker(sw);

	pthread_mutex_lock(&sw->lock);
	sw->flags |= SW_F_RUNNING;
	if (ret)
		sw->flags |= SW_F_ERROR;
	pthread_mutex_unlock(&sw->lock);

	pthread_mutex_lock(&wq->flush_lock);
	pthread_cond_signal(&wq->flush_cond);
	pthread_mutex_unlock(&wq->flush_lock);

	if (sw->flags & SW_F_ERROR)
		goto done;

	while (1) {
		pthread_mutex_lock(&sw->lock);

		if (flist_empty(&sw->work_list)) {
			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			}

			if (workqueue_pre_sleep_check(sw)) {
				pthread_mutex_unlock(&sw->lock);
				workqueue_pre_sleep(sw);
				pthread_mutex_lock(&sw->lock);
			}

			/*
			 * We dropped and reacquired the lock, check
			 * state again.
			 */
			if (!flist_empty(&sw->work_list))
				goto handle_work;

			if (sw->flags & SW_F_EXIT) {
				pthread_mutex_unlock(&sw->lock);
				break;
			} else if (!(sw->flags & SW_F_IDLE)) {
				sw->flags |= SW_F_IDLE;
				wq->next_free_worker = sw->index;
				if (wq->wake_idle)
					pthread_cond_signal(&wq->flush_cond);
			}
			if (wq->ops.update_acct_fn)
				wq->ops.update_acct_fn(sw);

			pthread_cond_wait(&sw->cond, &sw->lock);
		} else {
handle_work:
			flist_splice_init(&sw->work_list, &local_list);
		}
		pthread_mutex_unlock(&sw->lock);
		handle_list(sw, &local_list);
	}

	if (wq->ops.update_acct_fn)
		wq->ops.update_acct_fn(sw);

done:
	sk_out_drop();
	return NULL;
}

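/*
 * Tear down a worker's resources once its thread is no longer running
 */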
static void free_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
	struct workqueue *wq = sw->wq;

	workqueue_exit_worker(sw, sum_cnt);

	pthread_cond_destroy(&sw->cond);
	pthread_mutex_destroy(&sw->lock);

	if (wq->ops.free_worker_fn)
		wq->ops.free_worker_fn(sw);
}

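/*
 * Wait for a worker thread to exit, then release it
 */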
static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
{
	pthread_join(sw->thread, NULL);
	free_worker(sw, sum_cnt);
}

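/*
 * Tell every worker to exit, reap them all, and free the workqueue resources
 */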
void workqueue_exit(struct workqueue *wq)
{
	unsigned int shutdown, sum_cnt = 0;
	struct submit_worker *sw;
	int i;

	if (!wq->workers)
		return;

	for (i = 0; i < wq->max_workers; i++) {
		sw = &wq->workers[i];

		pthread_mutex_lock(&sw->lock);
		sw->flags |= SW_F_EXIT;
		pthread_cond_signal(&sw->cond);
		pthread_mutex_unlock(&sw->lock);
	}

	do {
		shutdown = 0;
		for (i = 0; i < wq->max_workers; i++) {
			sw = &wq->workers[i];
			if (sw->flags & SW_F_ACCOUNTED)
				continue;
			pthread_mutex_lock(&sw->lock);
			sw->flags |= SW_F_ACCOUNTED;
			pthread_mutex_unlock(&sw->lock);
			shutdown_worker(sw, &sum_cnt);
			shutdown++;
		}
	} while (shutdown && shutdown != wq->max_workers);

	sfree(wq->workers);
	wq->workers = NULL;
	pthread_mutex_destroy(&wq->flush_lock);
	pthread_cond_destroy(&wq->flush_cond);
	pthread_mutex_destroy(&wq->stat_lock);
}

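/*
 * Initialize one submit_worker slot and spawn its thread. Returns 0 on
 * success, non-zero on failure.
 */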
static int start_worker(struct workqueue *wq, unsigned int index,
			struct sk_out *sk_out)
{
	struct submit_worker *sw = &wq->workers[index];
	int ret;

	INIT_FLIST_HEAD(&sw->work_list);

	ret = mutex_cond_init_pshared(&sw->lock, &sw->cond);
	if (ret)
		return ret;

	sw->wq = wq;
	sw->index = index;
	sw->sk_out = sk_out;

	if (wq->ops.alloc_worker_fn) {
		ret = wq->ops.alloc_worker_fn(sw);
		if (ret)
			return ret;
	}

	ret = pthread_create(&sw->thread, NULL, worker_thread, sw);
	if (!ret) {
		pthread_mutex_lock(&sw->lock);
		sw->flags = SW_F_IDLE;
		pthread_mutex_unlock(&sw->lock);
		return 0;
	}

	free_worker(sw, NULL);
	return 1;
}

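/*
 * Create a workqueue with up to max_workers submit workers and wait for them
 * all to come up. Returns 0 on success; on error the partially created queue
 * is torn down and 1 is returned.
 */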
int workqueue_init(struct thread_data *td, struct workqueue *wq,
		   struct workqueue_ops *ops, unsigned int max_workers,
		   struct sk_out *sk_out)
{
	unsigned int running;
	int i, error;
	int ret;

	wq->max_workers = max_workers;
	wq->td = td;
	wq->ops = *ops;
	wq->work_seq = 0;
	wq->next_free_worker = 0;

	ret = mutex_cond_init_pshared(&wq->flush_lock, &wq->flush_cond);
	if (ret)
		goto err;
	ret = mutex_init_pshared(&wq->stat_lock);
	if (ret)
		goto err;

	wq->workers = smalloc(wq->max_workers * sizeof(struct submit_worker));
	if (!wq->workers)
		goto err;

	for (i = 0; i < wq->max_workers; i++)
		if (start_worker(wq, i, sk_out))
			break;

	wq->max_workers = i;
	if (!wq->max_workers)
		goto err;

	/*
	 * Wait for them all to be started and initialized
	 */
	error = 0;
	do {
		struct submit_worker *sw;

		running = 0;
		pthread_mutex_lock(&wq->flush_lock);
		for (i = 0; i < wq->max_workers; i++) {
			sw = &wq->workers[i];
			pthread_mutex_lock(&sw->lock);
			if (sw->flags & SW_F_RUNNING)
				running++;
			if (sw->flags & SW_F_ERROR)
				error++;
			pthread_mutex_unlock(&sw->lock);
		}

		if (error || running == wq->max_workers) {
			pthread_mutex_unlock(&wq->flush_lock);
			break;
		}

		pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
		pthread_mutex_unlock(&wq->flush_lock);
	} while (1);

	if (!error)
		return 0;

err:
	log_err("Can't create rate workqueue\n");
	td_verror(td, ESRCH, "workqueue_init");
	workqueue_exit(wq);
	return 1;
}