1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <stdarg.h> 4 #include <unistd.h> 5 #include <limits.h> 6 #include <errno.h> 7 #include <sys/poll.h> 8 #include <sys/types.h> 9 #include <sys/wait.h> 10 #include <sys/socket.h> 11 #include <sys/stat.h> 12 #include <sys/un.h> 13 #include <sys/uio.h> 14 #include <netinet/in.h> 15 #include <arpa/inet.h> 16 #include <netdb.h> 17 #include <syslog.h> 18 #include <signal.h> 19 #ifdef CONFIG_ZLIB 20 #include <zlib.h> 21 #endif 22 23 #include "fio.h" 24 #include "server.h" 25 #include "crc/crc16.h" 26 #include "lib/ieee754.h" 27 28 int fio_net_port = FIO_NET_PORT; 29 30 int exit_backend = 0; 31 32 static int server_fd = -1; 33 static char *fio_server_arg; 34 static char *bind_sock; 35 static struct sockaddr_in saddr_in; 36 static struct sockaddr_in6 saddr_in6; 37 static int use_ipv6; 38 #ifdef CONFIG_ZLIB 39 static unsigned int has_zlib = 1; 40 #else 41 static unsigned int has_zlib = 0; 42 #endif 43 static unsigned int use_zlib; 44 45 struct fio_fork_item { 46 struct flist_head list; 47 int exitval; 48 int signal; 49 int exited; 50 pid_t pid; 51 }; 52 53 static const char *fio_server_ops[FIO_NET_CMD_NR] = { 54 "", 55 "QUIT", 56 "EXIT", 57 "JOB", 58 "JOBLINE", 59 "TEXT", 60 "TS", 61 "GS", 62 "SEND_ETA", 63 "ETA", 64 "PROBE", 65 "START", 66 "STOP", 67 "DISK_UTIL", 68 "SERVER_START", 69 "ADD_JOB", 70 "CMD_RUN", 71 "CMD_IOLOG", 72 }; 73 74 const char *fio_server_op(unsigned int op) 75 { 76 static char buf[32]; 77 78 if (op < FIO_NET_CMD_NR) 79 return fio_server_ops[op]; 80 81 sprintf(buf, "UNKNOWN/%d", op); 82 return buf; 83 } 84 85 static ssize_t iov_total_len(const struct iovec *iov, int count) 86 { 87 ssize_t ret = 0; 88 89 while (count--) { 90 ret += iov->iov_len; 91 iov++; 92 } 93 94 return ret; 95 } 96 97 static int fio_sendv_data(int sk, struct iovec *iov, int count) 98 { 99 ssize_t total_len = iov_total_len(iov, count); 100 ssize_t ret; 101 102 do { 103 ret = writev(sk, iov, count); 104 if (ret > 0) { 105 total_len -= ret; 106 if (!total_len) 107 break; 108 109 while (ret) { 110 if (ret >= iov->iov_len) { 111 ret -= iov->iov_len; 112 iov++; 113 continue; 114 } 115 iov->iov_base += ret; 116 iov->iov_len -= ret; 117 ret = 0; 118 } 119 } else if (!ret) 120 break; 121 else if (errno == EAGAIN || errno == EINTR) 122 continue; 123 else 124 break; 125 } while (!exit_backend); 126 127 if (!total_len) 128 return 0; 129 130 if (errno) 131 return -errno; 132 133 return 1; 134 } 135 136 int fio_send_data(int sk, const void *p, unsigned int len) 137 { 138 struct iovec iov = { .iov_base = (void *) p, .iov_len = len }; 139 140 assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU); 141 142 return fio_sendv_data(sk, &iov, 1); 143 } 144 145 int fio_recv_data(int sk, void *p, unsigned int len) 146 { 147 do { 148 int ret = recv(sk, p, len, MSG_WAITALL); 149 150 if (ret > 0) { 151 len -= ret; 152 if (!len) 153 break; 154 p += ret; 155 continue; 156 } else if (!ret) 157 break; 158 else if (errno == EAGAIN || errno == EINTR) 159 continue; 160 else 161 break; 162 } while (!exit_backend); 163 164 if (!len) 165 return 0; 166 167 return -1; 168 } 169 170 static int verify_convert_cmd(struct fio_net_cmd *cmd) 171 { 172 uint16_t crc; 173 174 cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16); 175 cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16); 176 177 crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ); 178 if (crc != cmd->cmd_crc16) { 179 log_err("fio: server bad crc on command (got %x, wanted %x)\n", 180 cmd->cmd_crc16, crc); 181 return 1; 182 } 183 184 cmd->version = le16_to_cpu(cmd->version); 185 cmd->opcode = le16_to_cpu(cmd->opcode); 186 cmd->flags = le32_to_cpu(cmd->flags); 187 cmd->tag = le64_to_cpu(cmd->tag); 188 cmd->pdu_len = le32_to_cpu(cmd->pdu_len); 189 190 switch (cmd->version) { 191 case FIO_SERVER_VER: 192 break; 193 default: 194 log_err("fio: bad server cmd version %d\n", cmd->version); 195 return 1; 196 } 197 198 if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) { 199 log_err("fio: command payload too large: %u\n", cmd->pdu_len); 200 return 1; 201 } 202 203 return 0; 204 } 205 206 /* 207 * Read (and defragment, if necessary) incoming commands 208 */ 209 struct fio_net_cmd *fio_net_recv_cmd(int sk) 210 { 211 struct fio_net_cmd cmd, *tmp, *cmdret = NULL; 212 size_t cmd_size = 0, pdu_offset = 0; 213 uint16_t crc; 214 int ret, first = 1; 215 void *pdu = NULL; 216 217 do { 218 ret = fio_recv_data(sk, &cmd, sizeof(cmd)); 219 if (ret) 220 break; 221 222 /* We have a command, verify it and swap if need be */ 223 ret = verify_convert_cmd(&cmd); 224 if (ret) 225 break; 226 227 if (first) { 228 /* if this is text, add room for \0 at the end */ 229 cmd_size = sizeof(cmd) + cmd.pdu_len + 1; 230 assert(!cmdret); 231 } else 232 cmd_size += cmd.pdu_len; 233 234 if (cmd_size / 1024 > FIO_SERVER_MAX_CMD_MB * 1024) { 235 log_err("fio: cmd+pdu too large (%llu)\n", (unsigned long long) cmd_size); 236 ret = 1; 237 break; 238 } 239 240 tmp = realloc(cmdret, cmd_size); 241 if (!tmp) { 242 log_err("fio: server failed allocating cmd\n"); 243 ret = 1; 244 break; 245 } 246 cmdret = tmp; 247 248 if (first) 249 memcpy(cmdret, &cmd, sizeof(cmd)); 250 else if (cmdret->opcode != cmd.opcode) { 251 log_err("fio: fragment opcode mismatch (%d != %d)\n", 252 cmdret->opcode, cmd.opcode); 253 ret = 1; 254 break; 255 } 256 257 if (!cmd.pdu_len) 258 break; 259 260 /* There's payload, get it */ 261 pdu = (void *) cmdret->payload + pdu_offset; 262 ret = fio_recv_data(sk, pdu, cmd.pdu_len); 263 if (ret) 264 break; 265 266 /* Verify payload crc */ 267 crc = fio_crc16(pdu, cmd.pdu_len); 268 if (crc != cmd.pdu_crc16) { 269 log_err("fio: server bad crc on payload "); 270 log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc); 271 ret = 1; 272 break; 273 } 274 275 pdu_offset += cmd.pdu_len; 276 if (!first) 277 cmdret->pdu_len += cmd.pdu_len; 278 first = 0; 279 } while (cmd.flags & FIO_NET_CMD_F_MORE); 280 281 if (ret) { 282 free(cmdret); 283 cmdret = NULL; 284 } else if (cmdret) { 285 /* zero-terminate text input */ 286 if (cmdret->pdu_len) { 287 if (cmdret->opcode == FIO_NET_CMD_TEXT) { 288 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmdret->payload; 289 char *buf = (char *) pdu->buf; 290 291 buf[pdu->buf_len] = '\0'; 292 } else if (cmdret->opcode == FIO_NET_CMD_JOB) { 293 struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmdret->payload; 294 char *buf = (char *) pdu->buf; 295 int len = le32_to_cpu(pdu->buf_len); 296 297 buf[len] = '\0'; 298 } 299 } 300 301 /* frag flag is internal */ 302 cmdret->flags &= ~FIO_NET_CMD_F_MORE; 303 } 304 305 return cmdret; 306 } 307 308 static void add_reply(uint64_t tag, struct flist_head *list) 309 { 310 struct fio_net_cmd_reply *reply; 311 312 reply = (struct fio_net_cmd_reply *) (uintptr_t) tag; 313 flist_add_tail(&reply->list, list); 314 } 315 316 static uint64_t alloc_reply(uint64_t tag, uint16_t opcode) 317 { 318 struct fio_net_cmd_reply *reply; 319 320 reply = calloc(1, sizeof(*reply)); 321 INIT_FLIST_HEAD(&reply->list); 322 gettimeofday(&reply->tv, NULL); 323 reply->saved_tag = tag; 324 reply->opcode = opcode; 325 326 return (uintptr_t) reply; 327 } 328 329 static void free_reply(uint64_t tag) 330 { 331 struct fio_net_cmd_reply *reply; 332 333 reply = (struct fio_net_cmd_reply *) (uintptr_t) tag; 334 free(reply); 335 } 336 337 void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) 338 { 339 uint32_t pdu_len; 340 341 cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ)); 342 343 pdu_len = le32_to_cpu(cmd->pdu_len); 344 cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len)); 345 } 346 347 void fio_net_cmd_crc(struct fio_net_cmd *cmd) 348 { 349 fio_net_cmd_crc_pdu(cmd, cmd->payload); 350 } 351 352 int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, 353 uint64_t *tagptr, struct flist_head *list) 354 { 355 struct fio_net_cmd *cmd = NULL; 356 size_t this_len, cur_len = 0; 357 uint64_t tag; 358 int ret; 359 360 if (list) { 361 assert(tagptr); 362 tag = *tagptr = alloc_reply(*tagptr, opcode); 363 } else 364 tag = tagptr ? *tagptr : 0; 365 366 do { 367 this_len = size; 368 if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU) 369 this_len = FIO_SERVER_MAX_FRAGMENT_PDU; 370 371 if (!cmd || cur_len < sizeof(*cmd) + this_len) { 372 if (cmd) 373 free(cmd); 374 375 cur_len = sizeof(*cmd) + this_len; 376 cmd = malloc(cur_len); 377 } 378 379 fio_init_net_cmd(cmd, opcode, buf, this_len, tag); 380 381 if (this_len < size) 382 cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE); 383 384 fio_net_cmd_crc(cmd); 385 386 ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len); 387 size -= this_len; 388 buf += this_len; 389 } while (!ret && size); 390 391 if (list) { 392 if (ret) 393 free_reply(tag); 394 else 395 add_reply(tag, list); 396 } 397 398 if (cmd) 399 free(cmd); 400 401 return ret; 402 } 403 404 static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag) 405 { 406 struct fio_net_cmd cmd; 407 408 fio_init_net_cmd(&cmd, opcode, NULL, 0, tag); 409 fio_net_cmd_crc(&cmd); 410 411 return fio_send_data(sk, &cmd, sizeof(cmd)); 412 } 413 414 /* 415 * If 'list' is non-NULL, then allocate and store the sent command for 416 * later verification. 417 */ 418 int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag, 419 struct flist_head *list) 420 { 421 int ret; 422 423 if (list) 424 tag = alloc_reply(tag, opcode); 425 426 ret = fio_net_send_simple_stack_cmd(sk, opcode, tag); 427 if (ret) { 428 if (list) 429 free_reply(tag); 430 431 return ret; 432 } 433 434 if (list) 435 add_reply(tag, list); 436 437 return 0; 438 } 439 440 int fio_net_send_quit(int sk) 441 { 442 dprint(FD_NET, "server: sending quit\n"); 443 444 return fio_net_send_simple_cmd(sk, FIO_NET_CMD_QUIT, 0, NULL); 445 } 446 447 static int fio_net_send_ack(int sk, struct fio_net_cmd *cmd, int error, 448 int signal) 449 { 450 struct cmd_end_pdu epdu; 451 uint64_t tag = 0; 452 453 if (cmd) 454 tag = cmd->tag; 455 456 epdu.error = __cpu_to_le32(error); 457 epdu.signal = __cpu_to_le32(signal); 458 return fio_net_send_cmd(sk, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, NULL); 459 } 460 461 int fio_net_send_stop(int sk, int error, int signal) 462 { 463 dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal); 464 return fio_net_send_ack(sk, NULL, error, signal); 465 } 466 467 static void fio_server_add_fork_item(pid_t pid, struct flist_head *list) 468 { 469 struct fio_fork_item *ffi; 470 471 ffi = malloc(sizeof(*ffi)); 472 ffi->exitval = 0; 473 ffi->signal = 0; 474 ffi->exited = 0; 475 ffi->pid = pid; 476 flist_add_tail(&ffi->list, list); 477 } 478 479 static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid) 480 { 481 dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid); 482 fio_server_add_fork_item(pid, conn_list); 483 } 484 485 static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid) 486 { 487 dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid); 488 fio_server_add_fork_item(pid, job_list); 489 } 490 491 static void fio_server_check_fork_item(struct fio_fork_item *ffi) 492 { 493 int ret, status; 494 495 ret = waitpid(ffi->pid, &status, WNOHANG); 496 if (ret < 0) { 497 if (errno == ECHILD) { 498 log_err("fio: connection pid %u disappeared\n", (int) ffi->pid); 499 ffi->exited = 1; 500 } else 501 log_err("fio: waitpid: %s\n", strerror(errno)); 502 } else if (ret == ffi->pid) { 503 if (WIFSIGNALED(status)) { 504 ffi->signal = WTERMSIG(status); 505 ffi->exited = 1; 506 } 507 if (WIFEXITED(status)) { 508 if (WEXITSTATUS(status)) 509 ffi->exitval = WEXITSTATUS(status); 510 ffi->exited = 1; 511 } 512 } 513 } 514 515 static void fio_server_fork_item_done(struct fio_fork_item *ffi) 516 { 517 dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval); 518 519 /* 520 * Fold STOP and QUIT... 521 */ 522 fio_net_send_stop(server_fd, ffi->exitval, ffi->signal); 523 fio_net_send_quit(server_fd); 524 flist_del(&ffi->list); 525 free(ffi); 526 } 527 528 static void fio_server_check_fork_items(struct flist_head *list) 529 { 530 struct flist_head *entry, *tmp; 531 struct fio_fork_item *ffi; 532 533 flist_for_each_safe(entry, tmp, list) { 534 ffi = flist_entry(entry, struct fio_fork_item, list); 535 536 fio_server_check_fork_item(ffi); 537 538 if (ffi->exited) 539 fio_server_fork_item_done(ffi); 540 } 541 } 542 543 static void fio_server_check_jobs(struct flist_head *job_list) 544 { 545 fio_server_check_fork_items(job_list); 546 } 547 548 static void fio_server_check_conns(struct flist_head *conn_list) 549 { 550 fio_server_check_fork_items(conn_list); 551 } 552 553 static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd) 554 { 555 pid_t pid; 556 int ret; 557 558 set_genesis_time(); 559 560 pid = fork(); 561 if (pid) { 562 fio_server_add_job_pid(job_list, pid); 563 return 0; 564 } 565 566 ret = fio_backend(); 567 free_threads_shm(); 568 _exit(ret); 569 } 570 571 static int handle_job_cmd(struct fio_net_cmd *cmd) 572 { 573 struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload; 574 void *buf = pdu->buf; 575 struct cmd_start_pdu spdu; 576 577 pdu->buf_len = le32_to_cpu(pdu->buf_len); 578 pdu->client_type = le32_to_cpu(pdu->client_type); 579 580 if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) { 581 fio_net_send_quit(server_fd); 582 return -1; 583 } 584 585 spdu.jobs = cpu_to_le32(thread_number); 586 spdu.stat_outputs = cpu_to_le32(stat_number); 587 fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL); 588 return 0; 589 } 590 591 static int handle_jobline_cmd(struct fio_net_cmd *cmd) 592 { 593 void *pdu = cmd->payload; 594 struct cmd_single_line_pdu *cslp; 595 struct cmd_line_pdu *clp; 596 unsigned long offset; 597 struct cmd_start_pdu spdu; 598 char **argv; 599 int i; 600 601 clp = pdu; 602 clp->lines = le16_to_cpu(clp->lines); 603 clp->client_type = le16_to_cpu(clp->client_type); 604 argv = malloc(clp->lines * sizeof(char *)); 605 offset = sizeof(*clp); 606 607 dprint(FD_NET, "server: %d command line args\n", clp->lines); 608 609 for (i = 0; i < clp->lines; i++) { 610 cslp = pdu + offset; 611 argv[i] = (char *) cslp->text; 612 613 offset += sizeof(*cslp) + le16_to_cpu(cslp->len); 614 dprint(FD_NET, "server: %d: %s\n", i, argv[i]); 615 } 616 617 if (parse_cmd_line(clp->lines, argv, clp->client_type)) { 618 fio_net_send_quit(server_fd); 619 free(argv); 620 return -1; 621 } 622 623 free(argv); 624 625 spdu.jobs = cpu_to_le32(thread_number); 626 spdu.stat_outputs = cpu_to_le32(stat_number); 627 fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL); 628 return 0; 629 } 630 631 static int handle_probe_cmd(struct fio_net_cmd *cmd) 632 { 633 struct cmd_client_probe_pdu *pdu = (struct cmd_client_probe_pdu *) cmd->payload; 634 struct cmd_probe_reply_pdu probe; 635 uint64_t tag = cmd->tag; 636 637 dprint(FD_NET, "server: sending probe reply\n"); 638 639 memset(&probe, 0, sizeof(probe)); 640 gethostname((char *) probe.hostname, sizeof(probe.hostname)); 641 #ifdef CONFIG_BIG_ENDIAN 642 probe.bigendian = 1; 643 #endif 644 strncpy((char *) probe.fio_version, fio_version_string, sizeof(probe.fio_version)); 645 646 probe.os = FIO_OS; 647 probe.arch = FIO_ARCH; 648 probe.bpp = sizeof(void *); 649 probe.cpus = __cpu_to_le32(cpus_online()); 650 651 /* 652 * If the client supports compression and we do too, then enable it 653 */ 654 if (has_zlib && le64_to_cpu(pdu->flags) & FIO_PROBE_FLAG_ZLIB) { 655 probe.flags = __cpu_to_le64(FIO_PROBE_FLAG_ZLIB); 656 use_zlib = 1; 657 } else { 658 probe.flags = 0; 659 use_zlib = 0; 660 } 661 662 return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL); 663 } 664 665 static int handle_send_eta_cmd(struct fio_net_cmd *cmd) 666 { 667 struct jobs_eta *je; 668 size_t size; 669 uint64_t tag = cmd->tag; 670 int i; 671 672 if (!thread_number) 673 return 0; 674 675 size = sizeof(*je) + thread_number * sizeof(char) + 1; 676 je = malloc(size); 677 memset(je, 0, size); 678 679 if (!calc_thread_status(je, 1)) { 680 free(je); 681 return 0; 682 } 683 684 dprint(FD_NET, "server sending status\n"); 685 686 je->nr_running = cpu_to_le32(je->nr_running); 687 je->nr_ramp = cpu_to_le32(je->nr_ramp); 688 je->nr_pending = cpu_to_le32(je->nr_pending); 689 je->nr_setting_up = cpu_to_le32(je->nr_setting_up); 690 je->files_open = cpu_to_le32(je->files_open); 691 692 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 693 je->m_rate[i] = cpu_to_le32(je->m_rate[i]); 694 je->t_rate[i] = cpu_to_le32(je->t_rate[i]); 695 je->m_iops[i] = cpu_to_le32(je->m_iops[i]); 696 je->t_iops[i] = cpu_to_le32(je->t_iops[i]); 697 je->rate[i] = cpu_to_le32(je->rate[i]); 698 je->iops[i] = cpu_to_le32(je->iops[i]); 699 } 700 701 je->elapsed_sec = cpu_to_le64(je->elapsed_sec); 702 je->eta_sec = cpu_to_le64(je->eta_sec); 703 je->nr_threads = cpu_to_le32(je->nr_threads); 704 je->is_pow2 = cpu_to_le32(je->is_pow2); 705 je->unit_base = cpu_to_le32(je->unit_base); 706 707 fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL); 708 free(je); 709 return 0; 710 } 711 712 static int send_update_job_reply(int fd, uint64_t __tag, int error) 713 { 714 uint64_t tag = __tag; 715 uint32_t pdu_error; 716 717 pdu_error = __cpu_to_le32(error); 718 return fio_net_send_cmd(fd, FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, NULL); 719 } 720 721 static int handle_update_job_cmd(struct fio_net_cmd *cmd) 722 { 723 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload; 724 struct thread_data *td; 725 uint32_t tnumber; 726 727 tnumber = le32_to_cpu(pdu->thread_number); 728 729 dprint(FD_NET, "server: updating options for job %u\n", tnumber); 730 731 if (!tnumber || tnumber > thread_number) { 732 send_update_job_reply(server_fd, cmd->tag, ENODEV); 733 return 0; 734 } 735 736 td = &threads[tnumber - 1]; 737 convert_thread_options_to_cpu(&td->o, &pdu->top); 738 send_update_job_reply(server_fd, cmd->tag, 0); 739 return 0; 740 } 741 742 static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd) 743 { 744 int ret; 745 746 dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n", 747 fio_server_op(cmd->opcode), cmd->pdu_len, 748 (unsigned long long) cmd->tag); 749 750 switch (cmd->opcode) { 751 case FIO_NET_CMD_QUIT: 752 fio_terminate_threads(TERMINATE_ALL); 753 return -1; 754 case FIO_NET_CMD_EXIT: 755 exit_backend = 1; 756 return -1; 757 case FIO_NET_CMD_JOB: 758 ret = handle_job_cmd(cmd); 759 break; 760 case FIO_NET_CMD_JOBLINE: 761 ret = handle_jobline_cmd(cmd); 762 break; 763 case FIO_NET_CMD_PROBE: 764 ret = handle_probe_cmd(cmd); 765 break; 766 case FIO_NET_CMD_SEND_ETA: 767 ret = handle_send_eta_cmd(cmd); 768 break; 769 case FIO_NET_CMD_RUN: 770 ret = handle_run_cmd(job_list, cmd); 771 break; 772 case FIO_NET_CMD_UPDATE_JOB: 773 ret = handle_update_job_cmd(cmd); 774 break; 775 default: 776 log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode)); 777 ret = 1; 778 } 779 780 return ret; 781 } 782 783 static int handle_connection(int sk) 784 { 785 struct fio_net_cmd *cmd = NULL; 786 FLIST_HEAD(job_list); 787 int ret = 0; 788 789 reset_fio_state(); 790 server_fd = sk; 791 792 /* read forever */ 793 while (!exit_backend) { 794 struct pollfd pfd = { 795 .fd = sk, 796 .events = POLLIN, 797 }; 798 799 ret = 0; 800 do { 801 int timeout = 1000; 802 803 if (!flist_empty(&job_list)) 804 timeout = 100; 805 806 ret = poll(&pfd, 1, timeout); 807 if (ret < 0) { 808 if (errno == EINTR) 809 break; 810 log_err("fio: poll: %s\n", strerror(errno)); 811 break; 812 } else if (!ret) { 813 fio_server_check_jobs(&job_list); 814 continue; 815 } 816 817 if (pfd.revents & POLLIN) 818 break; 819 if (pfd.revents & (POLLERR|POLLHUP)) { 820 ret = 1; 821 break; 822 } 823 } while (!exit_backend); 824 825 fio_server_check_jobs(&job_list); 826 827 if (ret < 0) 828 break; 829 830 cmd = fio_net_recv_cmd(sk); 831 if (!cmd) { 832 ret = -1; 833 break; 834 } 835 836 ret = handle_command(&job_list, cmd); 837 if (ret) 838 break; 839 840 free(cmd); 841 cmd = NULL; 842 } 843 844 if (cmd) 845 free(cmd); 846 847 close(sk); 848 _exit(ret); 849 } 850 851 static int accept_loop(int listen_sk) 852 { 853 struct sockaddr_in addr; 854 struct sockaddr_in6 addr6; 855 socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr); 856 struct pollfd pfd; 857 int ret = 0, sk, exitval = 0; 858 FLIST_HEAD(conn_list); 859 860 dprint(FD_NET, "server enter accept loop\n"); 861 862 fio_set_fd_nonblocking(listen_sk, "server"); 863 864 while (!exit_backend) { 865 const char *from; 866 char buf[64]; 867 pid_t pid; 868 869 pfd.fd = listen_sk; 870 pfd.events = POLLIN; 871 do { 872 int timeout = 1000; 873 874 if (!flist_empty(&conn_list)) 875 timeout = 100; 876 877 ret = poll(&pfd, 1, timeout); 878 if (ret < 0) { 879 if (errno == EINTR) 880 break; 881 log_err("fio: poll: %s\n", strerror(errno)); 882 break; 883 } else if (!ret) { 884 fio_server_check_conns(&conn_list); 885 continue; 886 } 887 888 if (pfd.revents & POLLIN) 889 break; 890 } while (!exit_backend); 891 892 fio_server_check_conns(&conn_list); 893 894 if (exit_backend || ret < 0) 895 break; 896 897 if (use_ipv6) 898 sk = accept(listen_sk, (struct sockaddr *) &addr6, &len); 899 else 900 sk = accept(listen_sk, (struct sockaddr *) &addr, &len); 901 902 if (sk < 0) { 903 log_err("fio: accept: %s\n", strerror(errno)); 904 return -1; 905 } 906 907 if (use_ipv6) 908 from = inet_ntop(AF_INET6, (struct sockaddr *) &addr6.sin6_addr, buf, sizeof(buf)); 909 else 910 from = inet_ntop(AF_INET, (struct sockaddr *) &addr.sin_addr, buf, sizeof(buf)); 911 912 dprint(FD_NET, "server: connect from %s\n", from); 913 914 pid = fork(); 915 if (pid) { 916 close(sk); 917 fio_server_add_conn_pid(&conn_list, pid); 918 continue; 919 } 920 921 /* exits */ 922 handle_connection(sk); 923 } 924 925 return exitval; 926 } 927 928 int fio_server_text_output(int level, const char *buf, size_t len) 929 { 930 struct cmd_text_pdu *pdu; 931 unsigned int tlen; 932 struct timeval tv; 933 934 if (server_fd == -1) 935 return log_local_buf(buf, len); 936 937 tlen = sizeof(*pdu) + len; 938 pdu = malloc(tlen); 939 940 pdu->level = __cpu_to_le32(level); 941 pdu->buf_len = __cpu_to_le32(len); 942 943 gettimeofday(&tv, NULL); 944 pdu->log_sec = __cpu_to_le64(tv.tv_sec); 945 pdu->log_usec = __cpu_to_le64(tv.tv_usec); 946 947 memcpy(pdu->buf, buf, len); 948 949 fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL); 950 free(pdu); 951 return len; 952 } 953 954 static void convert_io_stat(struct io_stat *dst, struct io_stat *src) 955 { 956 dst->max_val = cpu_to_le64(src->max_val); 957 dst->min_val = cpu_to_le64(src->min_val); 958 dst->samples = cpu_to_le64(src->samples); 959 960 /* 961 * Encode to IEEE 754 for network transfer 962 */ 963 dst->mean.u.i = __cpu_to_le64(fio_double_to_uint64(src->mean.u.f)); 964 dst->S.u.i = __cpu_to_le64(fio_double_to_uint64(src->S.u.f)); 965 } 966 967 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) 968 { 969 int i; 970 971 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 972 dst->max_run[i] = cpu_to_le64(src->max_run[i]); 973 dst->min_run[i] = cpu_to_le64(src->min_run[i]); 974 dst->max_bw[i] = cpu_to_le64(src->max_bw[i]); 975 dst->min_bw[i] = cpu_to_le64(src->min_bw[i]); 976 dst->io_kb[i] = cpu_to_le64(src->io_kb[i]); 977 dst->agg[i] = cpu_to_le64(src->agg[i]); 978 } 979 980 dst->kb_base = cpu_to_le32(src->kb_base); 981 dst->unit_base = cpu_to_le32(src->unit_base); 982 dst->groupid = cpu_to_le32(src->groupid); 983 dst->unified_rw_rep = cpu_to_le32(src->unified_rw_rep); 984 } 985 986 /* 987 * Send a CMD_TS, which packs struct thread_stat and group_run_stats 988 * into a single payload. 989 */ 990 void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) 991 { 992 struct cmd_ts_pdu p; 993 int i, j; 994 995 dprint(FD_NET, "server sending end stats\n"); 996 997 memset(&p, 0, sizeof(p)); 998 999 strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1); 1000 strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1); 1001 strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1); 1002 1003 p.ts.error = cpu_to_le32(ts->error); 1004 p.ts.thread_number = cpu_to_le32(ts->thread_number); 1005 p.ts.groupid = cpu_to_le32(ts->groupid); 1006 p.ts.pid = cpu_to_le32(ts->pid); 1007 p.ts.members = cpu_to_le32(ts->members); 1008 p.ts.unified_rw_rep = cpu_to_le32(ts->unified_rw_rep); 1009 1010 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 1011 convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]); 1012 convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]); 1013 convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]); 1014 convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]); 1015 } 1016 1017 p.ts.usr_time = cpu_to_le64(ts->usr_time); 1018 p.ts.sys_time = cpu_to_le64(ts->sys_time); 1019 p.ts.ctx = cpu_to_le64(ts->ctx); 1020 p.ts.minf = cpu_to_le64(ts->minf); 1021 p.ts.majf = cpu_to_le64(ts->majf); 1022 p.ts.clat_percentiles = cpu_to_le64(ts->clat_percentiles); 1023 1024 for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) { 1025 fio_fp64_t *src = &ts->percentile_list[i]; 1026 fio_fp64_t *dst = &p.ts.percentile_list[i]; 1027 1028 dst->u.i = __cpu_to_le64(fio_double_to_uint64(src->u.f)); 1029 } 1030 1031 for (i = 0; i < FIO_IO_U_MAP_NR; i++) { 1032 p.ts.io_u_map[i] = cpu_to_le32(ts->io_u_map[i]); 1033 p.ts.io_u_submit[i] = cpu_to_le32(ts->io_u_submit[i]); 1034 p.ts.io_u_complete[i] = cpu_to_le32(ts->io_u_complete[i]); 1035 } 1036 1037 for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) { 1038 p.ts.io_u_lat_u[i] = cpu_to_le32(ts->io_u_lat_u[i]); 1039 p.ts.io_u_lat_m[i] = cpu_to_le32(ts->io_u_lat_m[i]); 1040 } 1041 1042 for (i = 0; i < DDIR_RWDIR_CNT; i++) 1043 for (j = 0; j < FIO_IO_U_PLAT_NR; j++) 1044 p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]); 1045 1046 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 1047 p.ts.total_io_u[i] = cpu_to_le64(ts->total_io_u[i]); 1048 p.ts.short_io_u[i] = cpu_to_le64(ts->short_io_u[i]); 1049 } 1050 1051 p.ts.total_submit = cpu_to_le64(ts->total_submit); 1052 p.ts.total_complete = cpu_to_le64(ts->total_complete); 1053 1054 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 1055 p.ts.io_bytes[i] = cpu_to_le64(ts->io_bytes[i]); 1056 p.ts.runtime[i] = cpu_to_le64(ts->runtime[i]); 1057 } 1058 1059 p.ts.total_run_time = cpu_to_le64(ts->total_run_time); 1060 p.ts.continue_on_error = cpu_to_le16(ts->continue_on_error); 1061 p.ts.total_err_count = cpu_to_le64(ts->total_err_count); 1062 p.ts.first_error = cpu_to_le32(ts->first_error); 1063 p.ts.kb_base = cpu_to_le32(ts->kb_base); 1064 p.ts.unit_base = cpu_to_le32(ts->unit_base); 1065 1066 p.ts.latency_depth = cpu_to_le32(ts->latency_depth); 1067 p.ts.latency_target = cpu_to_le64(ts->latency_target); 1068 p.ts.latency_window = cpu_to_le64(ts->latency_window); 1069 p.ts.latency_percentile.u.i = __cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f)); 1070 1071 convert_gs(&p.rs, rs); 1072 1073 fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL); 1074 } 1075 1076 void fio_server_send_gs(struct group_run_stats *rs) 1077 { 1078 struct group_run_stats gs; 1079 1080 dprint(FD_NET, "server sending group run stats\n"); 1081 1082 convert_gs(&gs, rs); 1083 fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL); 1084 } 1085 1086 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src) 1087 { 1088 int i; 1089 1090 for (i = 0; i < 2; i++) { 1091 dst->ios[i] = cpu_to_le32(src->ios[i]); 1092 dst->merges[i] = cpu_to_le32(src->merges[i]); 1093 dst->sectors[i] = cpu_to_le64(src->sectors[i]); 1094 dst->ticks[i] = cpu_to_le32(src->ticks[i]); 1095 } 1096 1097 dst->io_ticks = cpu_to_le32(src->io_ticks); 1098 dst->time_in_queue = cpu_to_le32(src->time_in_queue); 1099 dst->slavecount = cpu_to_le32(src->slavecount); 1100 dst->max_util.u.i = __cpu_to_le64(fio_double_to_uint64(src->max_util.u.f)); 1101 } 1102 1103 static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src) 1104 { 1105 int i; 1106 1107 dst->name[FIO_DU_NAME_SZ - 1] = '\0'; 1108 strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1); 1109 1110 for (i = 0; i < 2; i++) { 1111 dst->s.ios[i] = cpu_to_le32(src->s.ios[i]); 1112 dst->s.merges[i] = cpu_to_le32(src->s.merges[i]); 1113 dst->s.sectors[i] = cpu_to_le64(src->s.sectors[i]); 1114 dst->s.ticks[i] = cpu_to_le32(src->s.ticks[i]); 1115 } 1116 1117 dst->s.io_ticks = cpu_to_le32(src->s.io_ticks); 1118 dst->s.time_in_queue = cpu_to_le32(src->s.time_in_queue); 1119 dst->s.msec = cpu_to_le64(src->s.msec); 1120 } 1121 1122 void fio_server_send_du(void) 1123 { 1124 struct disk_util *du; 1125 struct flist_head *entry; 1126 struct cmd_du_pdu pdu; 1127 1128 dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list)); 1129 1130 memset(&pdu, 0, sizeof(pdu)); 1131 1132 flist_for_each(entry, &disk_list) { 1133 du = flist_entry(entry, struct disk_util, list); 1134 1135 convert_dus(&pdu.dus, &du->dus); 1136 convert_agg(&pdu.agg, &du->agg); 1137 1138 fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL); 1139 } 1140 } 1141 1142 /* 1143 * Send a command with a separate PDU, not inlined in the command 1144 */ 1145 static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf, 1146 off_t size, uint64_t tag, uint32_t flags) 1147 { 1148 struct fio_net_cmd cmd; 1149 struct iovec iov[2]; 1150 1151 iov[0].iov_base = (void *) &cmd; 1152 iov[0].iov_len = sizeof(cmd); 1153 iov[1].iov_base = (void *) buf; 1154 iov[1].iov_len = size; 1155 1156 __fio_init_net_cmd(&cmd, opcode, size, tag); 1157 cmd.flags = __cpu_to_le32(flags); 1158 fio_net_cmd_crc_pdu(&cmd, buf); 1159 1160 return fio_sendv_data(sk, iov, 2); 1161 } 1162 1163 static int fio_send_iolog_gz(struct cmd_iolog_pdu *pdu, struct io_log *log) 1164 { 1165 int ret = 0; 1166 #ifdef CONFIG_ZLIB 1167 z_stream stream; 1168 void *out_pdu; 1169 1170 /* 1171 * Dirty - since the log is potentially huge, compress it into 1172 * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving 1173 * side defragment it. 1174 */ 1175 out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); 1176 1177 stream.zalloc = Z_NULL; 1178 stream.zfree = Z_NULL; 1179 stream.opaque = Z_NULL; 1180 1181 if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) { 1182 ret = 1; 1183 goto err; 1184 } 1185 1186 stream.next_in = (void *) log->log; 1187 stream.avail_in = log->nr_samples * sizeof(struct io_sample); 1188 1189 do { 1190 unsigned int this_len, flags = 0; 1191 int ret; 1192 1193 stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; 1194 stream.next_out = out_pdu; 1195 ret = deflate(&stream, Z_FINISH); 1196 /* may be Z_OK, or Z_STREAM_END */ 1197 if (ret < 0) 1198 goto err_zlib; 1199 1200 this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; 1201 1202 if (stream.avail_in) 1203 flags = FIO_NET_CMD_F_MORE; 1204 1205 ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, 1206 out_pdu, this_len, 0, flags); 1207 if (ret) 1208 goto err_zlib; 1209 } while (stream.avail_in); 1210 1211 err_zlib: 1212 deflateEnd(&stream); 1213 err: 1214 free(out_pdu); 1215 #endif 1216 return ret; 1217 } 1218 1219 int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) 1220 { 1221 struct cmd_iolog_pdu pdu; 1222 int i, ret = 0; 1223 1224 pdu.thread_number = cpu_to_le32(td->thread_number); 1225 pdu.nr_samples = __cpu_to_le32(log->nr_samples); 1226 pdu.log_type = cpu_to_le32(log->log_type); 1227 pdu.compressed = cpu_to_le32(use_zlib); 1228 1229 strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX); 1230 pdu.name[FIO_NET_NAME_MAX - 1] = '\0'; 1231 1232 for (i = 0; i < log->nr_samples; i++) { 1233 struct io_sample *s = &log->log[i]; 1234 1235 s->time = cpu_to_le64(s->time); 1236 s->val = cpu_to_le64(s->val); 1237 s->ddir = cpu_to_le32(s->ddir); 1238 s->bs = cpu_to_le32(s->bs); 1239 } 1240 1241 /* 1242 * Send header first, it's not compressed. 1243 */ 1244 ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu, 1245 sizeof(pdu), 0, FIO_NET_CMD_F_MORE); 1246 if (ret) 1247 return ret; 1248 1249 /* 1250 * Now send actual log, compress if we can, otherwise just plain 1251 */ 1252 if (use_zlib) 1253 return fio_send_iolog_gz(&pdu, log); 1254 1255 return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log, 1256 log->nr_samples * sizeof(struct io_sample), 0, 0); 1257 } 1258 1259 void fio_server_send_add_job(struct thread_data *td) 1260 { 1261 struct cmd_add_job_pdu pdu; 1262 1263 memset(&pdu, 0, sizeof(pdu)); 1264 pdu.thread_number = cpu_to_le32(td->thread_number); 1265 pdu.groupid = cpu_to_le32(td->groupid); 1266 convert_thread_options_to_net(&pdu.top, &td->o); 1267 1268 fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL); 1269 } 1270 1271 void fio_server_send_start(struct thread_data *td) 1272 { 1273 assert(server_fd != -1); 1274 1275 fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL); 1276 } 1277 1278 static int fio_init_server_ip(void) 1279 { 1280 struct sockaddr *addr; 1281 socklen_t socklen; 1282 char buf[80]; 1283 const char *str; 1284 int sk, opt; 1285 1286 if (use_ipv6) 1287 sk = socket(AF_INET6, SOCK_STREAM, 0); 1288 else 1289 sk = socket(AF_INET, SOCK_STREAM, 0); 1290 1291 if (sk < 0) { 1292 log_err("fio: socket: %s\n", strerror(errno)); 1293 return -1; 1294 } 1295 1296 opt = 1; 1297 if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) { 1298 log_err("fio: setsockopt: %s\n", strerror(errno)); 1299 close(sk); 1300 return -1; 1301 } 1302 #ifdef SO_REUSEPORT 1303 if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) { 1304 log_err("fio: setsockopt: %s\n", strerror(errno)); 1305 close(sk); 1306 return -1; 1307 } 1308 #endif 1309 1310 if (use_ipv6) { 1311 const void *src = &saddr_in6.sin6_addr; 1312 1313 addr = (struct sockaddr *) &saddr_in6; 1314 socklen = sizeof(saddr_in6); 1315 saddr_in6.sin6_family = AF_INET6; 1316 str = inet_ntop(AF_INET6, src, buf, sizeof(buf)); 1317 } else { 1318 const void *src = &saddr_in.sin_addr; 1319 1320 addr = (struct sockaddr *) &saddr_in; 1321 socklen = sizeof(saddr_in); 1322 saddr_in.sin_family = AF_INET; 1323 str = inet_ntop(AF_INET, src, buf, sizeof(buf)); 1324 } 1325 1326 if (bind(sk, addr, socklen) < 0) { 1327 log_err("fio: bind: %s\n", strerror(errno)); 1328 log_info("fio: failed with IPv%c %s\n", use_ipv6 ? '6' : '4', str); 1329 close(sk); 1330 return -1; 1331 } 1332 1333 return sk; 1334 } 1335 1336 static int fio_init_server_sock(void) 1337 { 1338 struct sockaddr_un addr; 1339 socklen_t len; 1340 mode_t mode; 1341 int sk; 1342 1343 sk = socket(AF_UNIX, SOCK_STREAM, 0); 1344 if (sk < 0) { 1345 log_err("fio: socket: %s\n", strerror(errno)); 1346 return -1; 1347 } 1348 1349 mode = umask(000); 1350 1351 memset(&addr, 0, sizeof(addr)); 1352 addr.sun_family = AF_UNIX; 1353 strncpy(addr.sun_path, bind_sock, sizeof(addr.sun_path) - 1); 1354 1355 len = sizeof(addr.sun_family) + strlen(bind_sock) + 1; 1356 1357 if (bind(sk, (struct sockaddr *) &addr, len) < 0) { 1358 log_err("fio: bind: %s\n", strerror(errno)); 1359 close(sk); 1360 return -1; 1361 } 1362 1363 umask(mode); 1364 return sk; 1365 } 1366 1367 static int fio_init_server_connection(void) 1368 { 1369 char bind_str[128]; 1370 int sk; 1371 1372 dprint(FD_NET, "starting server\n"); 1373 1374 if (!bind_sock) 1375 sk = fio_init_server_ip(); 1376 else 1377 sk = fio_init_server_sock(); 1378 1379 if (sk < 0) 1380 return sk; 1381 1382 memset(bind_str, 0, sizeof(bind_str)); 1383 1384 if (!bind_sock) { 1385 char *p, port[16]; 1386 const void *src; 1387 int af; 1388 1389 if (use_ipv6) { 1390 af = AF_INET6; 1391 src = &saddr_in6.sin6_addr; 1392 } else { 1393 af = AF_INET; 1394 src = &saddr_in.sin_addr; 1395 } 1396 1397 p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str)); 1398 1399 sprintf(port, ",%u", fio_net_port); 1400 if (p) 1401 strcat(p, port); 1402 else 1403 strncpy(bind_str, port, sizeof(bind_str) - 1); 1404 } else 1405 strncpy(bind_str, bind_sock, sizeof(bind_str) - 1); 1406 1407 log_info("fio: server listening on %s\n", bind_str); 1408 1409 if (listen(sk, 0) < 0) { 1410 log_err("fio: listen: %s\n", strerror(errno)); 1411 close(sk); 1412 return -1; 1413 } 1414 1415 return sk; 1416 } 1417 1418 int fio_server_parse_host(const char *host, int ipv6, struct in_addr *inp, 1419 struct in6_addr *inp6) 1420 1421 { 1422 int ret = 0; 1423 1424 if (ipv6) 1425 ret = inet_pton(AF_INET6, host, inp6); 1426 else 1427 ret = inet_pton(AF_INET, host, inp); 1428 1429 if (ret != 1) { 1430 struct addrinfo hints, *res; 1431 1432 memset(&hints, 0, sizeof(hints)); 1433 hints.ai_family = ipv6 ? AF_INET6 : AF_INET; 1434 hints.ai_socktype = SOCK_STREAM; 1435 1436 ret = getaddrinfo(host, NULL, &hints, &res); 1437 if (ret) { 1438 log_err("fio: failed to resolve <%s> (%s)\n", host, 1439 gai_strerror(ret)); 1440 return 1; 1441 } 1442 1443 if (ipv6) 1444 memcpy(inp6, &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr, sizeof(*inp6)); 1445 else 1446 memcpy(inp, &((struct sockaddr_in *) res->ai_addr)->sin_addr, sizeof(*inp)); 1447 1448 ret = 1; 1449 freeaddrinfo(res); 1450 } 1451 1452 return !(ret == 1); 1453 } 1454 1455 /* 1456 * Parse a host/ip/port string. Reads from 'str'. 1457 * 1458 * Outputs: 1459 * 1460 * For IPv4: 1461 * *ptr is the host, *port is the port, inp is the destination. 1462 * For IPv6: 1463 * *ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1. 1464 * For local domain sockets: 1465 * *ptr is the filename, *is_sock is 1. 1466 */ 1467 int fio_server_parse_string(const char *str, char **ptr, int *is_sock, 1468 int *port, struct in_addr *inp, 1469 struct in6_addr *inp6, int *ipv6) 1470 { 1471 const char *host = str; 1472 char *portp; 1473 int lport = 0; 1474 1475 *ptr = NULL; 1476 *is_sock = 0; 1477 *port = fio_net_port; 1478 *ipv6 = 0; 1479 1480 if (!strncmp(str, "sock:", 5)) { 1481 *ptr = strdup(str + 5); 1482 *is_sock = 1; 1483 1484 return 0; 1485 } 1486 1487 /* 1488 * Is it ip:<ip or host>:port 1489 */ 1490 if (!strncmp(host, "ip:", 3)) 1491 host += 3; 1492 else if (!strncmp(host, "ip4:", 4)) 1493 host += 4; 1494 else if (!strncmp(host, "ip6:", 4)) { 1495 host += 4; 1496 *ipv6 = 1; 1497 } else if (host[0] == ':') { 1498 /* String is :port */ 1499 host++; 1500 lport = atoi(host); 1501 if (!lport || lport > 65535) { 1502 log_err("fio: bad server port %u\n", lport); 1503 return 1; 1504 } 1505 /* no hostname given, we are done */ 1506 *port = lport; 1507 return 0; 1508 } 1509 1510 /* 1511 * If no port seen yet, check if there's a last ',' at the end 1512 */ 1513 if (!lport) { 1514 portp = strchr(host, ','); 1515 if (portp) { 1516 *portp = '\0'; 1517 portp++; 1518 lport = atoi(portp); 1519 if (!lport || lport > 65535) { 1520 log_err("fio: bad server port %u\n", lport); 1521 return 1; 1522 } 1523 } 1524 } 1525 1526 if (lport) 1527 *port = lport; 1528 1529 if (!strlen(host)) 1530 return 0; 1531 1532 *ptr = strdup(host); 1533 1534 if (fio_server_parse_host(*ptr, *ipv6, inp, inp6)) { 1535 free(*ptr); 1536 *ptr = NULL; 1537 return 1; 1538 } 1539 1540 if (*port == 0) 1541 *port = fio_net_port; 1542 1543 return 0; 1544 } 1545 1546 /* 1547 * Server arg should be one of: 1548 * 1549 * sock:/path/to/socket 1550 * ip:1.2.3.4 1551 * 1.2.3.4 1552 * 1553 * Where sock uses unix domain sockets, and ip binds the server to 1554 * a specific interface. If no arguments are given to the server, it 1555 * uses IP and binds to 0.0.0.0. 1556 * 1557 */ 1558 static int fio_handle_server_arg(void) 1559 { 1560 int port = fio_net_port; 1561 int is_sock, ret = 0; 1562 1563 saddr_in.sin_addr.s_addr = htonl(INADDR_ANY); 1564 1565 if (!fio_server_arg) 1566 goto out; 1567 1568 ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock, 1569 &port, &saddr_in.sin_addr, 1570 &saddr_in6.sin6_addr, &use_ipv6); 1571 1572 if (!is_sock && bind_sock) { 1573 free(bind_sock); 1574 bind_sock = NULL; 1575 } 1576 1577 out: 1578 fio_net_port = port; 1579 saddr_in.sin_port = htons(port); 1580 saddr_in6.sin6_port = htons(port); 1581 return ret; 1582 } 1583 1584 static void sig_int(int sig) 1585 { 1586 if (bind_sock) 1587 unlink(bind_sock); 1588 } 1589 1590 static void set_sig_handlers(void) 1591 { 1592 struct sigaction act; 1593 1594 memset(&act, 0, sizeof(act)); 1595 act.sa_handler = sig_int; 1596 act.sa_flags = SA_RESTART; 1597 sigaction(SIGINT, &act, NULL); 1598 } 1599 1600 static int fio_server(void) 1601 { 1602 int sk, ret; 1603 1604 dprint(FD_NET, "starting server\n"); 1605 1606 if (fio_handle_server_arg()) 1607 return -1; 1608 1609 sk = fio_init_server_connection(); 1610 if (sk < 0) 1611 return -1; 1612 1613 set_sig_handlers(); 1614 1615 ret = accept_loop(sk); 1616 1617 close(sk); 1618 1619 if (fio_server_arg) { 1620 free(fio_server_arg); 1621 fio_server_arg = NULL; 1622 } 1623 if (bind_sock) 1624 free(bind_sock); 1625 1626 return ret; 1627 } 1628 1629 void fio_server_got_signal(int signal) 1630 { 1631 if (signal == SIGPIPE) 1632 server_fd = -1; 1633 else { 1634 log_info("\nfio: terminating on signal %d\n", signal); 1635 exit_backend = 1; 1636 } 1637 } 1638 1639 static int check_existing_pidfile(const char *pidfile) 1640 { 1641 struct stat sb; 1642 char buf[16]; 1643 pid_t pid; 1644 FILE *f; 1645 1646 if (stat(pidfile, &sb)) 1647 return 0; 1648 1649 f = fopen(pidfile, "r"); 1650 if (!f) 1651 return 0; 1652 1653 if (fread(buf, sb.st_size, 1, f) <= 0) { 1654 fclose(f); 1655 return 1; 1656 } 1657 fclose(f); 1658 1659 pid = atoi(buf); 1660 if (kill(pid, SIGCONT) < 0) 1661 return errno != ESRCH; 1662 1663 return 1; 1664 } 1665 1666 static int write_pid(pid_t pid, const char *pidfile) 1667 { 1668 FILE *fpid; 1669 1670 fpid = fopen(pidfile, "w"); 1671 if (!fpid) { 1672 log_err("fio: failed opening pid file %s\n", pidfile); 1673 return 1; 1674 } 1675 1676 fprintf(fpid, "%u\n", (unsigned int) pid); 1677 fclose(fpid); 1678 return 0; 1679 } 1680 1681 /* 1682 * If pidfile is specified, background us. 1683 */ 1684 int fio_start_server(char *pidfile) 1685 { 1686 pid_t pid; 1687 int ret; 1688 1689 #if defined(WIN32) 1690 WSADATA wsd; 1691 WSAStartup(MAKEWORD(2, 2), &wsd); 1692 #endif 1693 1694 if (!pidfile) 1695 return fio_server(); 1696 1697 if (check_existing_pidfile(pidfile)) { 1698 log_err("fio: pidfile %s exists and server appears alive\n", 1699 pidfile); 1700 free(pidfile); 1701 return -1; 1702 } 1703 1704 pid = fork(); 1705 if (pid < 0) { 1706 log_err("fio: failed server fork: %s", strerror(errno)); 1707 free(pidfile); 1708 return -1; 1709 } else if (pid) { 1710 int ret = write_pid(pid, pidfile); 1711 1712 free(pidfile); 1713 exit(ret); 1714 } 1715 1716 setsid(); 1717 openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER); 1718 log_syslog = 1; 1719 close(STDIN_FILENO); 1720 close(STDOUT_FILENO); 1721 close(STDERR_FILENO); 1722 f_out = NULL; 1723 f_err = NULL; 1724 1725 ret = fio_server(); 1726 1727 closelog(); 1728 unlink(pidfile); 1729 free(pidfile); 1730 return ret; 1731 } 1732 1733 void fio_server_set_arg(const char *arg) 1734 { 1735 fio_server_arg = strdup(arg); 1736 } 1737