/*
 * Rated submission helpers
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
#include "fio.h"
#include "ioengines.h"
#include "lib/getrusage.h"
#include "rate-submit.h"

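/*
 * Workqueue entry point: submit the io_u attached to this work item.
 * If the engine reports busy, reap one completion and retry; afterwards,
 * reap as needed to keep this worker's tracked depth bounded.
 */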
static int io_workqueue_fn(struct submit_worker *sw,
			   struct workqueue_work *work)
{
	struct io_u *io_u = container_of(work, struct io_u, work);
	const enum fio_ddir ddir = io_u->ddir;
	struct thread_data *td = sw->priv;
	int ret;

	dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());

	io_u_set(td, io_u, IO_U_F_NO_FILE_PUT);

	td->cur_depth++;

	do {
		ret = td_io_queue(td, io_u);
		if (ret != FIO_Q_BUSY)
			break;
		ret = io_u_queued_complete(td, 1);
		if (ret > 0)
			td->cur_depth -= ret;
		io_u_clear(td, io_u, IO_U_F_FLIGHT);
	} while (1);

	dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());

	io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);

	if (ret == FIO_Q_COMPLETED)
		td->cur_depth--;
	else if (ret == FIO_Q_QUEUED) {
		unsigned int min_evts;

		if (td->o.iodepth == 1)
			min_evts = 1;
		else
			min_evts = 0;

		ret = io_u_queued_complete(td, min_evts);
		if (ret > 0)
			td->cur_depth -= ret;
	} else if (ret == FIO_Q_BUSY) {
		ret = io_u_queued_complete(td, td->cur_depth);
		if (ret > 0)
			td->cur_depth -= ret;
	}

	return 0;
}

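/*
 * Return true if this worker still has queued or in-flight I/O that
 * must be flushed before the worker is allowed to sleep.
 */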
static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->priv;

	if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
		return true;

	return false;
}

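/*
 * Quiesce outstanding I/O before the worker sleeps, dropping the
 * tracked depth by however many completions were reaped.
 */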
static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->priv;
	int ret;

	ret = io_u_quiesce(td);
	if (ret > 0)
		td->cur_depth -= ret;
}

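/*
 * Each worker gets a private thread_data, so its submissions and stats
 * can proceed without touching the parent job's state.
 */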
static int io_workqueue_alloc_fn(struct submit_worker *sw)
{
	struct thread_data *td;

	td = calloc(1, sizeof(*td));
	if (!td)
		return 1;	/* treat allocation failure as worker setup failure */

	sw->priv = td;
	return 0;
}

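/*
 * Release the per-worker thread_data allocated above.
 */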
static void io_workqueue_free_fn(struct submit_worker *sw)
{
	free(sw->priv);
	sw->priv = NULL;
}

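/*
 * Initialize a worker as a lightweight clone of the parent job: copy
 * options and stats, duplicate the file list, load a private ioengine
 * instance, and force the worker's own iodepth to 1 since each worker
 * handles a single io_u at a time.
 */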
static int io_workqueue_init_worker_fn(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = sw->priv;

	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;
	dup_files(td, parent);
	td->eo = parent->eo;
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	INIT_FLIST_HEAD(&td->next_rand_list);
	td->io_hist_tree = RB_ROOT;

	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	set_epoch_time(td, td->o.log_unix_epoch);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}

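/*
 * On worker exit, fold its stats into the parent job and tear down the
 * files and ioengine state that init_worker_fn set up.
 */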
static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
					unsigned int *sum_cnt)
{
	struct thread_data *td = sw->priv;

	(*sum_cnt)++;
	sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);

	fio_options_free(td);
	close_and_free_files(td);
	if (td->io_ops)
		close_ioengine(td);
	td_set_runstate(td, TD_EXITED);
}

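/*
 * Move a counter from src to dst, zeroing src. With CONFIG_SFAA the add
 * is atomic; otherwise callers serialize through the stat locks below.
 */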
#ifdef CONFIG_SFAA
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		__sync_fetch_and_add(dst, *src);
		*src = 0;
	}
}
#else
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		*dst += *src;
		*src = 0;
	}
}
#endif

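/*
 * Locking helpers for the non-atomic sum_val() path. Both compile away
 * when CONFIG_SFAA provides atomic adds.
 */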
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}

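/*
 * Take both stat locks in address order, so two workers summing into
 * each other cannot deadlock.
 */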
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	if (lock1 < lock2) {
		pthread_mutex_lock(lock1);
		pthread_mutex_lock(lock2);
	} else {
		pthread_mutex_lock(lock2);
		pthread_mutex_lock(lock1);
	}
#endif
}

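/*
 * Transfer all byte/block counters for one data direction from a worker
 * into the parent, under both stat locks.
 */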
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
		     enum fio_ddir ddir)
{
	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}

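/*
 * Accounting hook: push this worker's counters up to the parent job for
 * every data direction the job touches.
 */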
static void io_workqueue_update_acct_fn(struct submit_worker *sw)
{
	struct thread_data *src = sw->priv;
	struct thread_data *dst = sw->wq->td;

	if (td_read(src))
		sum_ddir(dst, src, DDIR_READ);
	if (td_write(src))
		sum_ddir(dst, src, DDIR_WRITE);
	if (td_trim(src))
		sum_ddir(dst, src, DDIR_TRIM);
}

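/*
 * Callbacks wiring the rated-submit workers into the generic workqueue.
 */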
static struct workqueue_ops rated_wq_ops = {
	.fn			= io_workqueue_fn,
	.pre_sleep_flush_fn	= io_workqueue_pre_sleep_flush_fn,
	.pre_sleep_fn		= io_workqueue_pre_sleep_fn,
	.update_acct_fn		= io_workqueue_update_acct_fn,
	.alloc_worker_fn	= io_workqueue_alloc_fn,
	.free_worker_fn		= io_workqueue_free_fn,
	.init_worker_fn		= io_workqueue_init_worker_fn,
	.exit_worker_fn		= io_workqueue_exit_worker_fn,
};

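/*
 * Set up offload submission: the job thread hands io_us to the
 * workqueue instead of submitting them itself, with up to iodepth
 * workers active. A minimal job sketch that would exercise this path
 * (shown for illustration only):
 *
 *	[rated]
 *	ioengine=libaio
 *	iodepth=32
 *	rate_iops=1000
 *	io_submit_mode=offload
 */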
int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return 0;

	return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
}

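/*
 * Counterpart to rate_submit_init(): shut the workqueue down for jobs
 * that ran in offload mode.
 */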
void rate_submit_exit(struct thread_data *td)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return;

	workqueue_exit(&td->io_wq);
}
    250