/*
 * sync/psync/vsync/pvsync engines
 *
 * IO engines that transfer data with regular synchronous syscalls:
 * read(2)/write(2) preceded by lseek(2) (sync), pread(2)/pwrite(2)
 * (psync), readv(2)/writev(2) (vsync) and, where available,
 * preadv(2)/pwritev(2) (pvsync).
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include <assert.h>

#include "../fio.h"

/*
 * The sync engines use engine_data to store the last IO offset, so a
 * redundant lseek(2) can be skipped for sequential IO.
 */
#define LAST_POS(f)	((f)->engine_data)

struct syncio_data {
	struct iovec *iovecs;
	struct io_u **io_us;
	unsigned int queued;
	unsigned int events;
	unsigned long queued_bytes;

	unsigned long long last_offset;
	struct fio_file *last_file;
	enum fio_ddir last_ddir;
};

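/*
 * Prep for the sync engine: seek to the io_u offset, unless the file
 * is already positioned there (LAST_POS matches, i.e. sequential IO).
 */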
static int fio_syncio_prep(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f = io_u->file;

	if (!ddir_rw(io_u->ddir))
		return 0;

	if (LAST_POS(f) != -1ULL && LAST_POS(f) == io_u->offset)
		return 0;

	if (lseek(f->fd, io_u->offset, SEEK_SET) == -1) {
		td_verror(td, errno, "lseek");
		return 1;
	}

	return 0;
}

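/*
 * Shared completion handling: cache the new file offset, turn a short
 * transfer into a residual count on the io_u, and map a negative
 * return into an errno-based error.
 */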
static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
{
	if (io_u->file && ret >= 0 && ddir_rw(io_u->ddir))
		LAST_POS(io_u->file) = io_u->offset + ret;

	if (ret != (int) io_u->xfer_buflen) {
		if (ret >= 0) {
			io_u->resid = io_u->xfer_buflen - ret;
			io_u->error = 0;
			return FIO_Q_COMPLETED;
		} else
			io_u->error = errno;
	}

	if (io_u->error)
		td_verror(td, io_u->error, "xfer");

	return FIO_Q_COMPLETED;
}

#ifdef CONFIG_PWRITEV
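/*
 * pvsync: issue the io_u directly with a single-entry iovec via
 * preadv(2)/pwritev(2); trim and sync are handled by the generic
 * helpers.
 */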
static int fio_pvsyncio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct syncio_data *sd = td->io_ops->data;
	struct iovec *iov = &sd->iovecs[0];
	struct fio_file *f = io_u->file;
	int ret;

	fio_ro_check(td, io_u);

	iov->iov_base = io_u->xfer_buf;
	iov->iov_len = io_u->xfer_buflen;

	if (io_u->ddir == DDIR_READ)
		ret = preadv(f->fd, iov, 1, io_u->offset);
	else if (io_u->ddir == DDIR_WRITE)
		ret = pwritev(f->fd, iov, 1, io_u->offset);
	else if (io_u->ddir == DDIR_TRIM) {
		do_io_u_trim(td, io_u);
		return FIO_Q_COMPLETED;
	} else
		ret = do_io_u_sync(td, io_u);

	return fio_io_end(td, io_u, ret);
}
#endif

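/*
 * psync: pread(2)/pwrite(2) take the offset explicitly, so no
 * prep/lseek(2) step is required.
 */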
static int fio_psyncio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	int ret;

	fio_ro_check(td, io_u);

	if (io_u->ddir == DDIR_READ)
		ret = pread(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
	else if (io_u->ddir == DDIR_WRITE)
		ret = pwrite(f->fd, io_u->xfer_buf, io_u->xfer_buflen, io_u->offset);
	else if (io_u->ddir == DDIR_TRIM) {
		do_io_u_trim(td, io_u);
		return FIO_Q_COMPLETED;
	} else
		ret = do_io_u_sync(td, io_u);

	return fio_io_end(td, io_u, ret);
}

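/*
 * sync: plain read(2)/write(2) at the current file position, which
 * fio_syncio_prep() has already established.
 */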
static int fio_syncio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	int ret;

	fio_ro_check(td, io_u);

	if (io_u->ddir == DDIR_READ)
		ret = read(f->fd, io_u->xfer_buf, io_u->xfer_buflen);
	else if (io_u->ddir == DDIR_WRITE)
		ret = write(f->fd, io_u->xfer_buf, io_u->xfer_buflen);
	else if (io_u->ddir == DDIR_TRIM) {
		do_io_u_trim(td, io_u);
		return FIO_Q_COMPLETED;
	} else
		ret = do_io_u_sync(td, io_u);

	return fio_io_end(td, io_u, ret);
}

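/*
 * vsync completes all queued io_us in commit; getevents only reports
 * how many were completed there.
 */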
static int fio_vsyncio_getevents(struct thread_data *td, unsigned int min,
				 unsigned int max,
				 struct timespec fio_unused *t)
{
	struct syncio_data *sd = td->io_ops->data;
	int ret;

	if (min) {
		ret = sd->events;
		sd->events = 0;
	} else
		ret = 0;

	dprint(FD_IO, "vsyncio_getevents: min=%d,max=%d: %d\n", min, max, ret);
	return ret;
}

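/* Hand back the io_u matching a completed event index. */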
static struct io_u *fio_vsyncio_event(struct thread_data *td, int event)
{
	struct syncio_data *sd = td->io_ops->data;

	return sd->io_us[event];
}

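/*
 * An io_u may join the pending batch only if it continues the previous
 * IO: same file, same direction, and an offset right where the last
 * one ended.
 */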
static int fio_vsyncio_append(struct thread_data *td, struct io_u *io_u)
{
	struct syncio_data *sd = td->io_ops->data;

	if (ddir_sync(io_u->ddir))
		return 0;

	if (io_u->offset == sd->last_offset && io_u->file == sd->last_file &&
	    io_u->ddir == sd->last_ddir)
		return 1;

	return 0;
}

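/*
 * Store an io_u at the given batch index and update the state used by
 * the append check above.
 */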
static void fio_vsyncio_set_iov(struct syncio_data *sd, struct io_u *io_u,
				int idx)
{
	sd->io_us[idx] = io_u;
	sd->iovecs[idx].iov_base = io_u->xfer_buf;
	sd->iovecs[idx].iov_len = io_u->xfer_buflen;
	sd->last_offset = io_u->offset + io_u->xfer_buflen;
	sd->last_file = io_u->file;
	sd->last_ddir = io_u->ddir;
	sd->queued_bytes += io_u->xfer_buflen;
	sd->queued++;
}

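/*
 * vsync queue path: batch sequentially contiguous io_us into the iovec
 * array. Returns FIO_Q_BUSY when a pending batch must be committed
 * first, either because this io_u does not append to it or because the
 * batch has reached the configured iodepth. Sync-type io_us are issued
 * inline once the batch is empty.
 */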
static int fio_vsyncio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct syncio_data *sd = td->io_ops->data;

	fio_ro_check(td, io_u);

	if (!fio_vsyncio_append(td, io_u)) {
		dprint(FD_IO, "vsyncio_queue: no append (%d)\n", sd->queued);
		/*
		 * If we can't append and have stuff queued, tell fio to
		 * commit those first and then retry this io
		 */
		if (sd->queued)
			return FIO_Q_BUSY;
		if (ddir_sync(io_u->ddir)) {
			int ret = do_io_u_sync(td, io_u);

			return fio_io_end(td, io_u, ret);
		}

		sd->queued = 0;
		sd->queued_bytes = 0;
		fio_vsyncio_set_iov(sd, io_u, 0);
	} else {
		if (sd->queued == td->o.iodepth) {
			dprint(FD_IO, "vsyncio_queue: max depth %d\n", sd->queued);
			return FIO_Q_BUSY;
		}

		dprint(FD_IO, "vsyncio_queue: append\n");
		fio_vsyncio_set_iov(sd, io_u, sd->queued);
	}

	dprint(FD_IO, "vsyncio_queue: depth now %d\n", sd->queued);
	return FIO_Q_QUEUED;
}

/*
 * Check that we transferred all bytes; otherwise distribute the short
 * transfer or error across the queued io_us
 */
static int fio_vsyncio_end(struct thread_data *td, ssize_t bytes)
{
	struct syncio_data *sd = td->io_ops->data;
	struct io_u *io_u;
	unsigned int i;
	int err;

	/*
	 * transferred everything, perfect
	 */
	if (bytes == sd->queued_bytes)
		return 0;

	err = errno;
	for (i = 0; i < sd->queued; i++) {
		io_u = sd->io_us[i];

		if (bytes == -1) {
			io_u->error = err;
		} else {
			unsigned int this_io;

			this_io = bytes;
			if (this_io > io_u->xfer_buflen)
				this_io = io_u->xfer_buflen;

			io_u->resid = io_u->xfer_buflen - this_io;
			io_u->error = 0;
			bytes -= this_io;
		}
	}

	if (bytes == -1) {
		td_verror(td, err, "xfer vsync");
		return -err;
	}

	return 0;
}

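/*
 * Submit the whole batch as one readv(2)/writev(2) after seeking to
 * the offset of the first queued io_u.
 */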
static int fio_vsyncio_commit(struct thread_data *td)
{
	struct syncio_data *sd = td->io_ops->data;
	struct fio_file *f;
	ssize_t ret;

	if (!sd->queued)
		return 0;

	io_u_mark_submit(td, sd->queued);
	f = sd->last_file;

	if (lseek(f->fd, sd->io_us[0]->offset, SEEK_SET) == -1) {
		int err = -errno;

		td_verror(td, errno, "lseek");
		return err;
	}

	if (sd->last_ddir == DDIR_READ)
		ret = readv(f->fd, sd->iovecs, sd->queued);
	else
		ret = writev(f->fd, sd->iovecs, sd->queued);

	dprint(FD_IO, "vsyncio_commit: %d\n", (int) ret);
	sd->events = sd->queued;
	sd->queued = 0;
	return fio_vsyncio_end(td, ret);
}

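/* Allocate per-thread engine data, sized for the configured iodepth. */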
static int fio_vsyncio_init(struct thread_data *td)
{
	struct syncio_data *sd;

	sd = malloc(sizeof(*sd));
	memset(sd, 0, sizeof(*sd));
	sd->last_offset = -1ULL;
	sd->iovecs = malloc(td->o.iodepth * sizeof(struct iovec));
	sd->io_us = malloc(td->o.iodepth * sizeof(struct io_u *));

	td->io_ops->data = sd;
	return 0;
}

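/* Release the per-thread engine data allocated in init. */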
static void fio_vsyncio_cleanup(struct thread_data *td)
{
	struct syncio_data *sd = td->io_ops->data;

	free(sd->iovecs);
	free(sd->io_us);
	free(sd);
}

static struct ioengine_ops ioengine_rw = {
	.name		= "sync",
	.version	= FIO_IOOPS_VERSION,
	.prep		= fio_syncio_prep,
	.queue		= fio_syncio_queue,
	.open_file	= generic_open_file,
	.close_file	= generic_close_file,
	.get_file_size	= generic_get_file_size,
	.flags		= FIO_SYNCIO,
};

static struct ioengine_ops ioengine_prw = {
	.name		= "psync",
	.version	= FIO_IOOPS_VERSION,
	.queue		= fio_psyncio_queue,
	.open_file	= generic_open_file,
	.close_file	= generic_close_file,
	.get_file_size	= generic_get_file_size,
	.flags		= FIO_SYNCIO,
};

static struct ioengine_ops ioengine_vrw = {
	.name		= "vsync",
	.version	= FIO_IOOPS_VERSION,
	.init		= fio_vsyncio_init,
	.cleanup	= fio_vsyncio_cleanup,
	.queue		= fio_vsyncio_queue,
	.commit		= fio_vsyncio_commit,
	.event		= fio_vsyncio_event,
	.getevents	= fio_vsyncio_getevents,
	.open_file	= generic_open_file,
	.close_file	= generic_close_file,
	.get_file_size	= generic_get_file_size,
	.flags		= FIO_SYNCIO,
};

#ifdef CONFIG_PWRITEV
static struct ioengine_ops ioengine_pvrw = {
	.name		= "pvsync",
	.version	= FIO_IOOPS_VERSION,
	.init		= fio_vsyncio_init,
	.cleanup	= fio_vsyncio_cleanup,
	.queue		= fio_pvsyncio_queue,
	.open_file	= generic_open_file,
	.close_file	= generic_close_file,
	.get_file_size	= generic_get_file_size,
	.flags		= FIO_SYNCIO,
};
#endif

static void fio_init fio_syncio_register(void)
{
	register_ioengine(&ioengine_rw);
	register_ioengine(&ioengine_prw);
	register_ioengine(&ioengine_vrw);
#ifdef CONFIG_PWRITEV
	register_ioengine(&ioengine_pvrw);
#endif
}

static void fio_exit fio_syncio_unregister(void)
{
	unregister_ioengine(&ioengine_rw);
	unregister_ioengine(&ioengine_prw);
	unregister_ioengine(&ioengine_vrw);
#ifdef CONFIG_PWRITEV
	unregister_ioengine(&ioengine_pvrw);
#endif
}
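
/*
 * Example usage (illustrative, not part of the engine itself): a
 * minimal fio job that exercises the psync engine above. The file
 * name and sizes are placeholders.
 *
 *	[seqread]
 *	ioengine=psync
 *	rw=read
 *	bs=4k
 *	size=64m
 *	filename=/tmp/fio.test
 *
 * Substitute ioengine=sync, vsync or pvsync to exercise the other
 * variants registered in this file.
 */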