/* (code-browser listing header: Home | History | Annotate | Download | only in engines) */
      1 /*
      2  * libaio engine
      3  *
      4  * IO engine using the Linux native aio interface.
      5  *
      6  */
      7 #include <stdio.h>
      8 #include <stdlib.h>
      9 #include <unistd.h>
     10 #include <errno.h>
     11 #include <assert.h>
     12 #include <libaio.h>
     13 
     14 #include "../fio.h"
     15 
/*
 * Per-thread engine state, hung off td->io_ops->data.
 * The three arrays are all sized to td->o.iodepth (see fio_libaio_init).
 */
struct libaio_data {
	io_context_t aio_ctx;		/* kernel AIO context handle */
	struct io_event *aio_events;	/* completion events reaped by getevents */
	struct iocb **iocbs;		/* iocbs staged by queue, drained by commit */
	struct io_u **io_us;		/* io_u matching each staged iocb, same index */
	int iocbs_nr;			/* number of currently staged (unsubmitted) iocbs */
};
     23 
/* Engine-private options, parsed into this struct via the options[] table. */
struct libaio_options {
	struct thread_data *td;		/* back-pointer filled in by the option framework */
	unsigned int userspace_reap;	/* non-zero: reap completions from the mapped ring */
};
     28 
/* Job-file options understood by this engine; NULL name terminates the table. */
static struct fio_option options[] = {
	{
		.name	= "userspace_reap",
		.lname	= "Libaio userspace reaping",
		.type	= FIO_OPT_STR_SET,
		.off1	= offsetof(struct libaio_options, userspace_reap),
		.help	= "Use alternative user-space reap implementation",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_LIBAIO,
	},
	{
		.name	= NULL,
	},
};
     43 
     44 static int fio_libaio_prep(struct thread_data fio_unused *td, struct io_u *io_u)
     45 {
     46 	struct fio_file *f = io_u->file;
     47 
     48 	if (io_u->ddir == DDIR_READ)
     49 		io_prep_pread(&io_u->iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
     50 	else if (io_u->ddir == DDIR_WRITE)
     51 		io_prep_pwrite(&io_u->iocb, f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
     52 	else if (ddir_sync(io_u->ddir))
     53 		io_prep_fsync(&io_u->iocb, f->fd);
     54 
     55 	return 0;
     56 }
     57 
/*
 * Translate reaped completion event 'event' back into its io_u and
 * record error/residual information on it.
 */
static struct io_u *fio_libaio_event(struct thread_data *td, int event)
{
	struct libaio_data *ld = td->io_ops->data;
	struct io_event *ev;
	struct io_u *io_u;

	ev = ld->aio_events + event;
	/* ev->obj points at the iocb embedded in the io_u */
	io_u = container_of(ev->obj, struct io_u, iocb);

	if (ev->res != io_u->xfer_buflen) {
		/*
		 * res is unsigned here: a negative errno from the kernel
		 * appears as a huge value, i.e. greater than the transfer
		 * length, so negating it back yields the positive errno.
		 */
		if (ev->res > io_u->xfer_buflen)
			io_u->error = -ev->res;
		else
			/* short transfer: record the residual byte count */
			io_u->resid = io_u->xfer_buflen - ev->res;
	} else
		io_u->error = 0;

	return io_u;
}
     77 
/*
 * Userspace view of the kernel's AIO completion ring. The io_context_t
 * handle is actually a pointer to this mapped structure, which is how
 * user_io_getevents() below reaps completions without a syscall.
 * NOTE(review): field layout must match the kernel's struct aio_ring
 * (fs/aio.c) exactly — do not reorder or resize fields.
 */
struct aio_ring {
	unsigned id;		 /** kernel internal index number */
	unsigned nr;		 /** number of io_events */
	unsigned head;		/* consumer index, advanced by userspace below */
	unsigned tail;		/* producer index — presumably kernel-written; only read here */

	unsigned magic;		/* AIO_RING_MAGIC when the mapping is usable */
	unsigned compat_features;
	unsigned incompat_features;
	unsigned header_length;	/** size of aio_ring */

	struct io_event events[0];	/* completion slots follow the header */
};

/* Expected value of aio_ring.magic for a valid kernel ring mapping. */
#define AIO_RING_MAGIC	0xa10a10a1
     93 
/*
 * Reap up to 'max' completion events directly from the mapped kernel
 * ring, avoiding the io_getevents() syscall. Single-consumer only.
 * Returns the number of events copied into 'events' (possibly 0).
 */
static int user_io_getevents(io_context_t aio_ctx, unsigned int max,
			     struct io_event *events)
{
	long i = 0;
	unsigned head;
	struct aio_ring *ring = (struct aio_ring*) aio_ctx;

	while (i < max) {
		head = ring->head;

		if (head == ring->tail) {
			/* There are no more completions */
			break;
		} else {
			/* There is another completion to reap */
			events[i] = ring->events[head];
			/*
			 * Ensure the event copy is complete before we
			 * publish the new head and let the kernel reuse
			 * the slot.
			 */
			read_barrier();
			ring->head = (head + 1) % ring->nr;
			i++;
		}
	}

	return i;
}
    118 
    119 static int fio_libaio_getevents(struct thread_data *td, unsigned int min,
    120 				unsigned int max, struct timespec *t)
    121 {
    122 	struct libaio_data *ld = td->io_ops->data;
    123 	struct libaio_options *o = td->eo;
    124 	unsigned actual_min = td->o.iodepth_batch_complete == 0 ? 0 : min;
    125 	int r, events = 0;
    126 
    127 	do {
    128 		if (o->userspace_reap == 1
    129 		    && actual_min == 0
    130 		    && ((struct aio_ring *)(ld->aio_ctx))->magic
    131 				== AIO_RING_MAGIC) {
    132 			r = user_io_getevents(ld->aio_ctx, max,
    133 				ld->aio_events + events);
    134 		} else {
    135 			r = io_getevents(ld->aio_ctx, actual_min,
    136 				max, ld->aio_events + events, t);
    137 		}
    138 		if (r >= 0)
    139 			events += r;
    140 		else if (r == -EAGAIN)
    141 			usleep(100);
    142 	} while (events < min);
    143 
    144 	return r < 0 ? r : events;
    145 }
    146 
    147 static int fio_libaio_queue(struct thread_data *td, struct io_u *io_u)
    148 {
    149 	struct libaio_data *ld = td->io_ops->data;
    150 
    151 	fio_ro_check(td, io_u);
    152 
    153 	if (ld->iocbs_nr == (int) td->o.iodepth)
    154 		return FIO_Q_BUSY;
    155 
    156 	/*
    157 	 * fsync is tricky, since it can fail and we need to do it
    158 	 * serialized with other io. the reason is that linux doesn't
    159 	 * support aio fsync yet. So return busy for the case where we
    160 	 * have pending io, to let fio complete those first.
    161 	 */
    162 	if (ddir_sync(io_u->ddir)) {
    163 		if (ld->iocbs_nr)
    164 			return FIO_Q_BUSY;
    165 
    166 		do_io_u_sync(td, io_u);
    167 		return FIO_Q_COMPLETED;
    168 	}
    169 
    170 	if (io_u->ddir == DDIR_TRIM) {
    171 		if (ld->iocbs_nr)
    172 			return FIO_Q_BUSY;
    173 
    174 		do_io_u_trim(td, io_u);
    175 		return FIO_Q_COMPLETED;
    176 	}
    177 
    178 	ld->iocbs[ld->iocbs_nr] = &io_u->iocb;
    179 	ld->io_us[ld->iocbs_nr] = io_u;
    180 	ld->iocbs_nr++;
    181 	return FIO_Q_QUEUED;
    182 }
    183 
    184 static void fio_libaio_queued(struct thread_data *td, struct io_u **io_us,
    185 			      unsigned int nr)
    186 {
    187 	struct timeval now;
    188 	unsigned int i;
    189 
    190 	if (!fio_fill_issue_time(td))
    191 		return;
    192 
    193 	fio_gettime(&now, NULL);
    194 
    195 	for (i = 0; i < nr; i++) {
    196 		struct io_u *io_u = io_us[i];
    197 
    198 		memcpy(&io_u->issue_time, &now, sizeof(now));
    199 		io_u_queued(td, io_u);
    200 	}
    201 }
    202 
/*
 * Submit all staged iocbs with io_submit(). Partial submits advance the
 * cursors and retry until everything is in, a transient error is
 * retried, or a hard error aborts. Returns 0 on success or the negative
 * errno from io_submit().
 */
static int fio_libaio_commit(struct thread_data *td)
{
	struct libaio_data *ld = td->io_ops->data;
	struct iocb **iocbs;
	struct io_u **io_us;
	int ret;

	if (!ld->iocbs_nr)
		return 0;

	io_us = ld->io_us;
	iocbs = ld->iocbs;
	do {
		ret = io_submit(ld->aio_ctx, ld->iocbs_nr, iocbs);
		if (ret > 0) {
			/* kernel accepted 'ret' iocbs; step past them */
			fio_libaio_queued(td, io_us, ret);
			io_u_mark_submit(td, ret);
			ld->iocbs_nr -= ret;
			io_us += ret;
			iocbs += ret;
			ret = 0;
		} else if (!ret || ret == -EAGAIN || ret == -EINTR) {
			/*
			 * Transient: retry. NOTE(review): persistent -EAGAIN
			 * busy-loops here with no backoff or reaping —
			 * consider sleeping briefly or reaping completions
			 * before retrying.
			 */
			if (!ret)
				io_u_mark_submit(td, ret);
			continue;
		} else
			break;	/* hard error; returned to the caller */
	} while (ld->iocbs_nr);

	return ret;
}
    234 
    235 static int fio_libaio_cancel(struct thread_data *td, struct io_u *io_u)
    236 {
    237 	struct libaio_data *ld = td->io_ops->data;
    238 
    239 	return io_cancel(ld->aio_ctx, &io_u->iocb, ld->aio_events);
    240 }
    241 
    242 static void fio_libaio_cleanup(struct thread_data *td)
    243 {
    244 	struct libaio_data *ld = td->io_ops->data;
    245 
    246 	if (ld) {
    247 		io_destroy(ld->aio_ctx);
    248 		free(ld->aio_events);
    249 		free(ld->iocbs);
    250 		free(ld->io_us);
    251 		free(ld);
    252 	}
    253 }
    254 
    255 static int fio_libaio_init(struct thread_data *td)
    256 {
    257 	struct libaio_data *ld = malloc(sizeof(*ld));
    258 	struct libaio_options *o = td->eo;
    259 	int err = 0;
    260 
    261 	memset(ld, 0, sizeof(*ld));
    262 
    263 	/*
    264 	 * First try passing in 0 for queue depth, since we don't
    265 	 * care about the user ring. If that fails, the kernel is too old
    266 	 * and we need the right depth.
    267 	 */
    268 	if (!o->userspace_reap)
    269 		err = io_queue_init(INT_MAX, &ld->aio_ctx);
    270 	if (o->userspace_reap || err == -EINVAL)
    271 		err = io_queue_init(td->o.iodepth, &ld->aio_ctx);
    272 	if (err) {
    273 		td_verror(td, -err, "io_queue_init");
    274 		log_err("fio: check /proc/sys/fs/aio-max-nr\n");
    275 		free(ld);
    276 		return 1;
    277 	}
    278 
    279 	ld->aio_events = malloc(td->o.iodepth * sizeof(struct io_event));
    280 	memset(ld->aio_events, 0, td->o.iodepth * sizeof(struct io_event));
    281 	ld->iocbs = malloc(td->o.iodepth * sizeof(struct iocb *));
    282 	memset(ld->iocbs, 0, sizeof(struct iocb *));
    283 	ld->io_us = malloc(td->o.iodepth * sizeof(struct io_u *));
    284 	memset(ld->io_us, 0, td->o.iodepth * sizeof(struct io_u *));
    285 	ld->iocbs_nr = 0;
    286 
    287 	td->io_ops->data = ld;
    288 	return 0;
    289 }
    290 
/* Engine vtable registered with fio; file handling uses the generic helpers. */
static struct ioengine_ops ioengine = {
	.name			= "libaio",
	.version		= FIO_IOOPS_VERSION,
	.init			= fio_libaio_init,
	.prep			= fio_libaio_prep,
	.queue			= fio_libaio_queue,
	.commit			= fio_libaio_commit,
	.cancel			= fio_libaio_cancel,
	.getevents		= fio_libaio_getevents,
	.event			= fio_libaio_event,
	.cleanup		= fio_libaio_cleanup,
	.open_file		= generic_open_file,
	.close_file		= generic_close_file,
	.get_file_size		= generic_get_file_size,
	.options		= options,
	.option_struct_size	= sizeof(struct libaio_options),
};
    308 
/* Registered at program start — fio_init presumably expands to a
 * constructor attribute; confirm against fio.h. */
static void fio_init fio_libaio_register(void)
{
	register_ioengine(&ioengine);
}
    313 
/* Unregistered at program exit — fio_exit presumably expands to a
 * destructor attribute; confirm against fio.h. */
static void fio_exit fio_libaio_unregister(void)
{
	unregister_ioengine(&ioengine);
}
    318