Home | History | Annotate | Download | only in engines
      1 /*
      2  * net engine
      3  *
      4  * IO engine that reads/writes to/from sockets.
      5  *
      6  */
      7 #include <stdio.h>
      8 #include <stdlib.h>
      9 #include <unistd.h>
     10 #include <signal.h>
     11 #include <errno.h>
     12 #include <assert.h>
     13 #include <netinet/in.h>
     14 #include <netinet/tcp.h>
     15 #include <arpa/inet.h>
     16 #include <netdb.h>
     17 #include <sys/poll.h>
     18 #include <sys/types.h>
     19 #include <sys/stat.h>
     20 #include <sys/socket.h>
     21 #include <sys/un.h>
     22 
     23 #include "../fio.h"
     24 #include "../verify.h"
     25 #include "../optgroup.h"
     26 
     27 struct netio_data {
     28 	int listenfd;
     29 	int use_splice;
     30 	int seq_off;
     31 	int pipes[2];
     32 	struct sockaddr_in addr;
     33 	struct sockaddr_in6 addr6;
     34 	struct sockaddr_un addr_un;
     35 	uint64_t udp_send_seq;
     36 	uint64_t udp_recv_seq;
     37 };
     38 
     39 struct netio_options {
     40 	struct thread_data *td;
     41 	unsigned int port;
     42 	unsigned int proto;
     43 	unsigned int listen;
     44 	unsigned int pingpong;
     45 	unsigned int nodelay;
     46 	unsigned int ttl;
     47 	unsigned int window_size;
     48 	unsigned int mss;
     49 	char *intfc;
     50 };
     51 
     52 struct udp_close_msg {
     53 	uint32_t magic;
     54 	uint32_t cmd;
     55 };
     56 
     57 struct udp_seq {
     58 	uint64_t magic;
     59 	uint64_t seq;
     60 	uint64_t bs;
     61 };
     62 
     63 enum {
     64 	FIO_LINK_CLOSE = 0x89,
     65 	FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
     66 	FIO_LINK_OPEN = 0x98,
     67 	FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL,
     68 
     69 	FIO_TYPE_TCP	= 1,
     70 	FIO_TYPE_UDP	= 2,
     71 	FIO_TYPE_UNIX	= 3,
     72 	FIO_TYPE_TCP_V6	= 4,
     73 	FIO_TYPE_UDP_V6	= 5,
     74 };
     75 
     76 static int str_hostname_cb(void *data, const char *input);
     77 static struct fio_option options[] = {
     78 	{
     79 		.name	= "hostname",
     80 		.lname	= "net engine hostname",
     81 		.type	= FIO_OPT_STR_STORE,
     82 		.cb	= str_hostname_cb,
     83 		.help	= "Hostname for net IO engine",
     84 		.category = FIO_OPT_C_ENGINE,
     85 		.group	= FIO_OPT_G_NETIO,
     86 	},
     87 	{
     88 		.name	= "port",
     89 		.lname	= "net engine port",
     90 		.type	= FIO_OPT_INT,
     91 		.off1	= offsetof(struct netio_options, port),
     92 		.minval	= 1,
     93 		.maxval	= 65535,
     94 		.help	= "Port to use for TCP or UDP net connections",
     95 		.category = FIO_OPT_C_ENGINE,
     96 		.group	= FIO_OPT_G_NETIO,
     97 	},
     98 	{
     99 		.name	= "protocol",
    100 		.lname	= "net engine protocol",
    101 		.alias	= "proto",
    102 		.type	= FIO_OPT_STR,
    103 		.off1	= offsetof(struct netio_options, proto),
    104 		.help	= "Network protocol to use",
    105 		.def	= "tcp",
    106 		.posval = {
    107 			  { .ival = "tcp",
    108 			    .oval = FIO_TYPE_TCP,
    109 			    .help = "Transmission Control Protocol",
    110 			  },
    111 #ifdef CONFIG_IPV6
    112 			  { .ival = "tcpv6",
    113 			    .oval = FIO_TYPE_TCP_V6,
    114 			    .help = "Transmission Control Protocol V6",
    115 			  },
    116 #endif
    117 			  { .ival = "udp",
    118 			    .oval = FIO_TYPE_UDP,
    119 			    .help = "User Datagram Protocol",
    120 			  },
    121 #ifdef CONFIG_IPV6
    122 			  { .ival = "udpv6",
    123 			    .oval = FIO_TYPE_UDP_V6,
    124 			    .help = "User Datagram Protocol V6",
    125 			  },
    126 #endif
    127 			  { .ival = "unix",
    128 			    .oval = FIO_TYPE_UNIX,
    129 			    .help = "UNIX domain socket",
    130 			  },
    131 		},
    132 		.category = FIO_OPT_C_ENGINE,
    133 		.group	= FIO_OPT_G_NETIO,
    134 	},
    135 #ifdef CONFIG_TCP_NODELAY
    136 	{
    137 		.name	= "nodelay",
    138 		.lname	= "No Delay",
    139 		.type	= FIO_OPT_BOOL,
    140 		.off1	= offsetof(struct netio_options, nodelay),
    141 		.help	= "Use TCP_NODELAY on TCP connections",
    142 		.category = FIO_OPT_C_ENGINE,
    143 		.group	= FIO_OPT_G_NETIO,
    144 	},
    145 #endif
    146 	{
    147 		.name	= "listen",
    148 		.lname	= "net engine listen",
    149 		.type	= FIO_OPT_STR_SET,
    150 		.off1	= offsetof(struct netio_options, listen),
    151 		.help	= "Listen for incoming TCP connections",
    152 		.category = FIO_OPT_C_ENGINE,
    153 		.group	= FIO_OPT_G_NETIO,
    154 	},
    155 	{
    156 		.name	= "pingpong",
    157 		.lname	= "Ping Pong",
    158 		.type	= FIO_OPT_STR_SET,
    159 		.off1	= offsetof(struct netio_options, pingpong),
    160 		.help	= "Ping-pong IO requests",
    161 		.category = FIO_OPT_C_ENGINE,
    162 		.group	= FIO_OPT_G_NETIO,
    163 	},
    164 	{
    165 		.name	= "interface",
    166 		.lname	= "net engine interface",
    167 		.type	= FIO_OPT_STR_STORE,
    168 		.off1	= offsetof(struct netio_options, intfc),
    169 		.help	= "Network interface to use",
    170 		.category = FIO_OPT_C_ENGINE,
    171 		.group	= FIO_OPT_G_NETIO,
    172 	},
    173 	{
    174 		.name	= "ttl",
    175 		.lname	= "net engine multicast ttl",
    176 		.type	= FIO_OPT_INT,
    177 		.off1	= offsetof(struct netio_options, ttl),
    178 		.def    = "1",
    179 		.minval	= 0,
    180 		.help	= "Time-to-live value for outgoing UDP multicast packets",
    181 		.category = FIO_OPT_C_ENGINE,
    182 		.group	= FIO_OPT_G_NETIO,
    183 	},
    184 #ifdef CONFIG_NET_WINDOWSIZE
    185 	{
    186 		.name	= "window_size",
    187 		.lname	= "Window Size",
    188 		.type	= FIO_OPT_INT,
    189 		.off1	= offsetof(struct netio_options, window_size),
    190 		.minval	= 0,
    191 		.help	= "Set socket buffer window size",
    192 		.category = FIO_OPT_C_ENGINE,
    193 		.group	= FIO_OPT_G_NETIO,
    194 	},
    195 #endif
    196 #ifdef CONFIG_NET_MSS
    197 	{
    198 		.name	= "mss",
    199 		.lname	= "Maximum segment size",
    200 		.type	= FIO_OPT_INT,
    201 		.off1	= offsetof(struct netio_options, mss),
    202 		.minval	= 0,
    203 		.help	= "Set TCP maximum segment size",
    204 		.category = FIO_OPT_C_ENGINE,
    205 		.group	= FIO_OPT_G_NETIO,
    206 	},
    207 #endif
    208 	{
    209 		.name	= NULL,
    210 	},
    211 };
    212 
    213 static inline int is_udp(struct netio_options *o)
    214 {
    215 	return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6;
    216 }
    217 
    218 static inline int is_tcp(struct netio_options *o)
    219 {
    220 	return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6;
    221 }
    222 
    223 static inline int is_ipv6(struct netio_options *o)
    224 {
    225 	return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6;
    226 }
    227 
    228 static int set_window_size(struct thread_data *td, int fd)
    229 {
    230 #ifdef CONFIG_NET_WINDOWSIZE
    231 	struct netio_options *o = td->eo;
    232 	unsigned int wss;
    233 	int snd, rcv, ret;
    234 
    235 	if (!o->window_size)
    236 		return 0;
    237 
    238 	rcv = o->listen || o->pingpong;
    239 	snd = !o->listen || o->pingpong;
    240 	wss = o->window_size;
    241 	ret = 0;
    242 
    243 	if (rcv) {
    244 		ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss,
    245 					sizeof(wss));
    246 		if (ret < 0)
    247 			td_verror(td, errno, "rcvbuf window size");
    248 	}
    249 	if (snd && !ret) {
    250 		ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss,
    251 					sizeof(wss));
    252 		if (ret < 0)
    253 			td_verror(td, errno, "sndbuf window size");
    254 	}
    255 
    256 	return ret;
    257 #else
    258 	td_verror(td, -EINVAL, "setsockopt window size");
    259 	return -1;
    260 #endif
    261 }
    262 
    263 static int set_mss(struct thread_data *td, int fd)
    264 {
    265 #ifdef CONFIG_NET_MSS
    266 	struct netio_options *o = td->eo;
    267 	unsigned int mss;
    268 	int ret;
    269 
    270 	if (!o->mss || !is_tcp(o))
    271 		return 0;
    272 
    273 	mss = o->mss;
    274 	ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss,
    275 				sizeof(mss));
    276 	if (ret < 0)
    277 		td_verror(td, errno, "setsockopt TCP_MAXSEG");
    278 
    279 	return ret;
    280 #else
    281 	td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG");
    282 	return -1;
    283 #endif
    284 }
    285 
    286 
    287 /*
    288  * Return -1 for error and 'nr events' for a positive number
    289  * of events
    290  */
    291 static int poll_wait(struct thread_data *td, int fd, short events)
    292 {
    293 	struct pollfd pfd;
    294 	int ret;
    295 
    296 	while (!td->terminate) {
    297 		pfd.fd = fd;
    298 		pfd.events = events;
    299 		ret = poll(&pfd, 1, -1);
    300 		if (ret < 0) {
    301 			if (errno == EINTR)
    302 				break;
    303 
    304 			td_verror(td, errno, "poll");
    305 			return -1;
    306 		} else if (!ret)
    307 			continue;
    308 
    309 		break;
    310 	}
    311 
    312 	if (pfd.revents & events)
    313 		return 1;
    314 
    315 	return -1;
    316 }
    317 
    318 static int fio_netio_is_multicast(const char *mcaddr)
    319 {
    320 	in_addr_t addr = inet_network(mcaddr);
    321 	if (addr == -1)
    322 		return 0;
    323 
    324 	if (inet_network("224.0.0.0") <= addr &&
    325 	    inet_network("239.255.255.255") >= addr)
    326 		return 1;
    327 
    328 	return 0;
    329 }
    330 
    331 
    332 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
    333 {
    334 	struct netio_options *o = td->eo;
    335 
    336 	/*
    337 	 * Make sure we don't see spurious reads to a receiver, and vice versa
    338 	 */
    339 	if (is_tcp(o))
    340 		return 0;
    341 
    342 	if ((o->listen && io_u->ddir == DDIR_WRITE) ||
    343 	    (!o->listen && io_u->ddir == DDIR_READ)) {
    344 		td_verror(td, EINVAL, "bad direction");
    345 		return 1;
    346 	}
    347 
    348 	return 0;
    349 }
    350 
    351 #ifdef CONFIG_LINUX_SPLICE
    352 static int splice_io_u(int fdin, int fdout, unsigned int len)
    353 {
    354 	int bytes = 0;
    355 
    356 	while (len) {
    357 		int ret = splice(fdin, NULL, fdout, NULL, len, 0);
    358 
    359 		if (ret < 0) {
    360 			if (!bytes)
    361 				bytes = ret;
    362 
    363 			break;
    364 		} else if (!ret)
    365 			break;
    366 
    367 		bytes += ret;
    368 		len -= ret;
    369 	}
    370 
    371 	return bytes;
    372 }
    373 
    374 /*
    375  * Receive bytes from a socket and fill them into the internal pipe
    376  */
    377 static int splice_in(struct thread_data *td, struct io_u *io_u)
    378 {
    379 	struct netio_data *nd = td->io_ops_data;
    380 
    381 	return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
    382 }
    383 
    384 /*
    385  * Transmit 'len' bytes from the internal pipe
    386  */
    387 static int splice_out(struct thread_data *td, struct io_u *io_u,
    388 		      unsigned int len)
    389 {
    390 	struct netio_data *nd = td->io_ops_data;
    391 
    392 	return splice_io_u(nd->pipes[0], io_u->file->fd, len);
    393 }
    394 
    395 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
    396 {
    397 	struct iovec iov = {
    398 		.iov_base = io_u->xfer_buf,
    399 		.iov_len = len,
    400 	};
    401 	int bytes = 0;
    402 
    403 	while (iov.iov_len) {
    404 		int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
    405 
    406 		if (ret < 0) {
    407 			if (!bytes)
    408 				bytes = ret;
    409 			break;
    410 		} else if (!ret)
    411 			break;
    412 
    413 		iov.iov_len -= ret;
    414 		iov.iov_base += ret;
    415 		bytes += ret;
    416 	}
    417 
    418 	return bytes;
    419 
    420 }
    421 
    422 /*
    423  * vmsplice() pipe to io_u buffer
    424  */
    425 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
    426 			     unsigned int len)
    427 {
    428 	struct netio_data *nd = td->io_ops_data;
    429 
    430 	return vmsplice_io_u(io_u, nd->pipes[0], len);
    431 }
    432 
    433 /*
    434  * vmsplice() io_u to pipe
    435  */
    436 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
    437 {
    438 	struct netio_data *nd = td->io_ops_data;
    439 
    440 	return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
    441 }
    442 
    443 /*
    444  * splice receive - transfer socket data into a pipe using splice, then map
    445  * that pipe data into the io_u using vmsplice.
    446  */
    447 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
    448 {
    449 	int ret;
    450 
    451 	ret = splice_in(td, io_u);
    452 	if (ret > 0)
    453 		return vmsplice_io_u_out(td, io_u, ret);
    454 
    455 	return ret;
    456 }
    457 
    458 /*
    459  * splice transmit - map data from the io_u into a pipe by using vmsplice,
    460  * then transfer that pipe to a socket using splice.
    461  */
    462 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
    463 {
    464 	int ret;
    465 
    466 	ret = vmsplice_io_u_in(td, io_u);
    467 	if (ret > 0)
    468 		return splice_out(td, io_u, ret);
    469 
    470 	return ret;
    471 }
    472 #else
    473 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
    474 {
    475 	errno = EOPNOTSUPP;
    476 	return -1;
    477 }
    478 
    479 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
    480 {
    481 	errno = EOPNOTSUPP;
    482 	return -1;
    483 }
    484 #endif
    485 
    486 static void store_udp_seq(struct netio_data *nd, struct io_u *io_u)
    487 {
    488 	struct udp_seq *us;
    489 
    490 	if (io_u->xfer_buflen < sizeof(*us))
    491 		return;
    492 
    493 	us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
    494 	us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC);
    495 	us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen);
    496 	us->seq = cpu_to_le64(nd->udp_send_seq++);
    497 }
    498 
    499 static void verify_udp_seq(struct thread_data *td, struct netio_data *nd,
    500 			   struct io_u *io_u)
    501 {
    502 	struct udp_seq *us;
    503 	uint64_t seq;
    504 
    505 	if (io_u->xfer_buflen < sizeof(*us))
    506 		return;
    507 
    508 	if (nd->seq_off)
    509 		return;
    510 
    511 	us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us);
    512 	if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC)
    513 		return;
    514 	if (le64_to_cpu(us->bs) != io_u->xfer_buflen) {
    515 		nd->seq_off = 1;
    516 		return;
    517 	}
    518 
    519 	seq = le64_to_cpu(us->seq);
    520 
    521 	if (seq != nd->udp_recv_seq)
    522 		td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq;
    523 
    524 	nd->udp_recv_seq = seq + 1;
    525 }
    526 
    527 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
    528 {
    529 	struct netio_data *nd = td->io_ops_data;
    530 	struct netio_options *o = td->eo;
    531 	int ret, flags = 0;
    532 
    533 	do {
    534 		if (is_udp(o)) {
    535 			const struct sockaddr *to;
    536 			socklen_t len;
    537 
    538 			if (is_ipv6(o)) {
    539 				to = (struct sockaddr *) &nd->addr6;
    540 				len = sizeof(nd->addr6);
    541 			} else {
    542 				to = (struct sockaddr *) &nd->addr;
    543 				len = sizeof(nd->addr);
    544 			}
    545 
    546 			if (td->o.verify == VERIFY_NONE)
    547 				store_udp_seq(nd, io_u);
    548 
    549 			ret = sendto(io_u->file->fd, io_u->xfer_buf,
    550 					io_u->xfer_buflen, flags, to, len);
    551 		} else {
    552 			/*
    553 			 * if we are going to write more, set MSG_MORE
    554 			 */
    555 #ifdef MSG_MORE
    556 			if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
    557 			    td->o.size) && !o->pingpong)
    558 				flags |= MSG_MORE;
    559 #endif
    560 			ret = send(io_u->file->fd, io_u->xfer_buf,
    561 					io_u->xfer_buflen, flags);
    562 		}
    563 		if (ret > 0)
    564 			break;
    565 
    566 		ret = poll_wait(td, io_u->file->fd, POLLOUT);
    567 		if (ret <= 0)
    568 			break;
    569 	} while (1);
    570 
    571 	return ret;
    572 }
    573 
    574 static int is_close_msg(struct io_u *io_u, int len)
    575 {
    576 	struct udp_close_msg *msg;
    577 
    578 	if (len != sizeof(struct udp_close_msg))
    579 		return 0;
    580 
    581 	msg = io_u->xfer_buf;
    582 	if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
    583 		return 0;
    584 	if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE)
    585 		return 0;
    586 
    587 	return 1;
    588 }
    589 
    590 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
    591 {
    592 	struct netio_data *nd = td->io_ops_data;
    593 	struct netio_options *o = td->eo;
    594 	int ret, flags = 0;
    595 
    596 	do {
    597 		if (is_udp(o)) {
    598 			struct sockaddr *from;
    599 			socklen_t l, *len = &l;
    600 
    601 			if (o->listen) {
    602 				if (!is_ipv6(o)) {
    603 					from = (struct sockaddr *) &nd->addr;
    604 					*len = sizeof(nd->addr);
    605 				} else {
    606 					from = (struct sockaddr *) &nd->addr6;
    607 					*len = sizeof(nd->addr6);
    608 				}
    609 			} else {
    610 				from = NULL;
    611 				len = NULL;
    612 			}
    613 
    614 			ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
    615 					io_u->xfer_buflen, flags, from, len);
    616 
    617 			if (is_close_msg(io_u, ret)) {
    618 				td->done = 1;
    619 				return 0;
    620 			}
    621 		} else {
    622 			ret = recv(io_u->file->fd, io_u->xfer_buf,
    623 					io_u->xfer_buflen, flags);
    624 
    625 			if (is_close_msg(io_u, ret)) {
    626 				td->done = 1;
    627 				return 0;
    628 			}
    629 		}
    630 		if (ret > 0)
    631 			break;
    632 		else if (!ret && (flags & MSG_WAITALL))
    633 			break;
    634 
    635 		ret = poll_wait(td, io_u->file->fd, POLLIN);
    636 		if (ret <= 0)
    637 			break;
    638 		flags |= MSG_WAITALL;
    639 	} while (1);
    640 
    641 	if (is_udp(o) && td->o.verify == VERIFY_NONE)
    642 		verify_udp_seq(td, nd, io_u);
    643 
    644 	return ret;
    645 }
    646 
    647 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
    648 			     enum fio_ddir ddir)
    649 {
    650 	struct netio_data *nd = td->io_ops_data;
    651 	struct netio_options *o = td->eo;
    652 	int ret;
    653 
    654 	if (ddir == DDIR_WRITE) {
    655 		if (!nd->use_splice || is_udp(o) ||
    656 		    o->proto == FIO_TYPE_UNIX)
    657 			ret = fio_netio_send(td, io_u);
    658 		else
    659 			ret = fio_netio_splice_out(td, io_u);
    660 	} else if (ddir == DDIR_READ) {
    661 		if (!nd->use_splice || is_udp(o) ||
    662 		    o->proto == FIO_TYPE_UNIX)
    663 			ret = fio_netio_recv(td, io_u);
    664 		else
    665 			ret = fio_netio_splice_in(td, io_u);
    666 	} else
    667 		ret = 0;	/* must be a SYNC */
    668 
    669 	if (ret != (int) io_u->xfer_buflen) {
    670 		if (ret > 0) {
    671 			io_u->resid = io_u->xfer_buflen - ret;
    672 			io_u->error = 0;
    673 			return FIO_Q_COMPLETED;
    674 		} else if (!ret)
    675 			return FIO_Q_BUSY;
    676 		else {
    677 			int err = errno;
    678 
    679 			if (ddir == DDIR_WRITE && err == EMSGSIZE)
    680 				return FIO_Q_BUSY;
    681 
    682 			io_u->error = err;
    683 		}
    684 	}
    685 
    686 	if (io_u->error)
    687 		td_verror(td, io_u->error, "xfer");
    688 
    689 	return FIO_Q_COMPLETED;
    690 }
    691 
    692 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
    693 {
    694 	struct netio_options *o = td->eo;
    695 	int ret;
    696 
    697 	fio_ro_check(td, io_u);
    698 
    699 	ret = __fio_netio_queue(td, io_u, io_u->ddir);
    700 	if (!o->pingpong || ret != FIO_Q_COMPLETED)
    701 		return ret;
    702 
    703 	/*
    704 	 * For ping-pong mode, receive or send reply as needed
    705 	 */
    706 	if (td_read(td) && io_u->ddir == DDIR_READ)
    707 		ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
    708 	else if (td_write(td) && io_u->ddir == DDIR_WRITE)
    709 		ret = __fio_netio_queue(td, io_u, DDIR_READ);
    710 
    711 	return ret;
    712 }
    713 
    714 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
    715 {
    716 	struct netio_data *nd = td->io_ops_data;
    717 	struct netio_options *o = td->eo;
    718 	int type, domain;
    719 
    720 	if (o->proto == FIO_TYPE_TCP) {
    721 		domain = AF_INET;
    722 		type = SOCK_STREAM;
    723 	} else if (o->proto == FIO_TYPE_TCP_V6) {
    724 		domain = AF_INET6;
    725 		type = SOCK_STREAM;
    726 	} else if (o->proto == FIO_TYPE_UDP) {
    727 		domain = AF_INET;
    728 		type = SOCK_DGRAM;
    729 	} else if (o->proto == FIO_TYPE_UDP_V6) {
    730 		domain = AF_INET6;
    731 		type = SOCK_DGRAM;
    732 	} else if (o->proto == FIO_TYPE_UNIX) {
    733 		domain = AF_UNIX;
    734 		type = SOCK_STREAM;
    735 	} else {
    736 		log_err("fio: bad network type %d\n", o->proto);
    737 		f->fd = -1;
    738 		return 1;
    739 	}
    740 
    741 	f->fd = socket(domain, type, 0);
    742 	if (f->fd < 0) {
    743 		td_verror(td, errno, "socket");
    744 		return 1;
    745 	}
    746 
    747 #ifdef CONFIG_TCP_NODELAY
    748 	if (o->nodelay && is_tcp(o)) {
    749 		int optval = 1;
    750 
    751 		if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
    752 			log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
    753 			return 1;
    754 		}
    755 	}
    756 #endif
    757 
    758 	if (set_window_size(td, f->fd)) {
    759 		close(f->fd);
    760 		return 1;
    761 	}
    762 	if (set_mss(td, f->fd)) {
    763 		close(f->fd);
    764 		return 1;
    765 	}
    766 
    767 	if (is_udp(o)) {
    768 		if (!fio_netio_is_multicast(td->o.filename))
    769 			return 0;
    770 		if (is_ipv6(o)) {
    771 			log_err("fio: multicast not supported on IPv6\n");
    772 			close(f->fd);
    773 			return 1;
    774 		}
    775 
    776 		if (o->intfc) {
    777 			struct in_addr interface_addr;
    778 
    779 			if (inet_aton(o->intfc, &interface_addr) == 0) {
    780 				log_err("fio: interface not valid interface IP\n");
    781 				close(f->fd);
    782 				return 1;
    783 			}
    784 			if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) {
    785 				td_verror(td, errno, "setsockopt IP_MULTICAST_IF");
    786 				close(f->fd);
    787 				return 1;
    788 			}
    789 		}
    790 		if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) {
    791 			td_verror(td, errno, "setsockopt IP_MULTICAST_TTL");
    792 			close(f->fd);
    793 			return 1;
    794 		}
    795 		return 0;
    796 	} else if (o->proto == FIO_TYPE_TCP) {
    797 		socklen_t len = sizeof(nd->addr);
    798 
    799 		if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
    800 			td_verror(td, errno, "connect");
    801 			close(f->fd);
    802 			return 1;
    803 		}
    804 	} else if (o->proto == FIO_TYPE_TCP_V6) {
    805 		socklen_t len = sizeof(nd->addr6);
    806 
    807 		if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) {
    808 			td_verror(td, errno, "connect");
    809 			close(f->fd);
    810 			return 1;
    811 		}
    812 
    813 	} else {
    814 		struct sockaddr_un *addr = &nd->addr_un;
    815 		socklen_t len;
    816 
    817 		len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
    818 
    819 		if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
    820 			td_verror(td, errno, "connect");
    821 			close(f->fd);
    822 			return 1;
    823 		}
    824 	}
    825 
    826 	return 0;
    827 }
    828 
    829 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
    830 {
    831 	struct netio_data *nd = td->io_ops_data;
    832 	struct netio_options *o = td->eo;
    833 	socklen_t socklen;
    834 	int state;
    835 
    836 	if (is_udp(o)) {
    837 		f->fd = nd->listenfd;
    838 		return 0;
    839 	}
    840 
    841 	state = td->runstate;
    842 	td_set_runstate(td, TD_SETTING_UP);
    843 
    844 	log_info("fio: waiting for connection\n");
    845 
    846 	if (poll_wait(td, nd->listenfd, POLLIN) < 0)
    847 		goto err;
    848 
    849 	if (o->proto == FIO_TYPE_TCP) {
    850 		socklen = sizeof(nd->addr);
    851 		f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
    852 	} else {
    853 		socklen = sizeof(nd->addr6);
    854 		f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen);
    855 	}
    856 
    857 	if (f->fd < 0) {
    858 		td_verror(td, errno, "accept");
    859 		goto err;
    860 	}
    861 
    862 #ifdef CONFIG_TCP_NODELAY
    863 	if (o->nodelay && is_tcp(o)) {
    864 		int optval = 1;
    865 
    866 		if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) {
    867 			log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno));
    868 			return 1;
    869 		}
    870 	}
    871 #endif
    872 
    873 	reset_all_stats(td);
    874 	td_set_runstate(td, state);
    875 	return 0;
    876 err:
    877 	td_set_runstate(td, state);
    878 	return 1;
    879 }
    880 
    881 static void fio_netio_send_close(struct thread_data *td, struct fio_file *f)
    882 {
    883 	struct netio_data *nd = td->io_ops_data;
    884 	struct netio_options *o = td->eo;
    885 	struct udp_close_msg msg;
    886 	struct sockaddr *to;
    887 	socklen_t len;
    888 	int ret;
    889 
    890 	if (is_ipv6(o)) {
    891 		to = (struct sockaddr *) &nd->addr6;
    892 		len = sizeof(nd->addr6);
    893 	} else {
    894 		to = (struct sockaddr *) &nd->addr;
    895 		len = sizeof(nd->addr);
    896 	}
    897 
    898 	msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC);
    899 	msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE);
    900 
    901 	ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
    902 	if (ret < 0)
    903 		td_verror(td, errno, "sendto udp link close");
    904 }
    905 
    906 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
    907 {
    908 	/*
    909 	 * Notify the receiver that we are closing down the link
    910 	 */
    911 	fio_netio_send_close(td, f);
    912 
    913 	return generic_close_file(td, f);
    914 }
    915 
    916 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
    917 {
    918 	struct netio_data *nd = td->io_ops_data;
    919 	struct netio_options *o = td->eo;
    920 	struct udp_close_msg msg;
    921 	struct sockaddr *to;
    922 	socklen_t len;
    923 	int ret;
    924 
    925 	if (is_ipv6(o)) {
    926 		len = sizeof(nd->addr6);
    927 		to = (struct sockaddr *) &nd->addr6;
    928 	} else {
    929 		len = sizeof(nd->addr);
    930 		to = (struct sockaddr *) &nd->addr;
    931 	}
    932 
    933 	ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len);
    934 	if (ret < 0) {
    935 		td_verror(td, errno, "recvfrom udp link open");
    936 		return ret;
    937 	}
    938 
    939 	if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
    940 	    ntohl(msg.cmd) != FIO_LINK_OPEN) {
    941 		log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
    942 								ntohl(msg.cmd));
    943 		return -1;
    944 	}
    945 
    946 	fio_gettime(&td->start, NULL);
    947 	return 0;
    948 }
    949 
    950 static int fio_netio_send_open(struct thread_data *td, struct fio_file *f)
    951 {
    952 	struct netio_data *nd = td->io_ops_data;
    953 	struct netio_options *o = td->eo;
    954 	struct udp_close_msg msg;
    955 	struct sockaddr *to;
    956 	socklen_t len;
    957 	int ret;
    958 
    959 	if (is_ipv6(o)) {
    960 		len = sizeof(nd->addr6);
    961 		to = (struct sockaddr *) &nd->addr6;
    962 	} else {
    963 		len = sizeof(nd->addr);
    964 		to = (struct sockaddr *) &nd->addr;
    965 	}
    966 
    967 	msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
    968 	msg.cmd = htonl(FIO_LINK_OPEN);
    969 
    970 	ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len);
    971 	if (ret < 0) {
    972 		td_verror(td, errno, "sendto udp link open");
    973 		return ret;
    974 	}
    975 
    976 	return 0;
    977 }
    978 
    979 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
    980 {
    981 	int ret;
    982 	struct netio_options *o = td->eo;
    983 
    984 	if (o->listen)
    985 		ret = fio_netio_accept(td, f);
    986 	else
    987 		ret = fio_netio_connect(td, f);
    988 
    989 	if (ret) {
    990 		f->fd = -1;
    991 		return ret;
    992 	}
    993 
    994 	if (is_udp(o)) {
    995 		if (td_write(td))
    996 			ret = fio_netio_send_open(td, f);
    997 		else {
    998 			int state;
    999 
   1000 			state = td->runstate;
   1001 			td_set_runstate(td, TD_SETTING_UP);
   1002 			ret = fio_netio_udp_recv_open(td, f);
   1003 			td_set_runstate(td, state);
   1004 		}
   1005 	}
   1006 
   1007 	if (ret)
   1008 		fio_netio_close_file(td, f);
   1009 
   1010 	return ret;
   1011 }
   1012 
   1013 static int fio_fill_addr(struct thread_data *td, const char *host, int af,
   1014 			 void *dst, struct addrinfo **res)
   1015 {
   1016 	struct netio_options *o = td->eo;
   1017 	struct addrinfo hints;
   1018 	int ret;
   1019 
   1020 	if (inet_pton(af, host, dst))
   1021 		return 0;
   1022 
   1023 	memset(&hints, 0, sizeof(hints));
   1024 
   1025 	if (is_tcp(o))
   1026 		hints.ai_socktype = SOCK_STREAM;
   1027 	else
   1028 		hints.ai_socktype = SOCK_DGRAM;
   1029 
   1030 	if (is_ipv6(o))
   1031 		hints.ai_family = AF_INET6;
   1032 	else
   1033 		hints.ai_family = AF_INET;
   1034 
   1035 	ret = getaddrinfo(host, NULL, &hints, res);
   1036 	if (ret) {
   1037 		int e = EINVAL;
   1038 		char str[128];
   1039 
   1040 		if (ret == EAI_SYSTEM)
   1041 			e = errno;
   1042 
   1043 		snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret));
   1044 		td_verror(td, e, str);
   1045 		return 1;
   1046 	}
   1047 
   1048 	return 0;
   1049 }
   1050 
   1051 static int fio_netio_setup_connect_inet(struct thread_data *td,
   1052 					const char *host, unsigned short port)
   1053 {
   1054 	struct netio_data *nd = td->io_ops_data;
   1055 	struct netio_options *o = td->eo;
   1056 	struct addrinfo *res = NULL;
   1057 	void *dst, *src;
   1058 	int af, len;
   1059 
   1060 	if (!host) {
   1061 		log_err("fio: connect with no host to connect to.\n");
   1062 		if (td_read(td))
   1063 			log_err("fio: did you forget to set 'listen'?\n");
   1064 
   1065 		td_verror(td, EINVAL, "no hostname= set");
   1066 		return 1;
   1067 	}
   1068 
   1069 	nd->addr.sin_family = AF_INET;
   1070 	nd->addr.sin_port = htons(port);
   1071 	nd->addr6.sin6_family = AF_INET6;
   1072 	nd->addr6.sin6_port = htons(port);
   1073 
   1074 	if (is_ipv6(o)) {
   1075 		af = AF_INET6;
   1076 		dst = &nd->addr6.sin6_addr;
   1077 	} else {
   1078 		af = AF_INET;
   1079 		dst = &nd->addr.sin_addr;
   1080 	}
   1081 
   1082 	if (fio_fill_addr(td, host, af, dst, &res))
   1083 		return 1;
   1084 
   1085 	if (!res)
   1086 		return 0;
   1087 
   1088 	if (is_ipv6(o)) {
   1089 		len = sizeof(nd->addr6.sin6_addr);
   1090 		src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
   1091 	} else {
   1092 		len = sizeof(nd->addr.sin_addr);
   1093 		src = &((struct sockaddr_in *) res->ai_addr)->sin_addr;
   1094 	}
   1095 
   1096 	memcpy(dst, src, len);
   1097 	freeaddrinfo(res);
   1098 	return 0;
   1099 }
   1100 
   1101 static int fio_netio_setup_connect_unix(struct thread_data *td,
   1102 					const char *path)
   1103 {
   1104 	struct netio_data *nd = td->io_ops_data;
   1105 	struct sockaddr_un *soun = &nd->addr_un;
   1106 
   1107 	soun->sun_family = AF_UNIX;
   1108 	memset(soun->sun_path, 0, sizeof(soun->sun_path));
   1109 	strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1);
   1110 	return 0;
   1111 }
   1112 
   1113 static int fio_netio_setup_connect(struct thread_data *td)
   1114 {
   1115 	struct netio_options *o = td->eo;
   1116 
   1117 	if (is_udp(o) || is_tcp(o))
   1118 		return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
   1119 	else
   1120 		return fio_netio_setup_connect_unix(td, td->o.filename);
   1121 }
   1122 
   1123 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
   1124 {
   1125 	struct netio_data *nd = td->io_ops_data;
   1126 	struct sockaddr_un *addr = &nd->addr_un;
   1127 	mode_t mode;
   1128 	int len, fd;
   1129 
   1130 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
   1131 	if (fd < 0) {
   1132 		log_err("fio: socket: %s\n", strerror(errno));
   1133 		return -1;
   1134 	}
   1135 
   1136 	mode = umask(000);
   1137 
   1138 	memset(addr, 0, sizeof(*addr));
   1139 	addr->sun_family = AF_UNIX;
   1140 	strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1);
   1141 	unlink(path);
   1142 
   1143 	len = sizeof(addr->sun_family) + strlen(path) + 1;
   1144 
   1145 	if (bind(fd, (struct sockaddr *) addr, len) < 0) {
   1146 		log_err("fio: bind: %s\n", strerror(errno));
   1147 		close(fd);
   1148 		return -1;
   1149 	}
   1150 
   1151 	umask(mode);
   1152 	nd->listenfd = fd;
   1153 	return 0;
   1154 }
   1155 
   1156 static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
   1157 {
   1158 	struct netio_data *nd = td->io_ops_data;
   1159 	struct netio_options *o = td->eo;
   1160 	struct ip_mreq mr;
   1161 	struct sockaddr_in sin;
   1162 	struct sockaddr *saddr;
   1163 	int fd, opt, type, domain;
   1164 	socklen_t len;
   1165 
   1166 	memset(&sin, 0, sizeof(sin));
   1167 
   1168 	if (o->proto == FIO_TYPE_TCP) {
   1169 		type = SOCK_STREAM;
   1170 		domain = AF_INET;
   1171 	} else if (o->proto == FIO_TYPE_TCP_V6) {
   1172 		type = SOCK_STREAM;
   1173 		domain = AF_INET6;
   1174 	} else if (o->proto == FIO_TYPE_UDP) {
   1175 		type = SOCK_DGRAM;
   1176 		domain = AF_INET;
   1177 	} else if (o->proto == FIO_TYPE_UDP_V6) {
   1178 		type = SOCK_DGRAM;
   1179 		domain = AF_INET6;
   1180 	} else {
   1181 		log_err("fio: unknown proto %d\n", o->proto);
   1182 		return 1;
   1183 	}
   1184 
   1185 	fd = socket(domain, type, 0);
   1186 	if (fd < 0) {
   1187 		td_verror(td, errno, "socket");
   1188 		return 1;
   1189 	}
   1190 
   1191 	opt = 1;
   1192 	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) {
   1193 		td_verror(td, errno, "setsockopt");
   1194 		close(fd);
   1195 		return 1;
   1196 	}
   1197 #ifdef SO_REUSEPORT
   1198 	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) {
   1199 		td_verror(td, errno, "setsockopt");
   1200 		close(fd);
   1201 		return 1;
   1202 	}
   1203 #endif
   1204 
   1205 	if (set_window_size(td, fd)) {
   1206 		close(fd);
   1207 		return 1;
   1208 	}
   1209 	if (set_mss(td, fd)) {
   1210 		close(fd);
   1211 		return 1;
   1212 	}
   1213 
   1214 	if (td->o.filename) {
   1215 		if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) {
   1216 			log_err("fio: hostname not valid for non-multicast inbound network IO\n");
   1217 			close(fd);
   1218 			return 1;
   1219 		}
   1220 		if (is_ipv6(o)) {
   1221 			log_err("fio: IPv6 not supported for multicast network IO\n");
   1222 			close(fd);
   1223 			return 1;
   1224 		}
   1225 
   1226 		inet_aton(td->o.filename, &sin.sin_addr);
   1227 
   1228 		mr.imr_multiaddr = sin.sin_addr;
   1229 		if (o->intfc) {
   1230 			if (inet_aton(o->intfc, &mr.imr_interface) == 0) {
   1231 				log_err("fio: interface not valid interface IP\n");
   1232 				close(fd);
   1233 				return 1;
   1234 			}
   1235 		} else {
   1236 			mr.imr_interface.s_addr = htonl(INADDR_ANY);
   1237 		}
   1238 
   1239 		if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) {
   1240 			td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP");
   1241 			close(fd);
   1242 			return 1;
   1243 		}
   1244 	}
   1245 
   1246 	if (!is_ipv6(o)) {
   1247 		saddr = (struct sockaddr *) &nd->addr;
   1248 		len = sizeof(nd->addr);
   1249 
   1250 		nd->addr.sin_family = AF_INET;
   1251 		nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY);
   1252 		nd->addr.sin_port = htons(port);
   1253 	} else {
   1254 		saddr = (struct sockaddr *) &nd->addr6;
   1255 		len = sizeof(nd->addr6);
   1256 
   1257 		nd->addr6.sin6_family = AF_INET6;
   1258 		nd->addr6.sin6_addr = in6addr_any;
   1259 		nd->addr6.sin6_port = htons(port);
   1260 	}
   1261 
   1262 	if (bind(fd, saddr, len) < 0) {
   1263 		close(fd);
   1264 		td_verror(td, errno, "bind");
   1265 		return 1;
   1266 	}
   1267 
   1268 	nd->listenfd = fd;
   1269 	return 0;
   1270 }
   1271 
   1272 static int fio_netio_setup_listen(struct thread_data *td)
   1273 {
   1274 	struct netio_data *nd = td->io_ops_data;
   1275 	struct netio_options *o = td->eo;
   1276 	int ret;
   1277 
   1278 	if (is_udp(o) || is_tcp(o))
   1279 		ret = fio_netio_setup_listen_inet(td, o->port);
   1280 	else
   1281 		ret = fio_netio_setup_listen_unix(td, td->o.filename);
   1282 
   1283 	if (ret)
   1284 		return ret;
   1285 	if (is_udp(o))
   1286 		return 0;
   1287 
   1288 	if (listen(nd->listenfd, 10) < 0) {
   1289 		td_verror(td, errno, "listen");
   1290 		nd->listenfd = -1;
   1291 		return 1;
   1292 	}
   1293 
   1294 	return 0;
   1295 }
   1296 
   1297 static int fio_netio_init(struct thread_data *td)
   1298 {
   1299 	struct netio_options *o = td->eo;
   1300 	int ret;
   1301 
   1302 #ifdef WIN32
   1303 	WSADATA wsd;
   1304 	WSAStartup(MAKEWORD(2,2), &wsd);
   1305 #endif
   1306 
   1307 	if (td_random(td)) {
   1308 		log_err("fio: network IO can't be random\n");
   1309 		return 1;
   1310 	}
   1311 
   1312 	if (o->proto == FIO_TYPE_UNIX && o->port) {
   1313 		log_err("fio: network IO port not valid with unix socket\n");
   1314 		return 1;
   1315 	} else if (o->proto != FIO_TYPE_UNIX && !o->port) {
   1316 		log_err("fio: network IO requires port for tcp or udp\n");
   1317 		return 1;
   1318 	}
   1319 
   1320 	o->port += td->subjob_number;
   1321 
   1322 	if (!is_tcp(o)) {
   1323 		if (o->listen) {
   1324 			log_err("fio: listen only valid for TCP proto IO\n");
   1325 			return 1;
   1326 		}
   1327 		if (td_rw(td)) {
   1328 			log_err("fio: datagram network connections must be"
   1329 				   " read OR write\n");
   1330 			return 1;
   1331 		}
   1332 		if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
   1333 			log_err("fio: UNIX sockets need host/filename\n");
   1334 			return 1;
   1335 		}
   1336 		o->listen = td_read(td);
   1337 	}
   1338 
   1339 	if (o->listen)
   1340 		ret = fio_netio_setup_listen(td);
   1341 	else
   1342 		ret = fio_netio_setup_connect(td);
   1343 
   1344 	return ret;
   1345 }
   1346 
   1347 static void fio_netio_cleanup(struct thread_data *td)
   1348 {
   1349 	struct netio_data *nd = td->io_ops_data;
   1350 
   1351 	if (nd) {
   1352 		if (nd->listenfd != -1)
   1353 			close(nd->listenfd);
   1354 		if (nd->pipes[0] != -1)
   1355 			close(nd->pipes[0]);
   1356 		if (nd->pipes[1] != -1)
   1357 			close(nd->pipes[1]);
   1358 
   1359 		free(nd);
   1360 	}
   1361 }
   1362 
   1363 static int fio_netio_setup(struct thread_data *td)
   1364 {
   1365 	struct netio_data *nd;
   1366 
   1367 	if (!td->files_index) {
   1368 		add_file(td, td->o.filename ?: "net", 0, 0);
   1369 		td->o.nr_files = td->o.nr_files ?: 1;
   1370 		td->o.open_files++;
   1371 	}
   1372 
   1373 	if (!td->io_ops_data) {
   1374 		nd = malloc(sizeof(*nd));
   1375 
   1376 		memset(nd, 0, sizeof(*nd));
   1377 		nd->listenfd = -1;
   1378 		nd->pipes[0] = nd->pipes[1] = -1;
   1379 		td->io_ops_data = nd;
   1380 	}
   1381 
   1382 	return 0;
   1383 }
   1384 
   1385 static void fio_netio_terminate(struct thread_data *td)
   1386 {
   1387 	kill(td->pid, SIGTERM);
   1388 }
   1389 
   1390 #ifdef CONFIG_LINUX_SPLICE
   1391 static int fio_netio_setup_splice(struct thread_data *td)
   1392 {
   1393 	struct netio_data *nd;
   1394 
   1395 	fio_netio_setup(td);
   1396 
   1397 	nd = td->io_ops_data;
   1398 	if (nd) {
   1399 		if (pipe(nd->pipes) < 0)
   1400 			return 1;
   1401 
   1402 		nd->use_splice = 1;
   1403 		return 0;
   1404 	}
   1405 
   1406 	return 1;
   1407 }
   1408 
   1409 static struct ioengine_ops ioengine_splice = {
   1410 	.name			= "netsplice",
   1411 	.version		= FIO_IOOPS_VERSION,
   1412 	.prep			= fio_netio_prep,
   1413 	.queue			= fio_netio_queue,
   1414 	.setup			= fio_netio_setup_splice,
   1415 	.init			= fio_netio_init,
   1416 	.cleanup		= fio_netio_cleanup,
   1417 	.open_file		= fio_netio_open_file,
   1418 	.close_file		= fio_netio_close_file,
   1419 	.terminate		= fio_netio_terminate,
   1420 	.options		= options,
   1421 	.option_struct_size	= sizeof(struct netio_options),
   1422 	.flags			= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
   1423 				  FIO_PIPEIO,
   1424 };
   1425 #endif
   1426 
   1427 static struct ioengine_ops ioengine_rw = {
   1428 	.name			= "net",
   1429 	.version		= FIO_IOOPS_VERSION,
   1430 	.prep			= fio_netio_prep,
   1431 	.queue			= fio_netio_queue,
   1432 	.setup			= fio_netio_setup,
   1433 	.init			= fio_netio_init,
   1434 	.cleanup		= fio_netio_cleanup,
   1435 	.open_file		= fio_netio_open_file,
   1436 	.close_file		= fio_netio_close_file,
   1437 	.terminate		= fio_netio_terminate,
   1438 	.options		= options,
   1439 	.option_struct_size	= sizeof(struct netio_options),
   1440 	.flags			= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
   1441 				  FIO_PIPEIO | FIO_BIT_BASED,
   1442 };
   1443 
   1444 static int str_hostname_cb(void *data, const char *input)
   1445 {
   1446 	struct netio_options *o = data;
   1447 
   1448 	if (o->td->o.filename)
   1449 		free(o->td->o.filename);
   1450 	o->td->o.filename = strdup(input);
   1451 	return 0;
   1452 }
   1453 
   1454 static void fio_init fio_netio_register(void)
   1455 {
   1456 	register_ioengine(&ioengine_rw);
   1457 #ifdef CONFIG_LINUX_SPLICE
   1458 	register_ioengine(&ioengine_splice);
   1459 #endif
   1460 }
   1461 
   1462 static void fio_exit fio_netio_unregister(void)
   1463 {
   1464 	unregister_ioengine(&ioengine_rw);
   1465 #ifdef CONFIG_LINUX_SPLICE
   1466 	unregister_ioengine(&ioengine_splice);
   1467 #endif
   1468 }
   1469