Home | History | Annotate | Download | only in engines
      1 /*
      2  * posixaio engine
      3  *
 * I/O engine that uses the POSIX-defined asynchronous I/O (AIO) interface.
      5  *
      6  */
      7 #include <stdio.h>
      8 #include <stdlib.h>
      9 #include <unistd.h>
     10 #include <errno.h>
     11 #include <fcntl.h>
     12 
     13 #include "../fio.h"
     14 
     15 struct posixaio_data {
     16 	struct io_u **aio_events;
     17 	unsigned int queued;
     18 };
     19 
     20 static int fill_timespec(struct timespec *ts)
     21 {
     22 #ifdef CONFIG_CLOCK_GETTIME
     23 #ifdef CONFIG_CLOCK_MONOTONIC
     24 	clockid_t clk = CLOCK_MONOTONIC;
     25 #else
     26 	clockid_t clk = CLOCK_REALTIME;
     27 #endif
     28 	if (!clock_gettime(clk, ts))
     29 		return 0;
     30 
     31 	perror("clock_gettime");
     32 	return 1;
     33 #else
     34 	struct timeval tv;
     35 
     36 	gettimeofday(&tv, NULL);
     37 	ts->tv_sec = tv.tv_sec;
     38 	ts->tv_nsec = tv.tv_usec * 1000;
     39 	return 0;
     40 #endif
     41 }
     42 
     43 static unsigned long long ts_utime_since_now(struct timespec *t)
     44 {
     45 	long long sec, nsec;
     46 	struct timespec now;
     47 
     48 	if (fill_timespec(&now))
     49 		return 0;
     50 
     51 	sec = now.tv_sec - t->tv_sec;
     52 	nsec = now.tv_nsec - t->tv_nsec;
     53 	if (sec > 0 && nsec < 0) {
     54 		sec--;
     55 		nsec += 1000000000;
     56 	}
     57 
     58 	sec *= 1000000;
     59 	nsec /= 1000;
     60 	return sec + nsec;
     61 }
     62 
     63 static int fio_posixaio_cancel(struct thread_data fio_unused *td,
     64 			       struct io_u *io_u)
     65 {
     66 	struct fio_file *f = io_u->file;
     67 	int r = aio_cancel(f->fd, &io_u->aiocb);
     68 
     69 	if (r == AIO_ALLDONE || r == AIO_CANCELED)
     70 		return 0;
     71 
     72 	return 1;
     73 }
     74 
     75 static int fio_posixaio_prep(struct thread_data fio_unused *td,
     76 			     struct io_u *io_u)
     77 {
     78 	os_aiocb_t *aiocb = &io_u->aiocb;
     79 	struct fio_file *f = io_u->file;
     80 
     81 	aiocb->aio_fildes = f->fd;
     82 	aiocb->aio_buf = io_u->xfer_buf;
     83 	aiocb->aio_nbytes = io_u->xfer_buflen;
     84 	aiocb->aio_offset = io_u->offset;
     85 	aiocb->aio_sigevent.sigev_notify = SIGEV_NONE;
     86 
     87 	io_u->seen = 0;
     88 	return 0;
     89 }
     90 
     91 #define SUSPEND_ENTRIES	8
     92 
     93 static int fio_posixaio_getevents(struct thread_data *td, unsigned int min,
     94 				  unsigned int max, const struct timespec *t)
     95 {
     96 	struct posixaio_data *pd = td->io_ops_data;
     97 	os_aiocb_t *suspend_list[SUSPEND_ENTRIES];
     98 	struct timespec start;
     99 	int have_timeout = 0;
    100 	int suspend_entries;
    101 	struct io_u *io_u;
    102 	unsigned int r;
    103 	int i;
    104 
    105 	if (t && !fill_timespec(&start))
    106 		have_timeout = 1;
    107 	else
    108 		memset(&start, 0, sizeof(start));
    109 
    110 	r = 0;
    111 restart:
    112 	memset(suspend_list, 0, sizeof(suspend_list));
    113 	suspend_entries = 0;
    114 	io_u_qiter(&td->io_u_all, io_u, i) {
    115 		int err;
    116 
    117 		if (io_u->seen || !(io_u->flags & IO_U_F_FLIGHT))
    118 			continue;
    119 
    120 		err = aio_error(&io_u->aiocb);
    121 		if (err == EINPROGRESS) {
    122 			if (suspend_entries < SUSPEND_ENTRIES) {
    123 				suspend_list[suspend_entries] = &io_u->aiocb;
    124 				suspend_entries++;
    125 			}
    126 			continue;
    127 		}
    128 
    129 		io_u->seen = 1;
    130 		pd->queued--;
    131 		pd->aio_events[r++] = io_u;
    132 
    133 		if (err == ECANCELED)
    134 			io_u->resid = io_u->xfer_buflen;
    135 		else if (!err) {
    136 			ssize_t retval = aio_return(&io_u->aiocb);
    137 
    138 			io_u->resid = io_u->xfer_buflen - retval;
    139 		} else
    140 			io_u->error = err;
    141 	}
    142 
    143 	if (r >= min)
    144 		return r;
    145 
    146 	if (have_timeout) {
    147 		unsigned long long usec;
    148 
    149 		usec = (t->tv_sec * 1000000) + (t->tv_nsec / 1000);
    150 		if (ts_utime_since_now(&start) > usec)
    151 			return r;
    152 	}
    153 
    154 	/*
    155 	 * must have some in-flight, wait for at least one
    156 	 */
    157 	aio_suspend((const os_aiocb_t * const *)suspend_list,
    158 							suspend_entries, t);
    159 	goto restart;
    160 }
    161 
    162 static struct io_u *fio_posixaio_event(struct thread_data *td, int event)
    163 {
    164 	struct posixaio_data *pd = td->io_ops_data;
    165 
    166 	return pd->aio_events[event];
    167 }
    168 
    169 static int fio_posixaio_queue(struct thread_data *td,
    170 			      struct io_u *io_u)
    171 {
    172 	struct posixaio_data *pd = td->io_ops_data;
    173 	os_aiocb_t *aiocb = &io_u->aiocb;
    174 	int ret;
    175 
    176 	fio_ro_check(td, io_u);
    177 
    178 	if (io_u->ddir == DDIR_READ)
    179 		ret = aio_read(aiocb);
    180 	else if (io_u->ddir == DDIR_WRITE)
    181 		ret = aio_write(aiocb);
    182 	else if (io_u->ddir == DDIR_TRIM) {
    183 		if (pd->queued)
    184 			return FIO_Q_BUSY;
    185 
    186 		do_io_u_trim(td, io_u);
    187 		return FIO_Q_COMPLETED;
    188 	} else {
    189 #ifdef CONFIG_POSIXAIO_FSYNC
    190 		ret = aio_fsync(O_SYNC, aiocb);
    191 #else
    192 		if (pd->queued)
    193 			return FIO_Q_BUSY;
    194 
    195 		do_io_u_sync(td, io_u);
    196 		return FIO_Q_COMPLETED;
    197 #endif
    198 	}
    199 
    200 	if (ret) {
    201 		int aio_err = errno;
    202 
    203 		/*
    204 		 * At least OSX has a very low limit on the number of pending
    205 		 * IOs, so if it returns EAGAIN, we are out of resources
    206 		 * to queue more. Just return FIO_Q_BUSY to naturally
    207 		 * drop off at this depth.
    208 		 */
    209 		if (aio_err == EAGAIN)
    210 			return FIO_Q_BUSY;
    211 
    212 		io_u->error = aio_err;
    213 		td_verror(td, io_u->error, "xfer");
    214 		return FIO_Q_COMPLETED;
    215 	}
    216 
    217 	pd->queued++;
    218 	return FIO_Q_QUEUED;
    219 }
    220 
    221 static void fio_posixaio_cleanup(struct thread_data *td)
    222 {
    223 	struct posixaio_data *pd = td->io_ops_data;
    224 
    225 	if (pd) {
    226 		free(pd->aio_events);
    227 		free(pd);
    228 	}
    229 }
    230 
    231 static int fio_posixaio_init(struct thread_data *td)
    232 {
    233 	struct posixaio_data *pd = malloc(sizeof(*pd));
    234 
    235 	memset(pd, 0, sizeof(*pd));
    236 	pd->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
    237 	memset(pd->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
    238 
    239 	td->io_ops_data = pd;
    240 	return 0;
    241 }
    242 
    243 static struct ioengine_ops ioengine = {
    244 	.name		= "posixaio",
    245 	.version	= FIO_IOOPS_VERSION,
    246 	.init		= fio_posixaio_init,
    247 	.prep		= fio_posixaio_prep,
    248 	.queue		= fio_posixaio_queue,
    249 	.cancel		= fio_posixaio_cancel,
    250 	.getevents	= fio_posixaio_getevents,
    251 	.event		= fio_posixaio_event,
    252 	.cleanup	= fio_posixaio_cleanup,
    253 	.open_file	= generic_open_file,
    254 	.close_file	= generic_close_file,
    255 	.get_file_size	= generic_get_file_size,
    256 };
    257 
    258 static void fio_init fio_posixaio_register(void)
    259 {
    260 	register_ioengine(&ioengine);
    261 }
    262 
    263 static void fio_exit fio_posixaio_unregister(void)
    264 {
    265 	unregister_ioengine(&ioengine);
    266 }
    267