1 /* 2 * net engine 3 * 4 * IO engine that reads/writes to/from sockets. 5 * 6 */ 7 #include <stdio.h> 8 #include <stdlib.h> 9 #include <unistd.h> 10 #include <signal.h> 11 #include <errno.h> 12 #include <assert.h> 13 #include <netinet/in.h> 14 #include <netinet/tcp.h> 15 #include <arpa/inet.h> 16 #include <netdb.h> 17 #include <sys/poll.h> 18 #include <sys/types.h> 19 #include <sys/stat.h> 20 #include <sys/socket.h> 21 #include <sys/un.h> 22 23 #include "../fio.h" 24 #include "../verify.h" 25 #include "../optgroup.h" 26 27 struct netio_data { 28 int listenfd; 29 int use_splice; 30 int seq_off; 31 int pipes[2]; 32 struct sockaddr_in addr; 33 struct sockaddr_in6 addr6; 34 struct sockaddr_un addr_un; 35 uint64_t udp_send_seq; 36 uint64_t udp_recv_seq; 37 }; 38 39 struct netio_options { 40 struct thread_data *td; 41 unsigned int port; 42 unsigned int proto; 43 unsigned int listen; 44 unsigned int pingpong; 45 unsigned int nodelay; 46 unsigned int ttl; 47 unsigned int window_size; 48 unsigned int mss; 49 char *intfc; 50 }; 51 52 struct udp_close_msg { 53 uint32_t magic; 54 uint32_t cmd; 55 }; 56 57 struct udp_seq { 58 uint64_t magic; 59 uint64_t seq; 60 uint64_t bs; 61 }; 62 63 enum { 64 FIO_LINK_CLOSE = 0x89, 65 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b, 66 FIO_LINK_OPEN = 0x98, 67 FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL, 68 69 FIO_TYPE_TCP = 1, 70 FIO_TYPE_UDP = 2, 71 FIO_TYPE_UNIX = 3, 72 FIO_TYPE_TCP_V6 = 4, 73 FIO_TYPE_UDP_V6 = 5, 74 }; 75 76 static int str_hostname_cb(void *data, const char *input); 77 static struct fio_option options[] = { 78 { 79 .name = "hostname", 80 .lname = "net engine hostname", 81 .type = FIO_OPT_STR_STORE, 82 .cb = str_hostname_cb, 83 .help = "Hostname for net IO engine", 84 .category = FIO_OPT_C_ENGINE, 85 .group = FIO_OPT_G_NETIO, 86 }, 87 { 88 .name = "port", 89 .lname = "net engine port", 90 .type = FIO_OPT_INT, 91 .off1 = offsetof(struct netio_options, port), 92 .minval = 1, 93 .maxval = 65535, 94 .help = "Port to use for TCP or UDP net connections", 95 .category = FIO_OPT_C_ENGINE, 96 .group = FIO_OPT_G_NETIO, 97 }, 98 { 99 .name = "protocol", 100 .lname = "net engine protocol", 101 .alias = "proto", 102 .type = FIO_OPT_STR, 103 .off1 = offsetof(struct netio_options, proto), 104 .help = "Network protocol to use", 105 .def = "tcp", 106 .posval = { 107 { .ival = "tcp", 108 .oval = FIO_TYPE_TCP, 109 .help = "Transmission Control Protocol", 110 }, 111 #ifdef CONFIG_IPV6 112 { .ival = "tcpv6", 113 .oval = FIO_TYPE_TCP_V6, 114 .help = "Transmission Control Protocol V6", 115 }, 116 #endif 117 { .ival = "udp", 118 .oval = FIO_TYPE_UDP, 119 .help = "User Datagram Protocol", 120 }, 121 #ifdef CONFIG_IPV6 122 { .ival = "udpv6", 123 .oval = FIO_TYPE_UDP_V6, 124 .help = "User Datagram Protocol V6", 125 }, 126 #endif 127 { .ival = "unix", 128 .oval = FIO_TYPE_UNIX, 129 .help = "UNIX domain socket", 130 }, 131 }, 132 .category = FIO_OPT_C_ENGINE, 133 .group = FIO_OPT_G_NETIO, 134 }, 135 #ifdef CONFIG_TCP_NODELAY 136 { 137 .name = "nodelay", 138 .lname = "No Delay", 139 .type = FIO_OPT_BOOL, 140 .off1 = offsetof(struct netio_options, nodelay), 141 .help = "Use TCP_NODELAY on TCP connections", 142 .category = FIO_OPT_C_ENGINE, 143 .group = FIO_OPT_G_NETIO, 144 }, 145 #endif 146 { 147 .name = "listen", 148 .lname = "net engine listen", 149 .type = FIO_OPT_STR_SET, 150 .off1 = offsetof(struct netio_options, listen), 151 .help = "Listen for incoming TCP connections", 152 .category = FIO_OPT_C_ENGINE, 153 .group = FIO_OPT_G_NETIO, 154 }, 155 { 156 .name = "pingpong", 157 .lname = "Ping Pong", 158 .type = FIO_OPT_STR_SET, 159 .off1 = offsetof(struct netio_options, pingpong), 160 .help = "Ping-pong IO requests", 161 .category = FIO_OPT_C_ENGINE, 162 .group = FIO_OPT_G_NETIO, 163 }, 164 { 165 .name = "interface", 166 .lname = "net engine interface", 167 .type = FIO_OPT_STR_STORE, 168 .off1 = offsetof(struct netio_options, intfc), 169 .help = "Network interface to use", 170 .category = FIO_OPT_C_ENGINE, 171 .group = FIO_OPT_G_NETIO, 172 }, 173 { 174 .name = "ttl", 175 .lname = "net engine multicast ttl", 176 .type = FIO_OPT_INT, 177 .off1 = offsetof(struct netio_options, ttl), 178 .def = "1", 179 .minval = 0, 180 .help = "Time-to-live value for outgoing UDP multicast packets", 181 .category = FIO_OPT_C_ENGINE, 182 .group = FIO_OPT_G_NETIO, 183 }, 184 #ifdef CONFIG_NET_WINDOWSIZE 185 { 186 .name = "window_size", 187 .lname = "Window Size", 188 .type = FIO_OPT_INT, 189 .off1 = offsetof(struct netio_options, window_size), 190 .minval = 0, 191 .help = "Set socket buffer window size", 192 .category = FIO_OPT_C_ENGINE, 193 .group = FIO_OPT_G_NETIO, 194 }, 195 #endif 196 #ifdef CONFIG_NET_MSS 197 { 198 .name = "mss", 199 .lname = "Maximum segment size", 200 .type = FIO_OPT_INT, 201 .off1 = offsetof(struct netio_options, mss), 202 .minval = 0, 203 .help = "Set TCP maximum segment size", 204 .category = FIO_OPT_C_ENGINE, 205 .group = FIO_OPT_G_NETIO, 206 }, 207 #endif 208 { 209 .name = NULL, 210 }, 211 }; 212 213 static inline int is_udp(struct netio_options *o) 214 { 215 return o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UDP_V6; 216 } 217 218 static inline int is_tcp(struct netio_options *o) 219 { 220 return o->proto == FIO_TYPE_TCP || o->proto == FIO_TYPE_TCP_V6; 221 } 222 223 static inline int is_ipv6(struct netio_options *o) 224 { 225 return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6; 226 } 227 228 static int set_window_size(struct thread_data *td, int fd) 229 { 230 #ifdef CONFIG_NET_WINDOWSIZE 231 struct netio_options *o = td->eo; 232 unsigned int wss; 233 int snd, rcv, ret; 234 235 if (!o->window_size) 236 return 0; 237 238 rcv = o->listen || o->pingpong; 239 snd = !o->listen || o->pingpong; 240 wss = o->window_size; 241 ret = 0; 242 243 if (rcv) { 244 ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss, 245 sizeof(wss)); 246 if (ret < 0) 247 td_verror(td, errno, "rcvbuf window size"); 248 } 249 if (snd && !ret) { 250 ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss, 251 sizeof(wss)); 252 if (ret < 0) 253 td_verror(td, errno, "sndbuf window size"); 254 } 255 256 return ret; 257 #else 258 td_verror(td, -EINVAL, "setsockopt window size"); 259 return -1; 260 #endif 261 } 262 263 static int set_mss(struct thread_data *td, int fd) 264 { 265 #ifdef CONFIG_NET_MSS 266 struct netio_options *o = td->eo; 267 unsigned int mss; 268 int ret; 269 270 if (!o->mss || !is_tcp(o)) 271 return 0; 272 273 mss = o->mss; 274 ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss, 275 sizeof(mss)); 276 if (ret < 0) 277 td_verror(td, errno, "setsockopt TCP_MAXSEG"); 278 279 return ret; 280 #else 281 td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG"); 282 return -1; 283 #endif 284 } 285 286 287 /* 288 * Return -1 for error and 'nr events' for a positive number 289 * of events 290 */ 291 static int poll_wait(struct thread_data *td, int fd, short events) 292 { 293 struct pollfd pfd; 294 int ret; 295 296 while (!td->terminate) { 297 pfd.fd = fd; 298 pfd.events = events; 299 ret = poll(&pfd, 1, -1); 300 if (ret < 0) { 301 if (errno == EINTR) 302 break; 303 304 td_verror(td, errno, "poll"); 305 return -1; 306 } else if (!ret) 307 continue; 308 309 break; 310 } 311 312 if (pfd.revents & events) 313 return 1; 314 315 return -1; 316 } 317 318 static int fio_netio_is_multicast(const char *mcaddr) 319 { 320 in_addr_t addr = inet_network(mcaddr); 321 if (addr == -1) 322 return 0; 323 324 if (inet_network("224.0.0.0") <= addr && 325 inet_network("239.255.255.255") >= addr) 326 return 1; 327 328 return 0; 329 } 330 331 332 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u) 333 { 334 struct netio_options *o = td->eo; 335 336 /* 337 * Make sure we don't see spurious reads to a receiver, and vice versa 338 */ 339 if (is_tcp(o)) 340 return 0; 341 342 if ((o->listen && io_u->ddir == DDIR_WRITE) || 343 (!o->listen && io_u->ddir == DDIR_READ)) { 344 td_verror(td, EINVAL, "bad direction"); 345 return 1; 346 } 347 348 return 0; 349 } 350 351 #ifdef CONFIG_LINUX_SPLICE 352 static int splice_io_u(int fdin, int fdout, unsigned int len) 353 { 354 int bytes = 0; 355 356 while (len) { 357 int ret = splice(fdin, NULL, fdout, NULL, len, 0); 358 359 if (ret < 0) { 360 if (!bytes) 361 bytes = ret; 362 363 break; 364 } else if (!ret) 365 break; 366 367 bytes += ret; 368 len -= ret; 369 } 370 371 return bytes; 372 } 373 374 /* 375 * Receive bytes from a socket and fill them into the internal pipe 376 */ 377 static int splice_in(struct thread_data *td, struct io_u *io_u) 378 { 379 struct netio_data *nd = td->io_ops_data; 380 381 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen); 382 } 383 384 /* 385 * Transmit 'len' bytes from the internal pipe 386 */ 387 static int splice_out(struct thread_data *td, struct io_u *io_u, 388 unsigned int len) 389 { 390 struct netio_data *nd = td->io_ops_data; 391 392 return splice_io_u(nd->pipes[0], io_u->file->fd, len); 393 } 394 395 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len) 396 { 397 struct iovec iov = { 398 .iov_base = io_u->xfer_buf, 399 .iov_len = len, 400 }; 401 int bytes = 0; 402 403 while (iov.iov_len) { 404 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE); 405 406 if (ret < 0) { 407 if (!bytes) 408 bytes = ret; 409 break; 410 } else if (!ret) 411 break; 412 413 iov.iov_len -= ret; 414 iov.iov_base += ret; 415 bytes += ret; 416 } 417 418 return bytes; 419 420 } 421 422 /* 423 * vmsplice() pipe to io_u buffer 424 */ 425 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, 426 unsigned int len) 427 { 428 struct netio_data *nd = td->io_ops_data; 429 430 return vmsplice_io_u(io_u, nd->pipes[0], len); 431 } 432 433 /* 434 * vmsplice() io_u to pipe 435 */ 436 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u) 437 { 438 struct netio_data *nd = td->io_ops_data; 439 440 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen); 441 } 442 443 /* 444 * splice receive - transfer socket data into a pipe using splice, then map 445 * that pipe data into the io_u using vmsplice. 446 */ 447 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 448 { 449 int ret; 450 451 ret = splice_in(td, io_u); 452 if (ret > 0) 453 return vmsplice_io_u_out(td, io_u, ret); 454 455 return ret; 456 } 457 458 /* 459 * splice transmit - map data from the io_u into a pipe by using vmsplice, 460 * then transfer that pipe to a socket using splice. 461 */ 462 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 463 { 464 int ret; 465 466 ret = vmsplice_io_u_in(td, io_u); 467 if (ret > 0) 468 return splice_out(td, io_u, ret); 469 470 return ret; 471 } 472 #else 473 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 474 { 475 errno = EOPNOTSUPP; 476 return -1; 477 } 478 479 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 480 { 481 errno = EOPNOTSUPP; 482 return -1; 483 } 484 #endif 485 486 static void store_udp_seq(struct netio_data *nd, struct io_u *io_u) 487 { 488 struct udp_seq *us; 489 490 if (io_u->xfer_buflen < sizeof(*us)) 491 return; 492 493 us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); 494 us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC); 495 us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen); 496 us->seq = cpu_to_le64(nd->udp_send_seq++); 497 } 498 499 static void verify_udp_seq(struct thread_data *td, struct netio_data *nd, 500 struct io_u *io_u) 501 { 502 struct udp_seq *us; 503 uint64_t seq; 504 505 if (io_u->xfer_buflen < sizeof(*us)) 506 return; 507 508 if (nd->seq_off) 509 return; 510 511 us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); 512 if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC) 513 return; 514 if (le64_to_cpu(us->bs) != io_u->xfer_buflen) { 515 nd->seq_off = 1; 516 return; 517 } 518 519 seq = le64_to_cpu(us->seq); 520 521 if (seq != nd->udp_recv_seq) 522 td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq; 523 524 nd->udp_recv_seq = seq + 1; 525 } 526 527 static int fio_netio_send(struct thread_data *td, struct io_u *io_u) 528 { 529 struct netio_data *nd = td->io_ops_data; 530 struct netio_options *o = td->eo; 531 int ret, flags = 0; 532 533 do { 534 if (is_udp(o)) { 535 const struct sockaddr *to; 536 socklen_t len; 537 538 if (is_ipv6(o)) { 539 to = (struct sockaddr *) &nd->addr6; 540 len = sizeof(nd->addr6); 541 } else { 542 to = (struct sockaddr *) &nd->addr; 543 len = sizeof(nd->addr); 544 } 545 546 if (td->o.verify == VERIFY_NONE) 547 store_udp_seq(nd, io_u); 548 549 ret = sendto(io_u->file->fd, io_u->xfer_buf, 550 io_u->xfer_buflen, flags, to, len); 551 } else { 552 /* 553 * if we are going to write more, set MSG_MORE 554 */ 555 #ifdef MSG_MORE 556 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < 557 td->o.size) && !o->pingpong) 558 flags |= MSG_MORE; 559 #endif 560 ret = send(io_u->file->fd, io_u->xfer_buf, 561 io_u->xfer_buflen, flags); 562 } 563 if (ret > 0) 564 break; 565 566 ret = poll_wait(td, io_u->file->fd, POLLOUT); 567 if (ret <= 0) 568 break; 569 } while (1); 570 571 return ret; 572 } 573 574 static int is_close_msg(struct io_u *io_u, int len) 575 { 576 struct udp_close_msg *msg; 577 578 if (len != sizeof(struct udp_close_msg)) 579 return 0; 580 581 msg = io_u->xfer_buf; 582 if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) 583 return 0; 584 if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE) 585 return 0; 586 587 return 1; 588 } 589 590 static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) 591 { 592 struct netio_data *nd = td->io_ops_data; 593 struct netio_options *o = td->eo; 594 int ret, flags = 0; 595 596 do { 597 if (is_udp(o)) { 598 struct sockaddr *from; 599 socklen_t l, *len = &l; 600 601 if (o->listen) { 602 if (!is_ipv6(o)) { 603 from = (struct sockaddr *) &nd->addr; 604 *len = sizeof(nd->addr); 605 } else { 606 from = (struct sockaddr *) &nd->addr6; 607 *len = sizeof(nd->addr6); 608 } 609 } else { 610 from = NULL; 611 len = NULL; 612 } 613 614 ret = recvfrom(io_u->file->fd, io_u->xfer_buf, 615 io_u->xfer_buflen, flags, from, len); 616 617 if (is_close_msg(io_u, ret)) { 618 td->done = 1; 619 return 0; 620 } 621 } else { 622 ret = recv(io_u->file->fd, io_u->xfer_buf, 623 io_u->xfer_buflen, flags); 624 625 if (is_close_msg(io_u, ret)) { 626 td->done = 1; 627 return 0; 628 } 629 } 630 if (ret > 0) 631 break; 632 else if (!ret && (flags & MSG_WAITALL)) 633 break; 634 635 ret = poll_wait(td, io_u->file->fd, POLLIN); 636 if (ret <= 0) 637 break; 638 flags |= MSG_WAITALL; 639 } while (1); 640 641 if (is_udp(o) && td->o.verify == VERIFY_NONE) 642 verify_udp_seq(td, nd, io_u); 643 644 return ret; 645 } 646 647 static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u, 648 enum fio_ddir ddir) 649 { 650 struct netio_data *nd = td->io_ops_data; 651 struct netio_options *o = td->eo; 652 int ret; 653 654 if (ddir == DDIR_WRITE) { 655 if (!nd->use_splice || is_udp(o) || 656 o->proto == FIO_TYPE_UNIX) 657 ret = fio_netio_send(td, io_u); 658 else 659 ret = fio_netio_splice_out(td, io_u); 660 } else if (ddir == DDIR_READ) { 661 if (!nd->use_splice || is_udp(o) || 662 o->proto == FIO_TYPE_UNIX) 663 ret = fio_netio_recv(td, io_u); 664 else 665 ret = fio_netio_splice_in(td, io_u); 666 } else 667 ret = 0; /* must be a SYNC */ 668 669 if (ret != (int) io_u->xfer_buflen) { 670 if (ret > 0) { 671 io_u->resid = io_u->xfer_buflen - ret; 672 io_u->error = 0; 673 return FIO_Q_COMPLETED; 674 } else if (!ret) 675 return FIO_Q_BUSY; 676 else { 677 int err = errno; 678 679 if (ddir == DDIR_WRITE && err == EMSGSIZE) 680 return FIO_Q_BUSY; 681 682 io_u->error = err; 683 } 684 } 685 686 if (io_u->error) 687 td_verror(td, io_u->error, "xfer"); 688 689 return FIO_Q_COMPLETED; 690 } 691 692 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) 693 { 694 struct netio_options *o = td->eo; 695 int ret; 696 697 fio_ro_check(td, io_u); 698 699 ret = __fio_netio_queue(td, io_u, io_u->ddir); 700 if (!o->pingpong || ret != FIO_Q_COMPLETED) 701 return ret; 702 703 /* 704 * For ping-pong mode, receive or send reply as needed 705 */ 706 if (td_read(td) && io_u->ddir == DDIR_READ) 707 ret = __fio_netio_queue(td, io_u, DDIR_WRITE); 708 else if (td_write(td) && io_u->ddir == DDIR_WRITE) 709 ret = __fio_netio_queue(td, io_u, DDIR_READ); 710 711 return ret; 712 } 713 714 static int fio_netio_connect(struct thread_data *td, struct fio_file *f) 715 { 716 struct netio_data *nd = td->io_ops_data; 717 struct netio_options *o = td->eo; 718 int type, domain; 719 720 if (o->proto == FIO_TYPE_TCP) { 721 domain = AF_INET; 722 type = SOCK_STREAM; 723 } else if (o->proto == FIO_TYPE_TCP_V6) { 724 domain = AF_INET6; 725 type = SOCK_STREAM; 726 } else if (o->proto == FIO_TYPE_UDP) { 727 domain = AF_INET; 728 type = SOCK_DGRAM; 729 } else if (o->proto == FIO_TYPE_UDP_V6) { 730 domain = AF_INET6; 731 type = SOCK_DGRAM; 732 } else if (o->proto == FIO_TYPE_UNIX) { 733 domain = AF_UNIX; 734 type = SOCK_STREAM; 735 } else { 736 log_err("fio: bad network type %d\n", o->proto); 737 f->fd = -1; 738 return 1; 739 } 740 741 f->fd = socket(domain, type, 0); 742 if (f->fd < 0) { 743 td_verror(td, errno, "socket"); 744 return 1; 745 } 746 747 #ifdef CONFIG_TCP_NODELAY 748 if (o->nodelay && is_tcp(o)) { 749 int optval = 1; 750 751 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) { 752 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno)); 753 return 1; 754 } 755 } 756 #endif 757 758 if (set_window_size(td, f->fd)) { 759 close(f->fd); 760 return 1; 761 } 762 if (set_mss(td, f->fd)) { 763 close(f->fd); 764 return 1; 765 } 766 767 if (is_udp(o)) { 768 if (!fio_netio_is_multicast(td->o.filename)) 769 return 0; 770 if (is_ipv6(o)) { 771 log_err("fio: multicast not supported on IPv6\n"); 772 close(f->fd); 773 return 1; 774 } 775 776 if (o->intfc) { 777 struct in_addr interface_addr; 778 779 if (inet_aton(o->intfc, &interface_addr) == 0) { 780 log_err("fio: interface not valid interface IP\n"); 781 close(f->fd); 782 return 1; 783 } 784 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&interface_addr, sizeof(interface_addr)) < 0) { 785 td_verror(td, errno, "setsockopt IP_MULTICAST_IF"); 786 close(f->fd); 787 return 1; 788 } 789 } 790 if (setsockopt(f->fd, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&o->ttl, sizeof(o->ttl)) < 0) { 791 td_verror(td, errno, "setsockopt IP_MULTICAST_TTL"); 792 close(f->fd); 793 return 1; 794 } 795 return 0; 796 } else if (o->proto == FIO_TYPE_TCP) { 797 socklen_t len = sizeof(nd->addr); 798 799 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) { 800 td_verror(td, errno, "connect"); 801 close(f->fd); 802 return 1; 803 } 804 } else if (o->proto == FIO_TYPE_TCP_V6) { 805 socklen_t len = sizeof(nd->addr6); 806 807 if (connect(f->fd, (struct sockaddr *) &nd->addr6, len) < 0) { 808 td_verror(td, errno, "connect"); 809 close(f->fd); 810 return 1; 811 } 812 813 } else { 814 struct sockaddr_un *addr = &nd->addr_un; 815 socklen_t len; 816 817 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1; 818 819 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) { 820 td_verror(td, errno, "connect"); 821 close(f->fd); 822 return 1; 823 } 824 } 825 826 return 0; 827 } 828 829 static int fio_netio_accept(struct thread_data *td, struct fio_file *f) 830 { 831 struct netio_data *nd = td->io_ops_data; 832 struct netio_options *o = td->eo; 833 socklen_t socklen; 834 int state; 835 836 if (is_udp(o)) { 837 f->fd = nd->listenfd; 838 return 0; 839 } 840 841 state = td->runstate; 842 td_set_runstate(td, TD_SETTING_UP); 843 844 log_info("fio: waiting for connection\n"); 845 846 if (poll_wait(td, nd->listenfd, POLLIN) < 0) 847 goto err; 848 849 if (o->proto == FIO_TYPE_TCP) { 850 socklen = sizeof(nd->addr); 851 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen); 852 } else { 853 socklen = sizeof(nd->addr6); 854 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr6, &socklen); 855 } 856 857 if (f->fd < 0) { 858 td_verror(td, errno, "accept"); 859 goto err; 860 } 861 862 #ifdef CONFIG_TCP_NODELAY 863 if (o->nodelay && is_tcp(o)) { 864 int optval = 1; 865 866 if (setsockopt(f->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &optval, sizeof(int)) < 0) { 867 log_err("fio: cannot set TCP_NODELAY option on socket (%s), disable with 'nodelay=0'\n", strerror(errno)); 868 return 1; 869 } 870 } 871 #endif 872 873 reset_all_stats(td); 874 td_set_runstate(td, state); 875 return 0; 876 err: 877 td_set_runstate(td, state); 878 return 1; 879 } 880 881 static void fio_netio_send_close(struct thread_data *td, struct fio_file *f) 882 { 883 struct netio_data *nd = td->io_ops_data; 884 struct netio_options *o = td->eo; 885 struct udp_close_msg msg; 886 struct sockaddr *to; 887 socklen_t len; 888 int ret; 889 890 if (is_ipv6(o)) { 891 to = (struct sockaddr *) &nd->addr6; 892 len = sizeof(nd->addr6); 893 } else { 894 to = (struct sockaddr *) &nd->addr; 895 len = sizeof(nd->addr); 896 } 897 898 msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC); 899 msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE); 900 901 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len); 902 if (ret < 0) 903 td_verror(td, errno, "sendto udp link close"); 904 } 905 906 static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) 907 { 908 /* 909 * Notify the receiver that we are closing down the link 910 */ 911 fio_netio_send_close(td, f); 912 913 return generic_close_file(td, f); 914 } 915 916 static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) 917 { 918 struct netio_data *nd = td->io_ops_data; 919 struct netio_options *o = td->eo; 920 struct udp_close_msg msg; 921 struct sockaddr *to; 922 socklen_t len; 923 int ret; 924 925 if (is_ipv6(o)) { 926 len = sizeof(nd->addr6); 927 to = (struct sockaddr *) &nd->addr6; 928 } else { 929 len = sizeof(nd->addr); 930 to = (struct sockaddr *) &nd->addr; 931 } 932 933 ret = recvfrom(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, &len); 934 if (ret < 0) { 935 td_verror(td, errno, "recvfrom udp link open"); 936 return ret; 937 } 938 939 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC || 940 ntohl(msg.cmd) != FIO_LINK_OPEN) { 941 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic), 942 ntohl(msg.cmd)); 943 return -1; 944 } 945 946 fio_gettime(&td->start, NULL); 947 return 0; 948 } 949 950 static int fio_netio_send_open(struct thread_data *td, struct fio_file *f) 951 { 952 struct netio_data *nd = td->io_ops_data; 953 struct netio_options *o = td->eo; 954 struct udp_close_msg msg; 955 struct sockaddr *to; 956 socklen_t len; 957 int ret; 958 959 if (is_ipv6(o)) { 960 len = sizeof(nd->addr6); 961 to = (struct sockaddr *) &nd->addr6; 962 } else { 963 len = sizeof(nd->addr); 964 to = (struct sockaddr *) &nd->addr; 965 } 966 967 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); 968 msg.cmd = htonl(FIO_LINK_OPEN); 969 970 ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len); 971 if (ret < 0) { 972 td_verror(td, errno, "sendto udp link open"); 973 return ret; 974 } 975 976 return 0; 977 } 978 979 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) 980 { 981 int ret; 982 struct netio_options *o = td->eo; 983 984 if (o->listen) 985 ret = fio_netio_accept(td, f); 986 else 987 ret = fio_netio_connect(td, f); 988 989 if (ret) { 990 f->fd = -1; 991 return ret; 992 } 993 994 if (is_udp(o)) { 995 if (td_write(td)) 996 ret = fio_netio_send_open(td, f); 997 else { 998 int state; 999 1000 state = td->runstate; 1001 td_set_runstate(td, TD_SETTING_UP); 1002 ret = fio_netio_udp_recv_open(td, f); 1003 td_set_runstate(td, state); 1004 } 1005 } 1006 1007 if (ret) 1008 fio_netio_close_file(td, f); 1009 1010 return ret; 1011 } 1012 1013 static int fio_fill_addr(struct thread_data *td, const char *host, int af, 1014 void *dst, struct addrinfo **res) 1015 { 1016 struct netio_options *o = td->eo; 1017 struct addrinfo hints; 1018 int ret; 1019 1020 if (inet_pton(af, host, dst)) 1021 return 0; 1022 1023 memset(&hints, 0, sizeof(hints)); 1024 1025 if (is_tcp(o)) 1026 hints.ai_socktype = SOCK_STREAM; 1027 else 1028 hints.ai_socktype = SOCK_DGRAM; 1029 1030 if (is_ipv6(o)) 1031 hints.ai_family = AF_INET6; 1032 else 1033 hints.ai_family = AF_INET; 1034 1035 ret = getaddrinfo(host, NULL, &hints, res); 1036 if (ret) { 1037 int e = EINVAL; 1038 char str[128]; 1039 1040 if (ret == EAI_SYSTEM) 1041 e = errno; 1042 1043 snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret)); 1044 td_verror(td, e, str); 1045 return 1; 1046 } 1047 1048 return 0; 1049 } 1050 1051 static int fio_netio_setup_connect_inet(struct thread_data *td, 1052 const char *host, unsigned short port) 1053 { 1054 struct netio_data *nd = td->io_ops_data; 1055 struct netio_options *o = td->eo; 1056 struct addrinfo *res = NULL; 1057 void *dst, *src; 1058 int af, len; 1059 1060 if (!host) { 1061 log_err("fio: connect with no host to connect to.\n"); 1062 if (td_read(td)) 1063 log_err("fio: did you forget to set 'listen'?\n"); 1064 1065 td_verror(td, EINVAL, "no hostname= set"); 1066 return 1; 1067 } 1068 1069 nd->addr.sin_family = AF_INET; 1070 nd->addr.sin_port = htons(port); 1071 nd->addr6.sin6_family = AF_INET6; 1072 nd->addr6.sin6_port = htons(port); 1073 1074 if (is_ipv6(o)) { 1075 af = AF_INET6; 1076 dst = &nd->addr6.sin6_addr; 1077 } else { 1078 af = AF_INET; 1079 dst = &nd->addr.sin_addr; 1080 } 1081 1082 if (fio_fill_addr(td, host, af, dst, &res)) 1083 return 1; 1084 1085 if (!res) 1086 return 0; 1087 1088 if (is_ipv6(o)) { 1089 len = sizeof(nd->addr6.sin6_addr); 1090 src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr; 1091 } else { 1092 len = sizeof(nd->addr.sin_addr); 1093 src = &((struct sockaddr_in *) res->ai_addr)->sin_addr; 1094 } 1095 1096 memcpy(dst, src, len); 1097 freeaddrinfo(res); 1098 return 0; 1099 } 1100 1101 static int fio_netio_setup_connect_unix(struct thread_data *td, 1102 const char *path) 1103 { 1104 struct netio_data *nd = td->io_ops_data; 1105 struct sockaddr_un *soun = &nd->addr_un; 1106 1107 soun->sun_family = AF_UNIX; 1108 memset(soun->sun_path, 0, sizeof(soun->sun_path)); 1109 strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1); 1110 return 0; 1111 } 1112 1113 static int fio_netio_setup_connect(struct thread_data *td) 1114 { 1115 struct netio_options *o = td->eo; 1116 1117 if (is_udp(o) || is_tcp(o)) 1118 return fio_netio_setup_connect_inet(td, td->o.filename,o->port); 1119 else 1120 return fio_netio_setup_connect_unix(td, td->o.filename); 1121 } 1122 1123 static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path) 1124 { 1125 struct netio_data *nd = td->io_ops_data; 1126 struct sockaddr_un *addr = &nd->addr_un; 1127 mode_t mode; 1128 int len, fd; 1129 1130 fd = socket(AF_UNIX, SOCK_STREAM, 0); 1131 if (fd < 0) { 1132 log_err("fio: socket: %s\n", strerror(errno)); 1133 return -1; 1134 } 1135 1136 mode = umask(000); 1137 1138 memset(addr, 0, sizeof(*addr)); 1139 addr->sun_family = AF_UNIX; 1140 strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1); 1141 unlink(path); 1142 1143 len = sizeof(addr->sun_family) + strlen(path) + 1; 1144 1145 if (bind(fd, (struct sockaddr *) addr, len) < 0) { 1146 log_err("fio: bind: %s\n", strerror(errno)); 1147 close(fd); 1148 return -1; 1149 } 1150 1151 umask(mode); 1152 nd->listenfd = fd; 1153 return 0; 1154 } 1155 1156 static int fio_netio_setup_listen_inet(struct thread_data *td, short port) 1157 { 1158 struct netio_data *nd = td->io_ops_data; 1159 struct netio_options *o = td->eo; 1160 struct ip_mreq mr; 1161 struct sockaddr_in sin; 1162 struct sockaddr *saddr; 1163 int fd, opt, type, domain; 1164 socklen_t len; 1165 1166 memset(&sin, 0, sizeof(sin)); 1167 1168 if (o->proto == FIO_TYPE_TCP) { 1169 type = SOCK_STREAM; 1170 domain = AF_INET; 1171 } else if (o->proto == FIO_TYPE_TCP_V6) { 1172 type = SOCK_STREAM; 1173 domain = AF_INET6; 1174 } else if (o->proto == FIO_TYPE_UDP) { 1175 type = SOCK_DGRAM; 1176 domain = AF_INET; 1177 } else if (o->proto == FIO_TYPE_UDP_V6) { 1178 type = SOCK_DGRAM; 1179 domain = AF_INET6; 1180 } else { 1181 log_err("fio: unknown proto %d\n", o->proto); 1182 return 1; 1183 } 1184 1185 fd = socket(domain, type, 0); 1186 if (fd < 0) { 1187 td_verror(td, errno, "socket"); 1188 return 1; 1189 } 1190 1191 opt = 1; 1192 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *) &opt, sizeof(opt)) < 0) { 1193 td_verror(td, errno, "setsockopt"); 1194 close(fd); 1195 return 1; 1196 } 1197 #ifdef SO_REUSEPORT 1198 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (void *) &opt, sizeof(opt)) < 0) { 1199 td_verror(td, errno, "setsockopt"); 1200 close(fd); 1201 return 1; 1202 } 1203 #endif 1204 1205 if (set_window_size(td, fd)) { 1206 close(fd); 1207 return 1; 1208 } 1209 if (set_mss(td, fd)) { 1210 close(fd); 1211 return 1; 1212 } 1213 1214 if (td->o.filename) { 1215 if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) { 1216 log_err("fio: hostname not valid for non-multicast inbound network IO\n"); 1217 close(fd); 1218 return 1; 1219 } 1220 if (is_ipv6(o)) { 1221 log_err("fio: IPv6 not supported for multicast network IO\n"); 1222 close(fd); 1223 return 1; 1224 } 1225 1226 inet_aton(td->o.filename, &sin.sin_addr); 1227 1228 mr.imr_multiaddr = sin.sin_addr; 1229 if (o->intfc) { 1230 if (inet_aton(o->intfc, &mr.imr_interface) == 0) { 1231 log_err("fio: interface not valid interface IP\n"); 1232 close(fd); 1233 return 1; 1234 } 1235 } else { 1236 mr.imr_interface.s_addr = htonl(INADDR_ANY); 1237 } 1238 1239 if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) { 1240 td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP"); 1241 close(fd); 1242 return 1; 1243 } 1244 } 1245 1246 if (!is_ipv6(o)) { 1247 saddr = (struct sockaddr *) &nd->addr; 1248 len = sizeof(nd->addr); 1249 1250 nd->addr.sin_family = AF_INET; 1251 nd->addr.sin_addr.s_addr = sin.sin_addr.s_addr ? sin.sin_addr.s_addr : htonl(INADDR_ANY); 1252 nd->addr.sin_port = htons(port); 1253 } else { 1254 saddr = (struct sockaddr *) &nd->addr6; 1255 len = sizeof(nd->addr6); 1256 1257 nd->addr6.sin6_family = AF_INET6; 1258 nd->addr6.sin6_addr = in6addr_any; 1259 nd->addr6.sin6_port = htons(port); 1260 } 1261 1262 if (bind(fd, saddr, len) < 0) { 1263 close(fd); 1264 td_verror(td, errno, "bind"); 1265 return 1; 1266 } 1267 1268 nd->listenfd = fd; 1269 return 0; 1270 } 1271 1272 static int fio_netio_setup_listen(struct thread_data *td) 1273 { 1274 struct netio_data *nd = td->io_ops_data; 1275 struct netio_options *o = td->eo; 1276 int ret; 1277 1278 if (is_udp(o) || is_tcp(o)) 1279 ret = fio_netio_setup_listen_inet(td, o->port); 1280 else 1281 ret = fio_netio_setup_listen_unix(td, td->o.filename); 1282 1283 if (ret) 1284 return ret; 1285 if (is_udp(o)) 1286 return 0; 1287 1288 if (listen(nd->listenfd, 10) < 0) { 1289 td_verror(td, errno, "listen"); 1290 nd->listenfd = -1; 1291 return 1; 1292 } 1293 1294 return 0; 1295 } 1296 1297 static int fio_netio_init(struct thread_data *td) 1298 { 1299 struct netio_options *o = td->eo; 1300 int ret; 1301 1302 #ifdef WIN32 1303 WSADATA wsd; 1304 WSAStartup(MAKEWORD(2,2), &wsd); 1305 #endif 1306 1307 if (td_random(td)) { 1308 log_err("fio: network IO can't be random\n"); 1309 return 1; 1310 } 1311 1312 if (o->proto == FIO_TYPE_UNIX && o->port) { 1313 log_err("fio: network IO port not valid with unix socket\n"); 1314 return 1; 1315 } else if (o->proto != FIO_TYPE_UNIX && !o->port) { 1316 log_err("fio: network IO requires port for tcp or udp\n"); 1317 return 1; 1318 } 1319 1320 o->port += td->subjob_number; 1321 1322 if (!is_tcp(o)) { 1323 if (o->listen) { 1324 log_err("fio: listen only valid for TCP proto IO\n"); 1325 return 1; 1326 } 1327 if (td_rw(td)) { 1328 log_err("fio: datagram network connections must be" 1329 " read OR write\n"); 1330 return 1; 1331 } 1332 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) { 1333 log_err("fio: UNIX sockets need host/filename\n"); 1334 return 1; 1335 } 1336 o->listen = td_read(td); 1337 } 1338 1339 if (o->listen) 1340 ret = fio_netio_setup_listen(td); 1341 else 1342 ret = fio_netio_setup_connect(td); 1343 1344 return ret; 1345 } 1346 1347 static void fio_netio_cleanup(struct thread_data *td) 1348 { 1349 struct netio_data *nd = td->io_ops_data; 1350 1351 if (nd) { 1352 if (nd->listenfd != -1) 1353 close(nd->listenfd); 1354 if (nd->pipes[0] != -1) 1355 close(nd->pipes[0]); 1356 if (nd->pipes[1] != -1) 1357 close(nd->pipes[1]); 1358 1359 free(nd); 1360 } 1361 } 1362 1363 static int fio_netio_setup(struct thread_data *td) 1364 { 1365 struct netio_data *nd; 1366 1367 if (!td->files_index) { 1368 add_file(td, td->o.filename ?: "net", 0, 0); 1369 td->o.nr_files = td->o.nr_files ?: 1; 1370 td->o.open_files++; 1371 } 1372 1373 if (!td->io_ops_data) { 1374 nd = malloc(sizeof(*nd)); 1375 1376 memset(nd, 0, sizeof(*nd)); 1377 nd->listenfd = -1; 1378 nd->pipes[0] = nd->pipes[1] = -1; 1379 td->io_ops_data = nd; 1380 } 1381 1382 return 0; 1383 } 1384 1385 static void fio_netio_terminate(struct thread_data *td) 1386 { 1387 kill(td->pid, SIGTERM); 1388 } 1389 1390 #ifdef CONFIG_LINUX_SPLICE 1391 static int fio_netio_setup_splice(struct thread_data *td) 1392 { 1393 struct netio_data *nd; 1394 1395 fio_netio_setup(td); 1396 1397 nd = td->io_ops_data; 1398 if (nd) { 1399 if (pipe(nd->pipes) < 0) 1400 return 1; 1401 1402 nd->use_splice = 1; 1403 return 0; 1404 } 1405 1406 return 1; 1407 } 1408 1409 static struct ioengine_ops ioengine_splice = { 1410 .name = "netsplice", 1411 .version = FIO_IOOPS_VERSION, 1412 .prep = fio_netio_prep, 1413 .queue = fio_netio_queue, 1414 .setup = fio_netio_setup_splice, 1415 .init = fio_netio_init, 1416 .cleanup = fio_netio_cleanup, 1417 .open_file = fio_netio_open_file, 1418 .close_file = fio_netio_close_file, 1419 .terminate = fio_netio_terminate, 1420 .options = options, 1421 .option_struct_size = sizeof(struct netio_options), 1422 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 1423 FIO_PIPEIO, 1424 }; 1425 #endif 1426 1427 static struct ioengine_ops ioengine_rw = { 1428 .name = "net", 1429 .version = FIO_IOOPS_VERSION, 1430 .prep = fio_netio_prep, 1431 .queue = fio_netio_queue, 1432 .setup = fio_netio_setup, 1433 .init = fio_netio_init, 1434 .cleanup = fio_netio_cleanup, 1435 .open_file = fio_netio_open_file, 1436 .close_file = fio_netio_close_file, 1437 .terminate = fio_netio_terminate, 1438 .options = options, 1439 .option_struct_size = sizeof(struct netio_options), 1440 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 1441 FIO_PIPEIO | FIO_BIT_BASED, 1442 }; 1443 1444 static int str_hostname_cb(void *data, const char *input) 1445 { 1446 struct netio_options *o = data; 1447 1448 if (o->td->o.filename) 1449 free(o->td->o.filename); 1450 o->td->o.filename = strdup(input); 1451 return 0; 1452 } 1453 1454 static void fio_init fio_netio_register(void) 1455 { 1456 register_ioengine(&ioengine_rw); 1457 #ifdef CONFIG_LINUX_SPLICE 1458 register_ioengine(&ioengine_splice); 1459 #endif 1460 } 1461 1462 static void fio_exit fio_netio_unregister(void) 1463 { 1464 unregister_ioengine(&ioengine_rw); 1465 #ifdef CONFIG_LINUX_SPLICE 1466 unregister_ioengine(&ioengine_splice); 1467 #endif 1468 } 1469