1 /* 2 * net engine 3 * 4 * IO engine that reads/writes to/from sockets. 5 * 6 */ 7 #include <stdio.h> 8 #include <stdlib.h> 9 #include <unistd.h> 10 #include <signal.h> 11 #include <errno.h> 12 #include <assert.h> 13 #include <netinet/in.h> 14 #include <netinet/tcp.h> 15 #include <arpa/inet.h> 16 #include <netdb.h> 17 #include <sys/poll.h> 18 #include <sys/types.h> 19 #include <sys/stat.h> 20 #include <sys/socket.h> 21 #include <sys/un.h> 22 23 #include "../fio.h" 24 #include "../verify.h" 25 26 struct netio_data { 27 int listenfd; 28 int use_splice; 29 int seq_off; 30 int pipes[2]; 31 struct sockaddr_in addr; 32 struct sockaddr_in6 addr6; 33 struct sockaddr_un addr_un; 34 uint64_t udp_send_seq; 35 uint64_t udp_recv_seq; 36 }; 37 38 struct netio_options { 39 struct thread_data *td; 40 unsigned int port; 41 unsigned int proto; 42 unsigned int listen; 43 unsigned int pingpong; 44 unsigned int nodelay; 45 unsigned int ttl; 46 unsigned int window_size; 47 unsigned int mss; 48 char *intfc; 49 }; 50 51 struct udp_close_msg { 52 uint32_t magic; 53 uint32_t cmd; 54 }; 55 56 struct udp_seq { 57 uint64_t magic; 58 uint64_t seq; 59 uint64_t bs; 60 }; 61 62 enum { 63 FIO_LINK_CLOSE = 0x89, 64 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b, 65 FIO_LINK_OPEN = 0x98, 66 FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL, 67 68 FIO_TYPE_TCP = 1, 69 FIO_TYPE_UDP = 2, 70 FIO_TYPE_UNIX = 3, 71 FIO_TYPE_TCP_V6 = 4, 72 FIO_TYPE_UDP_V6 = 5, 73 }; 74 75 static int str_hostname_cb(void *data, const char *input); 76 static struct fio_option options[] = { 77 { 78 .name = "hostname", 79 .lname = "net engine hostname", 80 .type = FIO_OPT_STR_STORE, 81 .cb = str_hostname_cb, 82 .help = "Hostname for net IO engine", 83 .category = FIO_OPT_C_ENGINE, 84 .group = FIO_OPT_G_NETIO, 85 }, 86 { 87 .name = "port", 88 .lname = "net engine port", 89 .type = FIO_OPT_INT, 90 .off1 = offsetof(struct netio_options, port), 91 .minval = 1, 92 .maxval = 65535, 93 .help = "Port to use for TCP or UDP net connections", 94 .category = FIO_OPT_C_ENGINE, 95 .group = FIO_OPT_G_NETIO, 96 }, 97 { 98 .name = "protocol", 99 .lname = "net engine protocol", 100 .alias = "proto", 101 .type = FIO_OPT_STR, 102 .off1 = offsetof(struct netio_options, proto), 103 .help = "Network protocol to use", 104 .def = "tcp", 105 .posval = { 106 { .ival = "tcp", 107 .oval = FIO_TYPE_TCP, 108 .help = "Transmission Control Protocol", 109 }, 110 #ifdef CONFIG_IPV6 111 { .ival = "tcpv6", 112 .oval = FIO_TYPE_TCP_V6, 113 .help = "Transmission Control Protocol V6", 114 }, 115 #endif 116 { .ival = "udp", 117 .oval = FIO_TYPE_UDP, 118 .help = "User Datagram Protocol", 119 }, 120 #ifdef CONFIG_IPV6 121 { .ival = "udpv6", 122 .oval = FIO_TYPE_UDP_V6, 123 .help = "User Datagram Protocol V6", 124 }, 125 #endif 126 { .ival = "unix", 127 .oval = FIO_TYPE_UNIX, 128 .help = "UNIX domain socket", 129 }, 130 }, 131 .category = FIO_OPT_C_ENGINE, 132 .group = FIO_OPT_G_NETIO, 133 }, 134 #ifdef CONFIG_TCP_NODELAY 135 { 136 .name = "nodelay", 137 .type = FIO_OPT_BOOL, 138 .off1 = offsetof(struct netio_options, nodelay), 139 .help = "Use TCP_NODELAY on TCP connections", 140 .category = FIO_OPT_C_ENGINE, 141 .group = FIO_OPT_G_NETIO, 142 }, 143 #endif 144 { 145 .name = "listen", 146 .lname = "net engine listen", 147 .type = FIO_OPT_STR_SET, 148 .off1 = offsetof(struct netio_options, listen), 149 .help = "Listen for incoming TCP connections", 150 .category = FIO_OPT_C_ENGINE, 151 .group = FIO_OPT_G_NETIO, 152 }, 153 { 154 .name = "pingpong", 155 .type = FIO_OPT_STR_SET, 156 .off1 = offsetof(struct netio_options, pingpong), 157 .help = "Ping-pong IO requests", 158 .category = FIO_OPT_C_ENGINE, 159 .group = FIO_OPT_G_NETIO, 160 }, 161 { 162 .name = "interface", 163 .lname = "net engine interface", 164 .type = FIO_OPT_STR_STORE, 165 .off1 = offsetof(struct netio_options, intfc), 166 .help = "Network interface to use", 167 .category = FIO_OPT_C_ENGINE, 168 .group = FIO_OPT_G_NETIO, 169 }, 170 { 171 .name = "ttl", 172 .lname = "net engine multicast ttl", 173 .type = FIO_OPT_INT, 174 .off1 = offsetof(struct netio_options, ttl), 175 .def = "1", 176 .minval = 0, 177 .help = "Time-to-live value for outgoing UDP multicast packets", 178 .category = FIO_OPT_C_ENGINE, 179 .group = FIO_OPT_G_NETIO, 180 }, 181 #ifdef CONFIG_NET_WINDOWSIZE 182 { 183 .name = "window_size", 184 .lname = "Window Size", 185 .type = FIO_OPT_INT, 186 .off1 = offsetof(struct netio_options, window_size), 187 .minval = 0, 188 .help = "Set socket buffer window size", 189 .category = FIO_OPT_C_ENGINE, 190 .group = FIO_OPT_G_NETIO, 191 }, 192 #endif 193 #ifdef CONFIG_NET_MSS 194 { 195 .name = "mss", 196 .lname = "Maximum segment size", 197 .type = FIO_OPT_INT, 198 .off1 = offsetof(struct netio_options, mss), 199 .minval = 0, 200 .help = "Set TCP maximum segment size", 201 .category = FIO_OPT_C_ENGINE, 202 .group = FIO_OPT_G_NETIO, 203 }, 204 #endif 205 { 206 .name = NULL, 207 }, 208 }; 209 210 static inline int is_udp(struct netio_options *o) 211 { 212 return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6; 213 } 214 215 static inline int is_tcp(struct netio_options *o) 216 { 217 return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6; 218 } 219 220 static inline int is_ipv6(struct netio_options *o) 221 { 222 return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6; 223 } 224 225 static int set_window_size(struct thread_data *td, int fd) 226 { 227 #ifdef CONFIG_NET_WINDOWSIZE 228 struct netio_options *o = td->eo; 229 unsigned int wss; 230 int snd, rcv, ret; 231 232 if (!o->window_size) 233 return 0; 234 235 rcv = o->listen || o->pingpong; 236 snd = !o->listen || o->pingpong; 237 wss = o->window_size; 238 ret = 0; 239 240 if (rcv) { 241 ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss, 242 sizeof(wss)); 243 if (ret < 0) 244 td_verror(td, errno, "rcvbuf window size"); 245 } 246 if (snd && !ret) { 247 ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss, 248 sizeof(wss)); 249 if (ret < 0) 250 td_verror(td, errno, "sndbuf window size"); 251 } 252 253 return ret; 254 #else 255 td_verror(td, -EINVAL, "setsockopt window size"); 256 return -1; 257 #endif 258 } 259 260 static int set_mss(struct thread_data *td, int fd) 261 { 262 #ifdef CONFIG_NET_MSS 263 struct netio_options *o = td->eo; 264 unsigned int mss; 265 int ret; 266 267 if (!o->mss || !is_tcp(o)) 268 return 0; 269 270 mss = o->mss; 271 ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss, 272 sizeof(mss)); 273 if (ret < 0) 274 td_verror(td, errno, "setsockopt TCP_MAXSEG"); 275 276 return ret; 277 #else 278 td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG"); 279 return -1; 280 #endif 281 } 282 283 284 /* 285 * Return -1 for error and 'nr events' for a positive number 286 * of events 287 */ 288 static int poll_wait(struct thread_data *td, int fd, short events) 289 { 290 struct pollfd pfd; 291 int ret; 292 293 while (!td->terminate) { 294 pfd.fd = fd; 295 pfd.events = events; 296 ret = poll(&pfd, 1, -1); 297 if (ret < 0) { 298 if (errno == EINTR) 299 break; 300 301 td_verror(td, errno, "poll"); 302 return -1; 303 } else if (!ret) 304 continue; 305 306 break; 307 } 308 309 if (pfd.revents & events) 310 return 1; 311 312 return -1; 313 } 314 315 static int fio_netio_is_multicast(const char *mcaddr) 316 { 317 in_addr_t addr = inet_network(mcaddr); 318 if (addr == -1) 319 return 0; 320 321 if (inet_network("224.0.0.0") <= addr && 322 inet_network("239.255.255.255") >= addr) 323 return 1; 324 325 return 0; 326 } 327 328 329 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u) 330 { 331 struct netio_options *o = td->eo; 332 333 /* 334 * Make sure we don't see spurious reads to a receiver, and vice versa 335 */ 336 if (is_tcp(o)) 337 return 0; 338 339 if ((o->listen && io_u->ddir == DDIR_WRITE) || 340 (!o->listen && io_u->ddir == DDIR_READ)) { 341 td_verror(td, EINVAL, "bad direction"); 342 return 1; 343 } 344 345 return 0; 346 } 347 348 #ifdef CONFIG_LINUX_SPLICE 349 static int splice_io_u(int fdin, int fdout, unsigned int len) 350 { 351 int bytes = 0; 352 353 while (len) { 354 int ret = splice(fdin, NULL, fdout, NULL, len, 0); 355 356 if (ret < 0) { 357 if (!bytes) 358 bytes = ret; 359 360 break; 361 } else if (!ret) 362 break; 363 364 bytes += ret; 365 len -= ret; 366 } 367 368 return bytes; 369 } 370 371 /* 372 * Receive bytes from a socket and fill them into the internal pipe 373 */ 374 static int splice_in(struct thread_data *td, struct io_u *io_u) 375 { 376 struct netio_data *nd = td->io_ops->data; 377 378 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen); 379 } 380 381 /* 382 * Transmit 'len' bytes from the internal pipe 383 */ 384 static int splice_out(struct thread_data *td, struct io_u *io_u, 385 unsigned int len) 386 { 387 struct netio_data *nd = td->io_ops->data; 388 389 return splice_io_u(nd->pipes[0], io_u->file->fd, len); 390 } 391 392 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len) 393 { 394 struct iovec iov = { 395 .iov_base = io_u->xfer_buf, 396 .iov_len = len, 397 }; 398 int bytes = 0; 399 400 while (iov.iov_len) { 401 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE); 402 403 if (ret < 0) { 404 if (!bytes) 405 bytes = ret; 406 break; 407 } else if (!ret) 408 break; 409 410 iov.iov_len -= ret; 411 iov.iov_base += ret; 412 bytes += ret; 413 } 414 415 return bytes; 416 417 } 418 419 /* 420 * vmsplice() pipe to io_u buffer 421 */ 422 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, 423 unsigned int len) 424 { 425 struct netio_data *nd = td->io_ops->data; 426 427 return vmsplice_io_u(io_u, nd->pipes[0], len); 428 } 429 430 /* 431 * vmsplice() io_u to pipe 432 */ 433 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u) 434 { 435 struct netio_data *nd = td->io_ops->data; 436 437 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen); 438 } 439 440 /* 441 * splice receive - transfer socket data into a pipe using splice, then map 442 * that pipe data into the io_u using vmsplice. 443 */ 444 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 445 { 446 int ret; 447 448 ret = splice_in(td, io_u); 449 if (ret > 0) 450 return vmsplice_io_u_out(td, io_u, ret); 451 452 return ret; 453 } 454 455 /* 456 * splice transmit - map data from the io_u into a pipe by using vmsplice, 457 * then transfer that pipe to a socket using splice. 458 */ 459 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 460 { 461 int ret; 462 463 ret = vmsplice_io_u_in(td, io_u); 464 if (ret > 0) 465 return splice_out(td, io_u, ret); 466 467 return ret; 468 } 469 #else 470 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 471 { 472 errno = EOPNOTSUPP; 473 return -1; 474 } 475 476 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 477 { 478 errno = EOPNOTSUPP; 479 return -1; 480 } 481 #endif 482 483 static void store_udp_seq(struct netio_data *nd, struct io_u *io_u) 484 { 485 struct udp_seq *us; 486 487 if (io_u->xfer_buflen < sizeof(*us)) 488 return; 489 490 us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); 491 us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC); 492 us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen); 493 us->seq = cpu_to_le64(nd->udp_send_seq++); 494 } 495 496 static void verify_udp_seq(struct thread_data *td, struct netio_data *nd, 497 struct io_u *io_u) 498 { 499 struct udp_seq *us; 500 uint64_t seq; 501 502 if (io_u->xfer_buflen < sizeof(*us)) 503 return; 504 505 if (nd->seq_off) 506 return; 507 508 us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); 509 if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC) 510 return; 511 if (le64_to_cpu(us->bs) != io_u->xfer_buflen) { 512 nd->seq_off = 1; 513 return; 514 } 515 516 seq = le64_to_cpu(us->seq); 517 518 if (seq != nd->udp_recv_seq) 519 td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq; 520 521 nd->udp_recv_seq = seq + 1; 522 } 523 524 static int fio_netio_send(struct thread_data *td, struct io_u *io_u) 525 { 526 struct netio_data *nd = td->io_ops->data; 527 struct netio_options *o = td->eo; 528 int ret, flags = 0; 529 530 do { 531 if (is_udp(o)) { 532 const struct sockaddr *to; 533 socklen_t len; 534 535 if (is_ipv6(o)) { 536 to = (struct sockaddr *) &nd->addr6; 537 len = sizeof(nd->addr6); 538 } else { 539 to = (struct sockaddr *) &nd->addr; 540 len = sizeof(nd->addr); 541 } 542 543 if (td->o.verify == VERIFY_NONE) 544 store_udp_seq(nd, io_u); 545 546 ret = sendto(io_u->file->fd, io_u->xfer_buf, 547 io_u->xfer_buflen, flags, to, len); 548 } else { 549 /* 550 * if we are going to write more, set MSG_MORE 551 */ 552 #ifdef MSG_MORE 553 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < 554 td->o.size) && !o->pingpong) 555 flags |= MSG_MORE; 556 #endif 557 ret = send(io_u->file->fd, io_u->xfer_buf, 558 io_u->xfer_buflen, flags); 559 } 560 if (ret > 0) 561 break; 562 563 ret = poll_wait(td, io_u->file->fd, POLLOUT); 564 if (ret <= 0) 565 break; 566 } while (1); 567 568 return ret; 569 } 570 571 static int is_close_msg(struct io_u *io_u, int len) 572 { 573 struct udp_close_msg *msg; 574 575 if (len != sizeof(struct udp_close_msg)) 576 return 0; 577 578 msg = io_u->xfer_buf; 579 if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) 580 return 0; 581 if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE) 582 return 0; 583 584 return 1; 585 } 586 587 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) 588 { 589 struct netio_data *nd = td->io_ops->data; 590 struct netio_options *o = td->eo; 591 int ret, flags = 0; 592 593 do { 594 if (is_udp(o)) { 595 struct sockaddr *from; 596 socklen_t l, *len = &l; 597 598 if (o->listen) { 599 if (!is_ipv6(o)) { 600 from = (struct sockaddr *) &nd->addr; 601 *len = sizeof(nd->addr); 602 } else { 603 from = (struct sockaddr *) &nd->addr6; 604 *len = sizeof(nd->addr6); 605 } 606 } else { 607 from = NULL; 608 len = NULL; 609 } 610 611 ret = recvfrom(io_u->file->fd, io_u->xfer_buf, 612 io_u->xfer_buflen, flags, from, len); 613 614 if (is_close_msg(io_u, ret)) { 615 td->done = 1; 616 return 0; 617 } 618 } else { 619 ret = recv(io_u->file->fd, io_u->xfer_buf, 620 io_u->xfer_buflen, flags); 621 622 if (is_close_msg(io_u, ret)) { 623 td->done = 1; 624 return 0; 625 } 626 } 627 if (ret > 0) 628 break; 629 else if (!ret && (flags & MSG_WAITALL)) 630 break; 631 632 ret = poll_wait(td, io_u->file->fd, POLLIN); 633 if (ret <= 0) 634 break; 635 flags |= MSG_WAITALL; 636 } while (1); 637 638 if (is_udp(o) && td->o.verify == VERIFY_NONE) 639 verify_udp_seq(td, nd, io_u); 640 641 return ret; 642 } 643 644 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u, 645 enum fio_ddir ddir) 646 { 647 struct netio_data *nd = td->io_ops->data; 648 struct netio_options *o = td->eo; 649 int ret; 650 651 if (ddir == DDIR_WRITE) { 652 if (!nd->use_splice || is_udp(o) || 653 o->proto == FIO_TYPE_UNIX) 654 ret = fio_netio_send(td, io_u); 655 else 656 ret = fio_netio_splice_out(td, io_u); 657 } else if (ddir == DDIR_READ) { 658 if (!nd->use_splice || is_udp(o) || 659 o->proto == FIO_TYPE_UNIX) 660 ret = fio_netio_recv(td, io_u); 661 else 662 ret = fio_netio_splice_in(td, io_u); 663 } else 664 ret = 0; /* must be a SYNC */ 665 666 if (ret != (int) io_u->xfer_buflen) { 667 if (ret > 0) { 668 io_u->resid = io_u->xfer_buflen - ret; 669 io_u->error = 0; 670 return FIO_Q_COMPLETED; 671 } else if (!ret) 672 return FIO_Q_BUSY; 673 else { 674 int err = errno; 675 676 if (ddir == DDIR_WRITE && err == EMSGSIZE) 677 return FIO_Q_BUSY; 678 679 io_u->error = err; 680 } 681 } 682 683 if (io_u->error) 684 td_verror(td, io_u->error, "xfer"); 685 686 return FIO_Q_COMPLETED; 687 } 688 689 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) 690 { 691 struct netio_options *o = td->eo; 692 int ret; 693 694 fio_ro_check(td, io_u); 695 696 ret = __fio_netio_queue(td, io_u, io_u->ddir); 697 if (!o->pingpong || ret != FIO_Q_COMPLETED) 698 return ret; 699 700 /* 701 * For ping-pong mode, receive or send reply as needed 702 */ 703 if (td_read(td) && io_u->ddir == DDIR_READ) 704 ret = __fio_netio_queue(td, io_u, DDIR_WRITE); 705 else if (td_write(td) && io_u->ddir == DDIR_WRITE) 706 ret = __fio_netio_queue(td, io_u, DDIR_READ); 707 708 return ret; 709 } 710 711 static int fio_netio_connect(struct thread_data *td, struct fio_file *f) 712 { 713 struct netio_data *nd = td->io_ops->data; 714 struct netio_options *o = td->eo; 715 int type, domain; 716 717 if (o->proto == FIO_TYPE_TCP) { 718 domain = AF_INET; 719 type = SOCK_STREAM; 720 } else if (o->proto == FIO_TYPE_TCP_V6) { 721 domain = AF_INET6; 722 type = SOCK_STREAM; 723 } else if (o->proto == FIO_TYPE_UDP) { 724 domain = AF_INET; 725 type = SOCK_DGRAM; 726 } else if (o->proto == FIO_TYPE_UDP_V6) { 727 domain = AF_INET6; 728 type = SOCK_DGRAM; 729 } else if (o->proto == FIO_TYPE_UNIX) { 730 domain = AF_UNIX; 731 type = SOCK_STREAM; 732 } else { 733 log_err("fio: bad network type %d\n", o->proto); 734 f->fd = -1; 735 return 1; 736 } 737 738 f->fd = socket(domain, type, 0); 739 if (f->fd < 0) { 740 td_verror(td, errno, "socket"); 741 return 1; 742 } 743 744 #ifdef CONFIG_TCP_NODELAY 745 if (o->nodelay && is_tcp(o)) { 746 int optval = 1; 747 748 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) { 749 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno)); 750 return 1; 751 } 752 } 753 #endif 754 755 if (set_window_size(td, f->fd)) { 756 close(f->fd); 757 return 1; 758 } 759 if (set_mss(td, f->fd)) { 760 close(f->fd); 761 return 1; 762 } 763 764 if (is_udp(o)) { 765 if (!fio_netio_is_multicast(td->o.filename)) 766 return 0; 767 if (is_ipv6(o)) { 768 log_err("fio: multicast not supported on IPv6\n"); 769 close(f->fd); 770 return 1; 771 } 772 773 if (o->intfc) { 774 struct in_addr interface_addr; 775 776 if (inet_aton(o->intfc, &interface_addr) == 0) { 777 log_err("fio: interface not valid interface IP\n"); 778 close(f->fd); 779 return 1; 780 } 781 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) { 782 td_verror(td, errno, "setsockopt IP_MULTICAST_IF"); 783 close(f->fd); 784 return 1; 785 } 786 } 787 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) { 788 td_verror(td, errno, "setsockopt IP_MULTICAST_TTL"); 789 close(f->fd); 790 return 1; 791 } 792 return 0; 793 } else if (o->proto == FIO_TYPE_TCP) { 794 socklen_t len = sizeof(nd->addr); 795 796 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) { 797 td_verror(td, errno, "connect"); 798 close(f->fd); 799 return 1; 800 } 801 } else if (o->proto == FIO_TYPE_TCP_V6) { 802 socklen_t len = sizeof(nd->addr6); 803 804 if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) { 805 td_verror(td, errno, "connect"); 806 close(f->fd); 807 return 1; 808 } 809 810 } else { 811 struct sockaddr_un *addr = &nd->addr_un; 812 socklen_t len; 813 814 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1; 815 816 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) { 817 td_verror(td, errno, "connect"); 818 close(f->fd); 819 return 1; 820 } 821 } 822 823 return 0; 824 } 825 826 static int fio_netio_accept(struct thread_data *td, struct fio_file *f) 827 { 828 struct netio_data *nd = td->io_ops->data; 829 struct netio_options *o = td->eo; 830 socklen_t socklen; 831 int state; 832 833 if (is_udp(o)) { 834 f->fd = nd->listenfd; 835 return 0; 836 } 837 838 state = td->runstate; 839 td_set_runstate(td, TD_SETTING_UP); 840 841 log_info("fio: waiting for connection\n"); 842 843 if (poll_wait(td, nd->listenfd, POLLIN) < 0) 844 goto err; 845 846 if (o->proto == FIO_TYPE_TCP) { 847 socklen = sizeof(nd->addr); 848 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen); 849 } else { 850 socklen = sizeof(nd->addr6); 851 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen); 852 } 853 854 if (f->fd < 0) { 855 td_verror(td, errno, "accept"); 856 goto err; 857 } 858 859 #ifdef CONFIG_TCP_NODELAY 860 if (o->nodelay && is_tcp(o)) { 861 int optval = 1; 862 863 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) { 864 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno)); 865 return 1; 866 } 867 } 868 #endif 869 870 reset_all_stats(td); 871 td_set_runstate(td, state); 872 return 0; 873 err: 874 td_set_runstate(td, state); 875 return 1; 876 } 877 878 static void fio_netio_send_close(struct thread_data *td, struct fio_file *f) 879 { 880 struct netio_data *nd = td->io_ops->data; 881 struct netio_options *o = td->eo; 882 struct udp_close_msg msg; 883 struct sockaddr *to; 884 socklen_t len; 885 int ret; 886 887 if (is_ipv6(o)) { 888 to = (struct sockaddr *) &nd->addr6; 889 len = sizeof(nd->addr6); 890 } else { 891 to = (struct sockaddr *) &nd->addr; 892 len = sizeof(nd->addr); 893 } 894 895 msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC); 896 msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE); 897 898 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len); 899 if (ret < 0) 900 td_verror(td, errno, "sendto udp link close"); 901 } 902 903 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) 904 { 905 /* 906 * Notify the receiver that we are closing down the link 907 */ 908 fio_netio_send_close(td, f); 909 910 return generic_close_file(td, f); 911 } 912 913 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) 914 { 915 struct netio_data *nd = td->io_ops->data; 916 struct netio_options *o = td->eo; 917 struct udp_close_msg msg; 918 struct sockaddr *to; 919 socklen_t len; 920 int ret; 921 922 if (is_ipv6(o)) { 923 len = sizeof(nd->addr6); 924 to = (struct sockaddr *) &nd->addr6; 925 } else { 926 len = sizeof(nd->addr); 927 to = (struct sockaddr *) &nd->addr; 928 } 929 930 ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len); 931 if (ret < 0) { 932 td_verror(td, errno, "recvfrom udp link open"); 933 return ret; 934 } 935 936 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC || 937 ntohl(msg.cmd) != FIO_LINK_OPEN) { 938 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic), 939 ntohl(msg.cmd)); 940 return -1; 941 } 942 943 fio_gettime(&td->start, NULL); 944 return 0; 945 } 946 947 static int fio_netio_send_open(struct thread_data *td, struct fio_file *f) 948 { 949 struct netio_data *nd = td->io_ops->data; 950 struct netio_options *o = td->eo; 951 struct udp_close_msg msg; 952 struct sockaddr *to; 953 socklen_t len; 954 int ret; 955 956 if (is_ipv6(o)) { 957 len = sizeof(nd->addr6); 958 to = (struct sockaddr *) &nd->addr6; 959 } else { 960 len = sizeof(nd->addr); 961 to = (struct sockaddr *) &nd->addr; 962 } 963 964 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); 965 msg.cmd = htonl(FIO_LINK_OPEN); 966 967 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len); 968 if (ret < 0) { 969 td_verror(td, errno, "sendto udp link open"); 970 return ret; 971 } 972 973 return 0; 974 } 975 976 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) 977 { 978 int ret; 979 struct netio_options *o = td->eo; 980 981 if (o->listen) 982 ret = fio_netio_accept(td, f); 983 else 984 ret = fio_netio_connect(td, f); 985 986 if (ret) { 987 f->fd = -1; 988 return ret; 989 } 990 991 if (is_udp(o)) { 992 if (td_write(td)) 993 ret = fio_netio_send_open(td, f); 994 else { 995 int state; 996 997 state = td->runstate; 998 td_set_runstate(td, TD_SETTING_UP); 999 ret = fio_netio_udp_recv_open(td, f); 1000 td_set_runstate(td, state); 1001 } 1002 } 1003 1004 if (ret) 1005 fio_netio_close_file(td, f); 1006 1007 return ret; 1008 } 1009 1010 static int fio_fill_addr(struct thread_data *td, const char *host, int af, 1011 void *dst, struct addrinfo **res) 1012 { 1013 struct netio_options *o = td->eo; 1014 struct addrinfo hints; 1015 int ret; 1016 1017 if (inet_pton(af, host, dst)) 1018 return 0; 1019 1020 memset(&hints, 0, sizeof(hints)); 1021 1022 if (is_tcp(o)) 1023 hints.ai_socktype = SOCK_STREAM; 1024 else 1025 hints.ai_socktype = SOCK_DGRAM; 1026 1027 if (is_ipv6(o)) 1028 hints.ai_family = AF_INET6; 1029 else 1030 hints.ai_family = AF_INET; 1031 1032 ret = getaddrinfo(host, NULL, &hints, res); 1033 if (ret) { 1034 int e = EINVAL; 1035 char str[128]; 1036 1037 if (ret == EAI_SYSTEM) 1038 e = errno; 1039 1040 snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret)); 1041 td_verror(td, e, str); 1042 return 1; 1043 } 1044 1045 return 0; 1046 } 1047 1048 static int fio_netio_setup_connect_inet(struct thread_data *td, 1049 const char *host, unsigned short port) 1050 { 1051 struct netio_data *nd = td->io_ops->data; 1052 struct netio_options *o = td->eo; 1053 struct addrinfo *res = NULL; 1054 void *dst, *src; 1055 int af, len; 1056 1057 if (!host) { 1058 log_err("fio: connect with no host to connect to.\n"); 1059 if (td_read(td)) 1060 log_err("fio: did you forget to set 'listen'?\n"); 1061 1062 td_verror(td, EINVAL, "no hostname= set"); 1063 return 1; 1064 } 1065 1066 nd->addr.sin_family = AF_INET; 1067 nd->addr.sin_port = htons(port); 1068 nd->addr6.sin6_family = AF_INET6; 1069 nd->addr6.sin6_port = htons(port); 1070 1071 if (is_ipv6(o)) { 1072 af = AF_INET6; 1073 dst = &nd->addr6.sin6_addr; 1074 } else { 1075 af = AF_INET; 1076 dst = &nd->addr.sin_addr; 1077 } 1078 1079 if (fio_fill_addr(td, host, af, dst, &res)) 1080 return 1; 1081 1082 if (!res) 1083 return 0; 1084 1085 if (is_ipv6(o)) { 1086 len = sizeof(nd->addr6.sin6_addr); 1087 src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr; 1088 } else { 1089 len = sizeof(nd->addr.sin_addr); 1090 src = &((struct sockaddr_in *) res->ai_addr)->sin_addr; 1091 } 1092 1093 memcpy(dst, src, len); 1094 freeaddrinfo(res); 1095 return 0; 1096 } 1097 1098 static int fio_netio_setup_connect_unix(struct thread_data *td, 1099 const char *path) 1100 { 1101 struct netio_data *nd = td->io_ops->data; 1102 struct sockaddr_un *soun = &nd->addr_un; 1103 1104 soun->sun_family = AF_UNIX; 1105 memset(soun->sun_path, 0, sizeof(soun->sun_path)); 1106 strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1); 1107 return 0; 1108 } 1109 1110 static int fio_netio_setup_connect(struct thread_data *td) 1111 { 1112 struct netio_options *o = td->eo; 1113 1114 if (is_udp(o) || is_tcp(o)) 1115 return fio_netio_setup_connect_inet(td, td->o.filename,o->port); 1116 else 1117 return fio_netio_setup_connect_unix(td, td->o.filename); 1118 } 1119 1120 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path) 1121 { 1122 struct netio_data *nd = td->io_ops->data; 1123 struct sockaddr_un *addr = &nd->addr_un; 1124 mode_t mode; 1125 int len, fd; 1126 1127 fd = socket(AF_UNIX, SOCK_STREAM, 0); 1128 if (fd < 0) { 1129 log_err("fio: socket: %s\n", strerror(errno)); 1130 return -1; 1131 } 1132 1133 mode = umask(000); 1134 1135 memset(addr, 0, sizeof(*addr)); 1136 addr->sun_family = AF_UNIX; 1137 strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1); 1138 unlink(path); 1139 1140 len = sizeof(addr->sun_family) + strlen(path) + 1; 1141 1142 if (bind(fd, (struct sockaddr *) addr, len) < 0) { 1143 log_err("fio: bind: %s\n", strerror(errno)); 1144 close(fd); 1145 return -1; 1146 } 1147 1148 umask(mode); 1149 nd->listenfd = fd; 1150 return 0; 1151 } 1152 1153 static int fio_netio_setup_listen_inet(struct thread_data *td, short port) 1154 { 1155 struct netio_data *nd = td->io_ops->data; 1156 struct netio_options *o = td->eo; 1157 struct ip_mreq mr; 1158 struct sockaddr_in sin; 1159 struct sockaddr *saddr; 1160 int fd, opt, type, domain; 1161 socklen_t len; 1162 1163 memset(&sin, 0, sizeof(sin)); 1164 1165 if (o->proto == FIO_TYPE_TCP) { 1166 type = SOCK_STREAM; 1167 domain = AF_INET; 1168 } else if (o->proto == FIO_TYPE_TCP_V6) { 1169 type = SOCK_STREAM; 1170 domain = AF_INET6; 1171 } else if (o->proto == FIO_TYPE_UDP) { 1172 type = SOCK_DGRAM; 1173 domain = AF_INET; 1174 } else if (o->proto == FIO_TYPE_UDP_V6) { 1175 type = SOCK_DGRAM; 1176 domain = AF_INET6; 1177 } else { 1178 log_err("fio: unknown proto %d\n", o->proto); 1179 return 1; 1180 } 1181 1182 fd = socket(domain, type, 0); 1183 if (fd < 0) { 1184 td_verror(td, errno, "socket"); 1185 return 1; 1186 } 1187 1188 opt = 1; 1189 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) { 1190 td_verror(td, errno, "setsockopt"); 1191 close(fd); 1192 return 1; 1193 } 1194 #ifdef SO_REUSEPORT 1195 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) { 1196 td_verror(td, errno, "setsockopt"); 1197 close(fd); 1198 return 1; 1199 } 1200 #endif 1201 1202 if (set_window_size(td, fd)) { 1203 close(fd); 1204 return 1; 1205 } 1206 if (set_mss(td, fd)) { 1207 close(fd); 1208 return 1; 1209 } 1210 1211 if (td->o.filename) { 1212 if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) { 1213 log_err("fio: hostname not valid for non-multicast inbound network IO\n"); 1214 close(fd); 1215 return 1; 1216 } 1217 if (is_ipv6(o)) { 1218 log_err("fio: IPv6 not supported for multicast network IO"); 1219 close(fd); 1220 return 1; 1221 } 1222 1223 inet_aton(td->o.filename, &sin.sin_addr); 1224 1225 mr.imr_multiaddr = sin.sin_addr; 1226 if (o->intfc) { 1227 if (inet_aton(o->intfc, &mr.imr_interface) == 0) { 1228 log_err("fio: interface not valid interface IP\n"); 1229 close(fd); 1230 return 1; 1231 } 1232 } else { 1233 mr.imr_interface.s_addr = htonl(INADDR_ANY); 1234 } 1235 1236 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) { 1237 td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP"); 1238 close(fd); 1239 return 1; 1240 } 1241 } 1242 1243 if (!is_ipv6(o)) { 1244 saddr = (struct sockaddr *) &nd->addr; 1245 len = sizeof(nd->addr); 1246 1247 nd->addr.sin_family = AF_INET; 1248 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY); 1249 nd->addr.sin_port = htons(port); 1250 } else { 1251 saddr = (struct sockaddr *) &nd->addr6; 1252 len = sizeof(nd->addr6); 1253 1254 nd->addr6.sin6_family = AF_INET6; 1255 nd->addr6.sin6_addr = in6addr_any; 1256 nd->addr6.sin6_port = htons(port); 1257 } 1258 1259 if (bind(fd, saddr, len) < 0) { 1260 close(fd); 1261 td_verror(td, errno, "bind"); 1262 return 1; 1263 } 1264 1265 nd->listenfd = fd; 1266 return 0; 1267 } 1268 1269 static int fio_netio_setup_listen(struct thread_data *td) 1270 { 1271 struct netio_data *nd = td->io_ops->data; 1272 struct netio_options *o = td->eo; 1273 int ret; 1274 1275 if (is_udp(o) || is_tcp(o)) 1276 ret = fio_netio_setup_listen_inet(td, o->port); 1277 else 1278 ret = fio_netio_setup_listen_unix(td, td->o.filename); 1279 1280 if (ret) 1281 return ret; 1282 if (is_udp(o)) 1283 return 0; 1284 1285 if (listen(nd->listenfd, 10) < 0) { 1286 td_verror(td, errno, "listen"); 1287 nd->listenfd = -1; 1288 return 1; 1289 } 1290 1291 return 0; 1292 } 1293 1294 static int fio_netio_init(struct thread_data *td) 1295 { 1296 struct netio_options *o = td->eo; 1297 int ret; 1298 1299 #ifdef WIN32 1300 WSADATA wsd; 1301 WSAStartup(MAKEWORD(2,2), &wsd); 1302 #endif 1303 1304 if (td_random(td)) { 1305 log_err("fio: network IO can't be random\n"); 1306 return 1; 1307 } 1308 1309 if (o->proto == FIO_TYPE_UNIX && o->port) { 1310 log_err("fio: network IO port not valid with unix socket\n"); 1311 return 1; 1312 } else if (o->proto != FIO_TYPE_UNIX && !o->port) { 1313 log_err("fio: network IO requires port for tcp or udp\n"); 1314 return 1; 1315 } 1316 1317 o->port += td->subjob_number; 1318 1319 if (!is_tcp(o)) { 1320 if (o->listen) { 1321 log_err("fio: listen only valid for TCP proto IO\n"); 1322 return 1; 1323 } 1324 if (td_rw(td)) { 1325 log_err("fio: datagram network connections must be" 1326 " read OR write\n"); 1327 return 1; 1328 } 1329 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) { 1330 log_err("fio: UNIX sockets need host/filename\n"); 1331 return 1; 1332 } 1333 o->listen = td_read(td); 1334 } 1335 1336 if (o->listen) 1337 ret = fio_netio_setup_listen(td); 1338 else 1339 ret = fio_netio_setup_connect(td); 1340 1341 return ret; 1342 } 1343 1344 static void fio_netio_cleanup(struct thread_data *td) 1345 { 1346 struct netio_data *nd = td->io_ops->data; 1347 1348 if (nd) { 1349 if (nd->listenfd != -1) 1350 close(nd->listenfd); 1351 if (nd->pipes[0] != -1) 1352 close(nd->pipes[0]); 1353 if (nd->pipes[1] != -1) 1354 close(nd->pipes[1]); 1355 1356 free(nd); 1357 } 1358 } 1359 1360 static int fio_netio_setup(struct thread_data *td) 1361 { 1362 struct netio_data *nd; 1363 1364 if (!td->files_index) { 1365 add_file(td, td->o.filename ?: "net", 0, 0); 1366 td->o.nr_files = td->o.nr_files ?: 1; 1367 td->o.open_files++; 1368 } 1369 1370 if (!td->io_ops->data) { 1371 nd = malloc(sizeof(*nd));; 1372 1373 memset(nd, 0, sizeof(*nd)); 1374 nd->listenfd = -1; 1375 nd->pipes[0] = nd->pipes[1] = -1; 1376 td->io_ops->data = nd; 1377 } 1378 1379 return 0; 1380 } 1381 1382 static void fio_netio_terminate(struct thread_data *td) 1383 { 1384 kill(td->pid, SIGTERM); 1385 } 1386 1387 #ifdef CONFIG_LINUX_SPLICE 1388 static int fio_netio_setup_splice(struct thread_data *td) 1389 { 1390 struct netio_data *nd; 1391 1392 fio_netio_setup(td); 1393 1394 nd = td->io_ops->data; 1395 if (nd) { 1396 if (pipe(nd->pipes) < 0) 1397 return 1; 1398 1399 nd->use_splice = 1; 1400 return 0; 1401 } 1402 1403 return 1; 1404 } 1405 1406 static struct ioengine_ops ioengine_splice = { 1407 .name = "netsplice", 1408 .version = FIO_IOOPS_VERSION, 1409 .prep = fio_netio_prep, 1410 .queue = fio_netio_queue, 1411 .setup = fio_netio_setup_splice, 1412 .init = fio_netio_init, 1413 .cleanup = fio_netio_cleanup, 1414 .open_file = fio_netio_open_file, 1415 .close_file = fio_netio_close_file, 1416 .terminate = fio_netio_terminate, 1417 .options = options, 1418 .option_struct_size = sizeof(struct netio_options), 1419 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 1420 FIO_PIPEIO, 1421 }; 1422 #endif 1423 1424 static struct ioengine_ops ioengine_rw = { 1425 .name = "net", 1426 .version = FIO_IOOPS_VERSION, 1427 .prep = fio_netio_prep, 1428 .queue = fio_netio_queue, 1429 .setup = fio_netio_setup, 1430 .init = fio_netio_init, 1431 .cleanup = fio_netio_cleanup, 1432 .open_file = fio_netio_open_file, 1433 .close_file = fio_netio_close_file, 1434 .terminate = fio_netio_terminate, 1435 .options = options, 1436 .option_struct_size = sizeof(struct netio_options), 1437 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 1438 FIO_PIPEIO | FIO_BIT_BASED, 1439 }; 1440 1441 static int str_hostname_cb(void *data, const char *input) 1442 { 1443 struct netio_options *o = data; 1444 1445 if (o->td->o.filename) 1446 free(o->td->o.filename); 1447 o->td->o.filename = strdup(input); 1448 return 0; 1449 } 1450 1451 static void fio_init fio_netio_register(void) 1452 { 1453 register_ioengine(&ioengine_rw); 1454 #ifdef CONFIG_LINUX_SPLICE 1455 register_ioengine(&ioengine_splice); 1456 #endif 1457 } 1458 1459 static void fio_exit fio_netio_unregister(void) 1460 { 1461 unregister_ioengine(&ioengine_rw); 1462 #ifdef CONFIG_LINUX_SPLICE 1463 unregister_ioengine(&ioengine_splice); 1464 #endif 1465 } 1466