1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <limits.h> 5 #include <errno.h> 6 #include <fcntl.h> 7 #include <sys/poll.h> 8 #include <sys/types.h> 9 #include <sys/stat.h> 10 #include <sys/wait.h> 11 #include <sys/socket.h> 12 #include <sys/un.h> 13 #include <netinet/in.h> 14 #include <arpa/inet.h> 15 #include <netdb.h> 16 #include <signal.h> 17 #ifdef CONFIG_ZLIB 18 #include <zlib.h> 19 #endif 20 21 #include "fio.h" 22 #include "client.h" 23 #include "server.h" 24 #include "flist.h" 25 #include "hash.h" 26 #include "verify.h" 27 28 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd); 29 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd); 30 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd); 31 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd); 32 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd); 33 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd); 34 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd); 35 36 struct client_ops fio_client_ops = { 37 .text = handle_text, 38 .disk_util = handle_du, 39 .thread_status = handle_ts, 40 .group_stats = handle_gs, 41 .stop = handle_stop, 42 .start = handle_start, 43 .eta = display_thread_status, 44 .probe = handle_probe, 45 .eta_msec = FIO_CLIENT_DEF_ETA_MSEC, 46 .client_type = FIO_CLIENT_TYPE_CLI, 47 }; 48 49 static struct timeval eta_tv; 50 51 static FLIST_HEAD(client_list); 52 static FLIST_HEAD(eta_list); 53 54 static FLIST_HEAD(arg_list); 55 56 struct thread_stat client_ts; 57 struct group_run_stats client_gs; 58 int sum_stat_clients; 59 60 static int sum_stat_nr; 61 static struct json_object *root = NULL; 62 static struct json_array *clients_array = NULL; 63 static struct json_array *du_array = NULL; 64 65 static int error_clients; 66 67 #define FIO_CLIENT_HASH_BITS 7 68 #define FIO_CLIENT_HASH_SZ (1 << FIO_CLIENT_HASH_BITS) 69 #define FIO_CLIENT_HASH_MASK (FIO_CLIENT_HASH_SZ - 1) 70 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ]; 71 72 static void fio_client_add_hash(struct fio_client *client) 73 { 74 int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS); 75 76 bucket &= FIO_CLIENT_HASH_MASK; 77 flist_add(&client->hash_list, &client_hash[bucket]); 78 } 79 80 static void fio_client_remove_hash(struct fio_client *client) 81 { 82 if (!flist_empty(&client->hash_list)) 83 flist_del_init(&client->hash_list); 84 } 85 86 static void fio_init fio_client_hash_init(void) 87 { 88 int i; 89 90 for (i = 0; i < FIO_CLIENT_HASH_SZ; i++) 91 INIT_FLIST_HEAD(&client_hash[i]); 92 } 93 94 static int read_data(int fd, void *data, size_t size) 95 { 96 ssize_t ret; 97 98 while (size) { 99 ret = read(fd, data, size); 100 if (ret < 0) { 101 if (errno == EAGAIN || errno == EINTR) 102 continue; 103 break; 104 } else if (!ret) 105 break; 106 else { 107 data += ret; 108 size -= ret; 109 } 110 } 111 112 if (size) 113 return EAGAIN; 114 115 return 0; 116 } 117 118 static void fio_client_json_init(void) 119 { 120 if (output_format != FIO_OUTPUT_JSON) 121 return; 122 root = json_create_object(); 123 json_object_add_value_string(root, "fio version", fio_version_string); 124 clients_array = json_create_array(); 125 json_object_add_value_array(root, "client_stats", clients_array); 126 du_array = json_create_array(); 127 json_object_add_value_array(root, "disk_util", du_array); 128 } 129 130 static void fio_client_json_fini(void) 131 { 132 if (output_format != FIO_OUTPUT_JSON) 133 return; 134 json_print_object(root); 135 log_info("\n"); 136 json_free_object(root); 137 root = NULL; 138 clients_array = NULL; 139 du_array = NULL; 140 } 141 142 static struct fio_client *find_client_by_fd(int fd) 143 { 144 int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK; 145 struct fio_client *client; 146 struct flist_head *entry; 147 148 flist_for_each(entry, &client_hash[bucket]) { 149 client = flist_entry(entry, struct fio_client, hash_list); 150 151 if (client->fd == fd) { 152 client->refs++; 153 return client; 154 } 155 } 156 157 return NULL; 158 } 159 160 void fio_put_client(struct fio_client *client) 161 { 162 if (--client->refs) 163 return; 164 165 free(client->hostname); 166 if (client->argv) 167 free(client->argv); 168 if (client->name) 169 free(client->name); 170 while (client->nr_files) { 171 struct client_file *cf = &client->files[--client->nr_files]; 172 173 free(cf->file); 174 } 175 if (client->files) 176 free(client->files); 177 178 if (!client->did_stat) 179 sum_stat_clients--; 180 181 if (client->error) 182 error_clients++; 183 184 free(client); 185 } 186 187 static void remove_client(struct fio_client *client) 188 { 189 assert(client->refs); 190 191 dprint(FD_NET, "client: removed <%s>\n", client->hostname); 192 193 if (!flist_empty(&client->list)) 194 flist_del_init(&client->list); 195 196 fio_client_remove_hash(client); 197 198 if (!flist_empty(&client->eta_list)) { 199 flist_del_init(&client->eta_list); 200 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta); 201 } 202 203 close(client->fd); 204 client->fd = -1; 205 206 if (client->ops->removed) 207 client->ops->removed(client); 208 209 nr_clients--; 210 fio_put_client(client); 211 } 212 213 struct fio_client *fio_get_client(struct fio_client *client) 214 { 215 client->refs++; 216 return client; 217 } 218 219 static void __fio_client_add_cmd_option(struct fio_client *client, 220 const char *opt) 221 { 222 int index; 223 224 index = client->argc++; 225 client->argv = realloc(client->argv, sizeof(char *) * client->argc); 226 client->argv[index] = strdup(opt); 227 dprint(FD_NET, "client: add cmd %d: %s\n", index, opt); 228 } 229 230 void fio_client_add_cmd_option(void *cookie, const char *opt) 231 { 232 struct fio_client *client = cookie; 233 struct flist_head *entry; 234 235 if (!client || !opt) 236 return; 237 238 __fio_client_add_cmd_option(client, opt); 239 240 /* 241 * Duplicate arguments to shared client group 242 */ 243 flist_for_each(entry, &arg_list) { 244 client = flist_entry(entry, struct fio_client, arg_list); 245 246 __fio_client_add_cmd_option(client, opt); 247 } 248 } 249 250 struct fio_client *fio_client_add_explicit(struct client_ops *ops, 251 const char *hostname, int type, 252 int port) 253 { 254 struct fio_client *client; 255 256 client = malloc(sizeof(*client)); 257 memset(client, 0, sizeof(*client)); 258 259 INIT_FLIST_HEAD(&client->list); 260 INIT_FLIST_HEAD(&client->hash_list); 261 INIT_FLIST_HEAD(&client->arg_list); 262 INIT_FLIST_HEAD(&client->eta_list); 263 INIT_FLIST_HEAD(&client->cmd_list); 264 265 client->hostname = strdup(hostname); 266 267 if (type == Fio_client_socket) 268 client->is_sock = 1; 269 else { 270 int ipv6; 271 272 ipv6 = type == Fio_client_ipv6; 273 if (fio_server_parse_host(hostname, ipv6, 274 &client->addr.sin_addr, 275 &client->addr6.sin6_addr)) 276 goto err; 277 278 client->port = port; 279 } 280 281 client->fd = -1; 282 client->ops = ops; 283 client->refs = 1; 284 client->type = ops->client_type; 285 286 __fio_client_add_cmd_option(client, "fio"); 287 288 flist_add(&client->list, &client_list); 289 nr_clients++; 290 dprint(FD_NET, "client: added <%s>\n", client->hostname); 291 return client; 292 err: 293 free(client); 294 return NULL; 295 } 296 297 int fio_client_add_ini_file(void *cookie, const char *ini_file, int remote) 298 { 299 struct fio_client *client = cookie; 300 struct client_file *cf; 301 size_t new_size; 302 void *new_files; 303 304 if (!client) 305 return 1; 306 307 dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file); 308 309 new_size = (client->nr_files + 1) * sizeof(struct client_file); 310 new_files = realloc(client->files, new_size); 311 if (!new_files) 312 return 1; 313 314 client->files = new_files; 315 cf = &client->files[client->nr_files]; 316 cf->file = strdup(ini_file); 317 cf->remote = remote; 318 client->nr_files++; 319 return 0; 320 } 321 322 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie) 323 { 324 struct fio_client *existing = *cookie; 325 struct fio_client *client; 326 327 if (existing) { 328 /* 329 * We always add our "exec" name as the option, hence 1 330 * means empty. 331 */ 332 if (existing->argc == 1) 333 flist_add_tail(&existing->arg_list, &arg_list); 334 else { 335 while (!flist_empty(&arg_list)) 336 flist_del_init(arg_list.next); 337 } 338 } 339 340 client = malloc(sizeof(*client)); 341 memset(client, 0, sizeof(*client)); 342 343 INIT_FLIST_HEAD(&client->list); 344 INIT_FLIST_HEAD(&client->hash_list); 345 INIT_FLIST_HEAD(&client->arg_list); 346 INIT_FLIST_HEAD(&client->eta_list); 347 INIT_FLIST_HEAD(&client->cmd_list); 348 349 if (fio_server_parse_string(hostname, &client->hostname, 350 &client->is_sock, &client->port, 351 &client->addr.sin_addr, 352 &client->addr6.sin6_addr, 353 &client->ipv6)) 354 return -1; 355 356 client->fd = -1; 357 client->ops = ops; 358 client->refs = 1; 359 client->type = ops->client_type; 360 361 __fio_client_add_cmd_option(client, "fio"); 362 363 flist_add(&client->list, &client_list); 364 nr_clients++; 365 dprint(FD_NET, "client: added <%s>\n", client->hostname); 366 *cookie = client; 367 return 0; 368 } 369 370 static const char *server_name(struct fio_client *client, char *buf, 371 size_t bufsize) 372 { 373 const char *from; 374 375 if (client->ipv6) 376 from = inet_ntop(AF_INET6, (struct sockaddr *) &client->addr6.sin6_addr, buf, bufsize); 377 else if (client->is_sock) 378 from = "sock"; 379 else 380 from = inet_ntop(AF_INET, (struct sockaddr *) &client->addr.sin_addr, buf, bufsize); 381 382 return from; 383 } 384 385 static void probe_client(struct fio_client *client) 386 { 387 struct cmd_client_probe_pdu pdu; 388 const char *sname; 389 uint64_t tag; 390 char buf[64]; 391 392 dprint(FD_NET, "client: send probe\n"); 393 394 #ifdef CONFIG_ZLIB 395 pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB); 396 #else 397 pdu.flags = 0; 398 #endif 399 400 sname = server_name(client, buf, sizeof(buf)); 401 memset(pdu.server, 0, sizeof(pdu.server)); 402 strncpy((char *) pdu.server, sname, sizeof(pdu.server) - 1); 403 404 fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list); 405 } 406 407 static int fio_client_connect_ip(struct fio_client *client) 408 { 409 struct sockaddr *addr; 410 socklen_t socklen; 411 int fd, domain; 412 413 if (client->ipv6) { 414 client->addr6.sin6_family = AF_INET6; 415 client->addr6.sin6_port = htons(client->port); 416 domain = AF_INET6; 417 addr = (struct sockaddr *) &client->addr6; 418 socklen = sizeof(client->addr6); 419 } else { 420 client->addr.sin_family = AF_INET; 421 client->addr.sin_port = htons(client->port); 422 domain = AF_INET; 423 addr = (struct sockaddr *) &client->addr; 424 socklen = sizeof(client->addr); 425 } 426 427 fd = socket(domain, SOCK_STREAM, 0); 428 if (fd < 0) { 429 int ret = -errno; 430 431 log_err("fio: socket: %s\n", strerror(errno)); 432 return ret; 433 } 434 435 if (connect(fd, addr, socklen) < 0) { 436 int ret = -errno; 437 438 log_err("fio: connect: %s\n", strerror(errno)); 439 log_err("fio: failed to connect to %s:%u\n", client->hostname, 440 client->port); 441 close(fd); 442 return ret; 443 } 444 445 return fd; 446 } 447 448 static int fio_client_connect_sock(struct fio_client *client) 449 { 450 struct sockaddr_un *addr = &client->addr_un; 451 socklen_t len; 452 int fd; 453 454 memset(addr, 0, sizeof(*addr)); 455 addr->sun_family = AF_UNIX; 456 strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1); 457 458 fd = socket(AF_UNIX, SOCK_STREAM, 0); 459 if (fd < 0) { 460 int ret = -errno; 461 462 log_err("fio: socket: %s\n", strerror(errno)); 463 return ret; 464 } 465 466 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1; 467 if (connect(fd, (struct sockaddr *) addr, len) < 0) { 468 int ret = -errno; 469 470 log_err("fio: connect; %s\n", strerror(errno)); 471 close(fd); 472 return ret; 473 } 474 475 return fd; 476 } 477 478 int fio_client_connect(struct fio_client *client) 479 { 480 int fd; 481 482 dprint(FD_NET, "client: connect to host %s\n", client->hostname); 483 484 if (client->is_sock) 485 fd = fio_client_connect_sock(client); 486 else 487 fd = fio_client_connect_ip(client); 488 489 dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd); 490 491 if (fd < 0) 492 return fd; 493 494 client->fd = fd; 495 fio_client_add_hash(client); 496 client->state = Client_connected; 497 498 probe_client(client); 499 return 0; 500 } 501 502 int fio_client_terminate(struct fio_client *client) 503 { 504 return fio_net_send_quit(client->fd); 505 } 506 507 void fio_clients_terminate(void) 508 { 509 struct flist_head *entry; 510 struct fio_client *client; 511 512 dprint(FD_NET, "client: terminate clients\n"); 513 514 flist_for_each(entry, &client_list) { 515 client = flist_entry(entry, struct fio_client, list); 516 fio_client_terminate(client); 517 } 518 } 519 520 static void sig_int(int sig) 521 { 522 dprint(FD_NET, "client: got signal %d\n", sig); 523 fio_clients_terminate(); 524 } 525 526 static void sig_show_status(int sig) 527 { 528 show_running_run_stats(); 529 } 530 531 static void client_signal_handler(void) 532 { 533 struct sigaction act; 534 535 memset(&act, 0, sizeof(act)); 536 act.sa_handler = sig_int; 537 act.sa_flags = SA_RESTART; 538 sigaction(SIGINT, &act, NULL); 539 540 memset(&act, 0, sizeof(act)); 541 act.sa_handler = sig_int; 542 act.sa_flags = SA_RESTART; 543 sigaction(SIGTERM, &act, NULL); 544 545 /* Windows uses SIGBREAK as a quit signal from other applications */ 546 #ifdef WIN32 547 memset(&act, 0, sizeof(act)); 548 act.sa_handler = sig_int; 549 act.sa_flags = SA_RESTART; 550 sigaction(SIGBREAK, &act, NULL); 551 #endif 552 553 memset(&act, 0, sizeof(act)); 554 act.sa_handler = sig_show_status; 555 act.sa_flags = SA_RESTART; 556 sigaction(SIGUSR1, &act, NULL); 557 } 558 559 static int send_client_cmd_line(struct fio_client *client) 560 { 561 struct cmd_single_line_pdu *cslp; 562 struct cmd_line_pdu *clp; 563 unsigned long offset; 564 unsigned int *lens; 565 void *pdu; 566 size_t mem; 567 int i, ret; 568 569 dprint(FD_NET, "client: send cmdline %d\n", client->argc); 570 571 lens = malloc(client->argc * sizeof(unsigned int)); 572 573 /* 574 * Find out how much mem we need 575 */ 576 for (i = 0, mem = 0; i < client->argc; i++) { 577 lens[i] = strlen(client->argv[i]) + 1; 578 mem += lens[i]; 579 } 580 581 /* 582 * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu 583 */ 584 mem += sizeof(*clp) + (client->argc * sizeof(*cslp)); 585 586 pdu = malloc(mem); 587 clp = pdu; 588 offset = sizeof(*clp); 589 590 for (i = 0; i < client->argc; i++) { 591 uint16_t arg_len = lens[i]; 592 593 cslp = pdu + offset; 594 strcpy((char *) cslp->text, client->argv[i]); 595 cslp->len = cpu_to_le16(arg_len); 596 offset += sizeof(*cslp) + arg_len; 597 } 598 599 free(lens); 600 clp->lines = cpu_to_le16(client->argc); 601 clp->client_type = __cpu_to_le16(client->type); 602 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL); 603 free(pdu); 604 return ret; 605 } 606 607 int fio_clients_connect(void) 608 { 609 struct fio_client *client; 610 struct flist_head *entry, *tmp; 611 int ret; 612 613 #ifdef WIN32 614 WSADATA wsd; 615 WSAStartup(MAKEWORD(2, 2), &wsd); 616 #endif 617 618 dprint(FD_NET, "client: connect all\n"); 619 620 client_signal_handler(); 621 622 flist_for_each_safe(entry, tmp, &client_list) { 623 client = flist_entry(entry, struct fio_client, list); 624 625 ret = fio_client_connect(client); 626 if (ret) { 627 remove_client(client); 628 continue; 629 } 630 631 if (client->argc > 1) 632 send_client_cmd_line(client); 633 } 634 635 return !nr_clients; 636 } 637 638 int fio_start_client(struct fio_client *client) 639 { 640 dprint(FD_NET, "client: start %s\n", client->hostname); 641 return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL); 642 } 643 644 int fio_start_all_clients(void) 645 { 646 struct fio_client *client; 647 struct flist_head *entry, *tmp; 648 int ret; 649 650 dprint(FD_NET, "client: start all\n"); 651 652 fio_client_json_init(); 653 654 flist_for_each_safe(entry, tmp, &client_list) { 655 client = flist_entry(entry, struct fio_client, list); 656 657 ret = fio_start_client(client); 658 if (ret) { 659 remove_client(client); 660 continue; 661 } 662 } 663 664 return flist_empty(&client_list); 665 } 666 667 static int __fio_client_send_remote_ini(struct fio_client *client, 668 const char *filename) 669 { 670 struct cmd_load_file_pdu *pdu; 671 size_t p_size; 672 int ret; 673 674 dprint(FD_NET, "send remote ini %s to %s\n", filename, client->hostname); 675 676 p_size = sizeof(*pdu) + strlen(filename) + 1; 677 pdu = malloc(p_size); 678 memset(pdu, 0, p_size); 679 pdu->name_len = strlen(filename); 680 strcpy((char *) pdu->file, filename); 681 pdu->client_type = cpu_to_le16((uint16_t) client->type); 682 683 client->sent_job = 1; 684 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_LOAD_FILE, pdu, p_size,NULL, NULL); 685 free(pdu); 686 return ret; 687 } 688 689 /* 690 * Send file contents to server backend. We could use sendfile(), but to remain 691 * more portable lets just read/write the darn thing. 692 */ 693 static int __fio_client_send_local_ini(struct fio_client *client, 694 const char *filename) 695 { 696 struct cmd_job_pdu *pdu; 697 size_t p_size; 698 struct stat sb; 699 char *p; 700 void *buf; 701 off_t len; 702 int fd, ret; 703 704 dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname); 705 706 fd = open(filename, O_RDONLY); 707 if (fd < 0) { 708 ret = -errno; 709 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno)); 710 return ret; 711 } 712 713 if (fstat(fd, &sb) < 0) { 714 ret = -errno; 715 log_err("fio: job file stat: %s\n", strerror(errno)); 716 close(fd); 717 return ret; 718 } 719 720 p_size = sb.st_size + sizeof(*pdu); 721 pdu = malloc(p_size); 722 buf = pdu->buf; 723 724 len = sb.st_size; 725 p = buf; 726 if (read_data(fd, p, len)) { 727 log_err("fio: failed reading job file %s\n", filename); 728 close(fd); 729 free(pdu); 730 return 1; 731 } 732 733 pdu->buf_len = __cpu_to_le32(sb.st_size); 734 pdu->client_type = cpu_to_le32(client->type); 735 736 client->sent_job = 1; 737 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL); 738 free(pdu); 739 close(fd); 740 return ret; 741 } 742 743 int fio_client_send_ini(struct fio_client *client, const char *filename, 744 int remote) 745 { 746 int ret; 747 748 if (!remote) 749 ret = __fio_client_send_local_ini(client, filename); 750 else 751 ret = __fio_client_send_remote_ini(client, filename); 752 753 if (!ret) 754 client->sent_job = 1; 755 756 return ret; 757 } 758 759 static int fio_client_send_cf(struct fio_client *client, 760 struct client_file *cf) 761 { 762 return fio_client_send_ini(client, cf->file, cf->remote); 763 } 764 765 int fio_clients_send_ini(const char *filename) 766 { 767 struct fio_client *client; 768 struct flist_head *entry, *tmp; 769 770 flist_for_each_safe(entry, tmp, &client_list) { 771 client = flist_entry(entry, struct fio_client, list); 772 773 if (client->nr_files) { 774 int i; 775 776 for (i = 0; i < client->nr_files; i++) { 777 struct client_file *cf; 778 779 cf = &client->files[i]; 780 781 if (fio_client_send_cf(client, cf)) { 782 remove_client(client); 783 break; 784 } 785 } 786 } 787 if (client->sent_job) 788 continue; 789 if (!filename || fio_client_send_ini(client, filename, 0)) 790 remove_client(client); 791 } 792 793 return !nr_clients; 794 } 795 796 int fio_client_update_options(struct fio_client *client, 797 struct thread_options *o, uint64_t *tag) 798 { 799 struct cmd_add_job_pdu pdu; 800 801 pdu.thread_number = cpu_to_le32(client->thread_number); 802 pdu.groupid = cpu_to_le32(client->groupid); 803 convert_thread_options_to_net(&pdu.top, o); 804 805 return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list); 806 } 807 808 static void convert_io_stat(struct io_stat *dst, struct io_stat *src) 809 { 810 dst->max_val = le64_to_cpu(src->max_val); 811 dst->min_val = le64_to_cpu(src->min_val); 812 dst->samples = le64_to_cpu(src->samples); 813 814 /* 815 * Floats arrive as IEEE 754 encoded uint64_t, convert back to double 816 */ 817 dst->mean.u.f = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i)); 818 dst->S.u.f = fio_uint64_to_double(le64_to_cpu(dst->S.u.i)); 819 } 820 821 static void convert_ts(struct thread_stat *dst, struct thread_stat *src) 822 { 823 int i, j; 824 825 dst->error = le32_to_cpu(src->error); 826 dst->thread_number = le32_to_cpu(src->thread_number); 827 dst->groupid = le32_to_cpu(src->groupid); 828 dst->pid = le32_to_cpu(src->pid); 829 dst->members = le32_to_cpu(src->members); 830 dst->unified_rw_rep = le32_to_cpu(src->unified_rw_rep); 831 832 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 833 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]); 834 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]); 835 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]); 836 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]); 837 } 838 839 dst->usr_time = le64_to_cpu(src->usr_time); 840 dst->sys_time = le64_to_cpu(src->sys_time); 841 dst->ctx = le64_to_cpu(src->ctx); 842 dst->minf = le64_to_cpu(src->minf); 843 dst->majf = le64_to_cpu(src->majf); 844 dst->clat_percentiles = le64_to_cpu(src->clat_percentiles); 845 dst->percentile_precision = le64_to_cpu(src->percentile_precision); 846 847 for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) { 848 fio_fp64_t *fps = &src->percentile_list[i]; 849 fio_fp64_t *fpd = &dst->percentile_list[i]; 850 851 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i)); 852 } 853 854 for (i = 0; i < FIO_IO_U_MAP_NR; i++) { 855 dst->io_u_map[i] = le32_to_cpu(src->io_u_map[i]); 856 dst->io_u_submit[i] = le32_to_cpu(src->io_u_submit[i]); 857 dst->io_u_complete[i] = le32_to_cpu(src->io_u_complete[i]); 858 } 859 860 for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) { 861 dst->io_u_lat_u[i] = le32_to_cpu(src->io_u_lat_u[i]); 862 dst->io_u_lat_m[i] = le32_to_cpu(src->io_u_lat_m[i]); 863 } 864 865 for (i = 0; i < DDIR_RWDIR_CNT; i++) 866 for (j = 0; j < FIO_IO_U_PLAT_NR; j++) 867 dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]); 868 869 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 870 dst->total_io_u[i] = le64_to_cpu(src->total_io_u[i]); 871 dst->short_io_u[i] = le64_to_cpu(src->short_io_u[i]); 872 dst->drop_io_u[i] = le64_to_cpu(src->drop_io_u[i]); 873 } 874 875 dst->total_submit = le64_to_cpu(src->total_submit); 876 dst->total_complete = le64_to_cpu(src->total_complete); 877 878 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 879 dst->io_bytes[i] = le64_to_cpu(src->io_bytes[i]); 880 dst->runtime[i] = le64_to_cpu(src->runtime[i]); 881 } 882 883 dst->total_run_time = le64_to_cpu(src->total_run_time); 884 dst->continue_on_error = le16_to_cpu(src->continue_on_error); 885 dst->total_err_count = le64_to_cpu(src->total_err_count); 886 dst->first_error = le32_to_cpu(src->first_error); 887 dst->kb_base = le32_to_cpu(src->kb_base); 888 dst->unit_base = le32_to_cpu(src->unit_base); 889 890 dst->latency_depth = le32_to_cpu(src->latency_depth); 891 dst->latency_target = le64_to_cpu(src->latency_target); 892 dst->latency_window = le64_to_cpu(src->latency_window); 893 dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i)); 894 } 895 896 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) 897 { 898 int i; 899 900 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 901 dst->max_run[i] = le64_to_cpu(src->max_run[i]); 902 dst->min_run[i] = le64_to_cpu(src->min_run[i]); 903 dst->max_bw[i] = le64_to_cpu(src->max_bw[i]); 904 dst->min_bw[i] = le64_to_cpu(src->min_bw[i]); 905 dst->io_kb[i] = le64_to_cpu(src->io_kb[i]); 906 dst->agg[i] = le64_to_cpu(src->agg[i]); 907 } 908 909 dst->kb_base = le32_to_cpu(src->kb_base); 910 dst->unit_base = le32_to_cpu(src->unit_base); 911 dst->groupid = le32_to_cpu(src->groupid); 912 dst->unified_rw_rep = le32_to_cpu(src->unified_rw_rep); 913 } 914 915 static void json_object_add_client_info(struct json_object *obj, 916 struct fio_client *client) 917 { 918 const char *hostname = client->hostname ? client->hostname : ""; 919 920 json_object_add_value_string(obj, "hostname", hostname); 921 json_object_add_value_int(obj, "port", client->port); 922 } 923 924 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd) 925 { 926 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload; 927 struct json_object *tsobj; 928 929 tsobj = show_thread_status(&p->ts, &p->rs); 930 client->did_stat = 1; 931 if (tsobj) { 932 json_object_add_client_info(tsobj, client); 933 json_array_add_value_object(clients_array, tsobj); 934 } 935 936 if (sum_stat_clients <= 1) 937 return; 938 939 sum_thread_stats(&client_ts, &p->ts, sum_stat_nr); 940 sum_group_stats(&client_gs, &p->rs); 941 942 client_ts.members++; 943 client_ts.thread_number = p->ts.thread_number; 944 client_ts.groupid = p->ts.groupid; 945 client_ts.unified_rw_rep = p->ts.unified_rw_rep; 946 947 if (++sum_stat_nr == sum_stat_clients) { 948 strcpy(client_ts.name, "All clients"); 949 tsobj = show_thread_status(&client_ts, &client_gs); 950 if (tsobj) { 951 json_object_add_client_info(tsobj, client); 952 json_array_add_value_object(clients_array, tsobj); 953 } 954 } 955 } 956 957 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd) 958 { 959 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload; 960 961 show_group_stats(gs); 962 } 963 964 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd) 965 { 966 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload; 967 const char *buf = (const char *) pdu->buf; 968 const char *name; 969 int fio_unused ret; 970 971 name = client->name ? client->name : client->hostname; 972 973 if (!client->skip_newline) 974 fprintf(f_out, "<%s> ", name); 975 ret = fwrite(buf, pdu->buf_len, 1, f_out); 976 fflush(f_out); 977 client->skip_newline = strchr(buf, '\n') == NULL; 978 } 979 980 static void convert_agg(struct disk_util_agg *agg) 981 { 982 int i; 983 984 for (i = 0; i < 2; i++) { 985 agg->ios[i] = le64_to_cpu(agg->ios[i]); 986 agg->merges[i] = le64_to_cpu(agg->merges[i]); 987 agg->sectors[i] = le64_to_cpu(agg->sectors[i]); 988 agg->ticks[i] = le64_to_cpu(agg->ticks[i]); 989 } 990 991 agg->io_ticks = le64_to_cpu(agg->io_ticks); 992 agg->time_in_queue = le64_to_cpu(agg->time_in_queue); 993 agg->slavecount = le32_to_cpu(agg->slavecount); 994 agg->max_util.u.f = fio_uint64_to_double(le64_to_cpu(agg->max_util.u.i)); 995 } 996 997 static void convert_dus(struct disk_util_stat *dus) 998 { 999 int i; 1000 1001 for (i = 0; i < 2; i++) { 1002 dus->s.ios[i] = le64_to_cpu(dus->s.ios[i]); 1003 dus->s.merges[i] = le64_to_cpu(dus->s.merges[i]); 1004 dus->s.sectors[i] = le64_to_cpu(dus->s.sectors[i]); 1005 dus->s.ticks[i] = le64_to_cpu(dus->s.ticks[i]); 1006 } 1007 1008 dus->s.io_ticks = le64_to_cpu(dus->s.io_ticks); 1009 dus->s.time_in_queue = le64_to_cpu(dus->s.time_in_queue); 1010 dus->s.msec = le64_to_cpu(dus->s.msec); 1011 } 1012 1013 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd) 1014 { 1015 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload; 1016 1017 if (!client->disk_stats_shown) { 1018 client->disk_stats_shown = 1; 1019 log_info("\nDisk stats (read/write):\n"); 1020 } 1021 1022 if (output_format == FIO_OUTPUT_JSON) { 1023 struct json_object *duobj; 1024 json_array_add_disk_util(&du->dus, &du->agg, du_array); 1025 duobj = json_array_last_value_object(du_array); 1026 json_object_add_client_info(duobj, client); 1027 } else 1028 print_disk_util(&du->dus, &du->agg, output_format == FIO_OUTPUT_TERSE); 1029 } 1030 1031 static void convert_jobs_eta(struct jobs_eta *je) 1032 { 1033 int i; 1034 1035 je->nr_running = le32_to_cpu(je->nr_running); 1036 je->nr_ramp = le32_to_cpu(je->nr_ramp); 1037 je->nr_pending = le32_to_cpu(je->nr_pending); 1038 je->nr_setting_up = le32_to_cpu(je->nr_setting_up); 1039 je->files_open = le32_to_cpu(je->files_open); 1040 1041 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 1042 je->m_rate[i] = le32_to_cpu(je->m_rate[i]); 1043 je->t_rate[i] = le32_to_cpu(je->t_rate[i]); 1044 je->m_iops[i] = le32_to_cpu(je->m_iops[i]); 1045 je->t_iops[i] = le32_to_cpu(je->t_iops[i]); 1046 je->rate[i] = le32_to_cpu(je->rate[i]); 1047 je->iops[i] = le32_to_cpu(je->iops[i]); 1048 } 1049 1050 je->elapsed_sec = le64_to_cpu(je->elapsed_sec); 1051 je->eta_sec = le64_to_cpu(je->eta_sec); 1052 je->nr_threads = le32_to_cpu(je->nr_threads); 1053 je->is_pow2 = le32_to_cpu(je->is_pow2); 1054 je->unit_base = le32_to_cpu(je->unit_base); 1055 } 1056 1057 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je) 1058 { 1059 int i; 1060 1061 dst->nr_running += je->nr_running; 1062 dst->nr_ramp += je->nr_ramp; 1063 dst->nr_pending += je->nr_pending; 1064 dst->nr_setting_up += je->nr_setting_up; 1065 dst->files_open += je->files_open; 1066 1067 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 1068 dst->m_rate[i] += je->m_rate[i]; 1069 dst->t_rate[i] += je->t_rate[i]; 1070 dst->m_iops[i] += je->m_iops[i]; 1071 dst->t_iops[i] += je->t_iops[i]; 1072 dst->rate[i] += je->rate[i]; 1073 dst->iops[i] += je->iops[i]; 1074 } 1075 1076 dst->elapsed_sec += je->elapsed_sec; 1077 1078 if (je->eta_sec > dst->eta_sec) 1079 dst->eta_sec = je->eta_sec; 1080 1081 dst->nr_threads += je->nr_threads; 1082 1083 /* 1084 * This wont be correct for multiple strings, but at least it 1085 * works for the basic cases. 1086 */ 1087 strcpy((char *) dst->run_str, (char *) je->run_str); 1088 } 1089 1090 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn) 1091 { 1092 if (!--eta->pending) { 1093 eta_fn(&eta->eta); 1094 free(eta); 1095 } 1096 } 1097 1098 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd) 1099 { 1100 struct fio_net_cmd_reply *reply = NULL; 1101 struct flist_head *entry; 1102 1103 flist_for_each(entry, &client->cmd_list) { 1104 reply = flist_entry(entry, struct fio_net_cmd_reply, list); 1105 1106 if (cmd->tag == (uintptr_t) reply) 1107 break; 1108 1109 reply = NULL; 1110 } 1111 1112 if (!reply) { 1113 log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag); 1114 return; 1115 } 1116 1117 flist_del(&reply->list); 1118 cmd->tag = reply->saved_tag; 1119 free(reply); 1120 } 1121 1122 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag) 1123 { 1124 do { 1125 struct fio_net_cmd_reply *reply = NULL; 1126 struct flist_head *entry; 1127 1128 flist_for_each(entry, &client->cmd_list) { 1129 reply = flist_entry(entry, struct fio_net_cmd_reply, list); 1130 1131 if (tag == (uintptr_t) reply) 1132 break; 1133 1134 reply = NULL; 1135 } 1136 1137 if (!reply) 1138 break; 1139 1140 usleep(1000); 1141 } while (1); 1142 1143 return 0; 1144 } 1145 1146 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd) 1147 { 1148 struct jobs_eta *je = (struct jobs_eta *) cmd->payload; 1149 struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag; 1150 1151 dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending); 1152 1153 assert(client->eta_in_flight == eta); 1154 1155 client->eta_in_flight = NULL; 1156 flist_del_init(&client->eta_list); 1157 1158 if (client->ops->jobs_eta) 1159 client->ops->jobs_eta(client, je); 1160 1161 fio_client_sum_jobs_eta(&eta->eta, je); 1162 fio_client_dec_jobs_eta(eta, client->ops->eta); 1163 } 1164 1165 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd) 1166 { 1167 struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload; 1168 const char *os, *arch; 1169 char bit[16]; 1170 1171 os = fio_get_os_string(probe->os); 1172 if (!os) 1173 os = "unknown"; 1174 1175 arch = fio_get_arch_string(probe->arch); 1176 if (!arch) 1177 os = "unknown"; 1178 1179 sprintf(bit, "%d-bit", probe->bpp * 8); 1180 probe->flags = le64_to_cpu(probe->flags); 1181 1182 log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n", 1183 probe->hostname, probe->bigendian, bit, os, arch, 1184 probe->fio_version, (unsigned long) probe->flags); 1185 1186 if (!client->name) 1187 client->name = strdup((char *) probe->hostname); 1188 } 1189 1190 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd) 1191 { 1192 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload; 1193 1194 client->state = Client_started; 1195 client->jobs = le32_to_cpu(pdu->jobs); 1196 client->nr_stat = le32_to_cpu(pdu->stat_outputs); 1197 1198 sum_stat_clients += client->nr_stat; 1199 } 1200 1201 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd) 1202 { 1203 if (client->error) 1204 log_info("client <%s>: exited with error %d\n", client->hostname, client->error); 1205 } 1206 1207 static void convert_stop(struct fio_net_cmd *cmd) 1208 { 1209 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload; 1210 1211 pdu->error = le32_to_cpu(pdu->error); 1212 } 1213 1214 static void convert_text(struct fio_net_cmd *cmd) 1215 { 1216 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload; 1217 1218 pdu->level = le32_to_cpu(pdu->level); 1219 pdu->buf_len = le32_to_cpu(pdu->buf_len); 1220 pdu->log_sec = le64_to_cpu(pdu->log_sec); 1221 pdu->log_usec = le64_to_cpu(pdu->log_usec); 1222 } 1223 1224 static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd, 1225 struct cmd_iolog_pdu *pdu) 1226 { 1227 #ifdef CONFIG_ZLIB 1228 struct cmd_iolog_pdu *ret; 1229 z_stream stream; 1230 uint32_t nr_samples; 1231 size_t total; 1232 void *p; 1233 1234 stream.zalloc = Z_NULL; 1235 stream.zfree = Z_NULL; 1236 stream.opaque = Z_NULL; 1237 stream.avail_in = 0; 1238 stream.next_in = Z_NULL; 1239 1240 if (inflateInit(&stream) != Z_OK) 1241 return NULL; 1242 1243 /* 1244 * Get header first, it's not compressed 1245 */ 1246 nr_samples = le64_to_cpu(pdu->nr_samples); 1247 1248 total = nr_samples * __log_entry_sz(le32_to_cpu(pdu->log_offset)); 1249 ret = malloc(total + sizeof(*pdu)); 1250 ret->nr_samples = nr_samples; 1251 1252 memcpy(ret, pdu, sizeof(*pdu)); 1253 1254 p = (void *) ret + sizeof(*pdu); 1255 1256 stream.avail_in = cmd->pdu_len - sizeof(*pdu); 1257 stream.next_in = (void *) pdu + sizeof(*pdu); 1258 while (stream.avail_in) { 1259 unsigned int this_chunk = 65536; 1260 unsigned int this_len; 1261 int err; 1262 1263 if (this_chunk > total) 1264 this_chunk = total; 1265 1266 stream.avail_out = this_chunk; 1267 stream.next_out = p; 1268 err = inflate(&stream, Z_NO_FLUSH); 1269 /* may be Z_OK, or Z_STREAM_END */ 1270 if (err < 0) { 1271 log_err("fio: inflate error %d\n", err); 1272 free(ret); 1273 ret = NULL; 1274 goto err; 1275 } 1276 1277 this_len = this_chunk - stream.avail_out; 1278 p += this_len; 1279 total -= this_len; 1280 } 1281 1282 err: 1283 inflateEnd(&stream); 1284 return ret; 1285 #else 1286 return NULL; 1287 #endif 1288 } 1289 1290 /* 1291 * This has been compressed on the server side, since it can be big. 1292 * Uncompress here. 1293 */ 1294 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd) 1295 { 1296 struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload; 1297 struct cmd_iolog_pdu *ret; 1298 uint64_t i; 1299 void *samples; 1300 1301 /* 1302 * Convert if compressed and we support it. If it's not 1303 * compressed, we need not do anything. 1304 */ 1305 if (le32_to_cpu(pdu->compressed)) { 1306 #ifndef CONFIG_ZLIB 1307 log_err("fio: server sent compressed data by mistake\n"); 1308 return NULL; 1309 #endif 1310 ret = convert_iolog_gz(cmd, pdu); 1311 if (!ret) { 1312 log_err("fio: failed decompressing log\n"); 1313 return NULL; 1314 } 1315 } else 1316 ret = pdu; 1317 1318 ret->nr_samples = le64_to_cpu(ret->nr_samples); 1319 ret->thread_number = le32_to_cpu(ret->thread_number); 1320 ret->log_type = le32_to_cpu(ret->log_type); 1321 ret->compressed = le32_to_cpu(ret->compressed); 1322 ret->log_offset = le32_to_cpu(ret->log_offset); 1323 1324 samples = &ret->samples[0]; 1325 for (i = 0; i < ret->nr_samples; i++) { 1326 struct io_sample *s; 1327 1328 s = __get_sample(samples, ret->log_offset, i); 1329 s->time = le64_to_cpu(s->time); 1330 s->val = le64_to_cpu(s->val); 1331 s->__ddir = le32_to_cpu(s->__ddir); 1332 s->bs = le32_to_cpu(s->bs); 1333 1334 if (ret->log_offset) { 1335 struct io_sample_offset *so = (void *) s; 1336 1337 so->offset = le64_to_cpu(so->offset); 1338 } 1339 } 1340 1341 return ret; 1342 } 1343 1344 static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep, 1345 size_t size, uint64_t tag) 1346 { 1347 rep->error = cpu_to_le32(rep->error); 1348 fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL); 1349 } 1350 1351 static int send_file(struct fio_client *client, struct cmd_sendfile *pdu, 1352 uint64_t tag) 1353 { 1354 struct cmd_sendfile_reply *rep; 1355 struct stat sb; 1356 size_t size; 1357 int fd; 1358 1359 size = sizeof(*rep); 1360 rep = malloc(size); 1361 1362 if (stat((char *)pdu->path, &sb) < 0) { 1363 fail: 1364 rep->error = errno; 1365 sendfile_reply(client->fd, rep, size, tag); 1366 free(rep); 1367 return 1; 1368 } 1369 1370 size += sb.st_size; 1371 rep = realloc(rep, size); 1372 rep->size = cpu_to_le32((uint32_t) sb.st_size); 1373 1374 fd = open((char *)pdu->path, O_RDONLY); 1375 if (fd == -1 ) 1376 goto fail; 1377 1378 rep->error = read_data(fd, &rep->data, sb.st_size); 1379 sendfile_reply(client->fd, rep, size, tag); 1380 free(rep); 1381 close(fd); 1382 return 0; 1383 } 1384 1385 int fio_handle_client(struct fio_client *client) 1386 { 1387 struct client_ops *ops = client->ops; 1388 struct fio_net_cmd *cmd; 1389 1390 dprint(FD_NET, "client: handle %s\n", client->hostname); 1391 1392 cmd = fio_net_recv_cmd(client->fd); 1393 if (!cmd) 1394 return 0; 1395 1396 dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n", 1397 fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len); 1398 1399 switch (cmd->opcode) { 1400 case FIO_NET_CMD_QUIT: 1401 if (ops->quit) 1402 ops->quit(client, cmd); 1403 remove_client(client); 1404 break; 1405 case FIO_NET_CMD_TEXT: 1406 convert_text(cmd); 1407 ops->text(client, cmd); 1408 break; 1409 case FIO_NET_CMD_DU: { 1410 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload; 1411 1412 convert_dus(&du->dus); 1413 convert_agg(&du->agg); 1414 1415 ops->disk_util(client, cmd); 1416 break; 1417 } 1418 case FIO_NET_CMD_TS: { 1419 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload; 1420 1421 convert_ts(&p->ts, &p->ts); 1422 convert_gs(&p->rs, &p->rs); 1423 1424 ops->thread_status(client, cmd); 1425 break; 1426 } 1427 case FIO_NET_CMD_GS: { 1428 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload; 1429 1430 convert_gs(gs, gs); 1431 1432 ops->group_stats(client, cmd); 1433 break; 1434 } 1435 case FIO_NET_CMD_ETA: { 1436 struct jobs_eta *je = (struct jobs_eta *) cmd->payload; 1437 1438 remove_reply_cmd(client, cmd); 1439 convert_jobs_eta(je); 1440 handle_eta(client, cmd); 1441 break; 1442 } 1443 case FIO_NET_CMD_PROBE: 1444 remove_reply_cmd(client, cmd); 1445 ops->probe(client, cmd); 1446 break; 1447 case FIO_NET_CMD_SERVER_START: 1448 client->state = Client_running; 1449 if (ops->job_start) 1450 ops->job_start(client, cmd); 1451 break; 1452 case FIO_NET_CMD_START: { 1453 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload; 1454 1455 pdu->jobs = le32_to_cpu(pdu->jobs); 1456 ops->start(client, cmd); 1457 break; 1458 } 1459 case FIO_NET_CMD_STOP: { 1460 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload; 1461 1462 convert_stop(cmd); 1463 client->state = Client_stopped; 1464 client->error = le32_to_cpu(pdu->error); 1465 client->signal = le32_to_cpu(pdu->signal); 1466 ops->stop(client, cmd); 1467 break; 1468 } 1469 case FIO_NET_CMD_ADD_JOB: { 1470 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload; 1471 1472 client->thread_number = le32_to_cpu(pdu->thread_number); 1473 client->groupid = le32_to_cpu(pdu->groupid); 1474 1475 if (ops->add_job) 1476 ops->add_job(client, cmd); 1477 break; 1478 } 1479 case FIO_NET_CMD_IOLOG: 1480 if (ops->iolog) { 1481 struct cmd_iolog_pdu *pdu; 1482 1483 pdu = convert_iolog(cmd); 1484 ops->iolog(client, pdu); 1485 } 1486 break; 1487 case FIO_NET_CMD_UPDATE_JOB: 1488 ops->update_job(client, cmd); 1489 remove_reply_cmd(client, cmd); 1490 break; 1491 case FIO_NET_CMD_VTRIGGER: { 1492 struct all_io_list *pdu = (struct all_io_list *) cmd->payload; 1493 char buf[64]; 1494 1495 __verify_save_state(pdu, server_name(client, buf, sizeof(buf))); 1496 exec_trigger(trigger_cmd); 1497 break; 1498 } 1499 case FIO_NET_CMD_SENDFILE: { 1500 struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload; 1501 send_file(client, pdu, cmd->tag); 1502 break; 1503 } 1504 default: 1505 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode)); 1506 break; 1507 } 1508 1509 free(cmd); 1510 return 1; 1511 } 1512 1513 int fio_clients_send_trigger(const char *cmd) 1514 { 1515 struct flist_head *entry; 1516 struct fio_client *client; 1517 size_t slen; 1518 1519 dprint(FD_NET, "client: send vtrigger: %s\n", cmd); 1520 1521 if (!cmd) 1522 slen = 0; 1523 else 1524 slen = strlen(cmd); 1525 1526 flist_for_each(entry, &client_list) { 1527 struct cmd_vtrigger_pdu *pdu; 1528 1529 client = flist_entry(entry, struct fio_client, list); 1530 1531 pdu = malloc(sizeof(*pdu) + slen); 1532 pdu->len = cpu_to_le16((uint16_t) slen); 1533 if (slen) 1534 memcpy(pdu->cmd, cmd, slen); 1535 fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu, 1536 sizeof(*pdu) + slen, NULL, NULL); 1537 free(pdu); 1538 } 1539 1540 return 0; 1541 } 1542 1543 static void request_client_etas(struct client_ops *ops) 1544 { 1545 struct fio_client *client; 1546 struct flist_head *entry; 1547 struct client_eta *eta; 1548 int skipped = 0; 1549 1550 dprint(FD_NET, "client: request eta (%d)\n", nr_clients); 1551 1552 eta = calloc(1, sizeof(*eta) + __THREAD_RUNSTR_SZ(REAL_MAX_JOBS)); 1553 eta->pending = nr_clients; 1554 1555 flist_for_each(entry, &client_list) { 1556 client = flist_entry(entry, struct fio_client, list); 1557 1558 if (!flist_empty(&client->eta_list)) { 1559 skipped++; 1560 continue; 1561 } 1562 if (client->state != Client_running) 1563 continue; 1564 1565 assert(!client->eta_in_flight); 1566 flist_add_tail(&client->eta_list, &eta_list); 1567 client->eta_in_flight = eta; 1568 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA, 1569 (uintptr_t) eta, &client->cmd_list); 1570 } 1571 1572 while (skipped--) 1573 fio_client_dec_jobs_eta(eta, ops->eta); 1574 1575 dprint(FD_NET, "client: requested eta tag %p\n", eta); 1576 } 1577 1578 static int client_check_cmd_timeout(struct fio_client *client, 1579 struct timeval *now) 1580 { 1581 struct fio_net_cmd_reply *reply; 1582 struct flist_head *entry, *tmp; 1583 int ret = 0; 1584 1585 flist_for_each_safe(entry, tmp, &client->cmd_list) { 1586 reply = flist_entry(entry, struct fio_net_cmd_reply, list); 1587 1588 if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT) 1589 continue; 1590 1591 log_err("fio: client %s, timeout on cmd %s\n", client->hostname, 1592 fio_server_op(reply->opcode)); 1593 flist_del(&reply->list); 1594 free(reply); 1595 ret = 1; 1596 } 1597 1598 return flist_empty(&client->cmd_list) && ret; 1599 } 1600 1601 static int fio_check_clients_timed_out(void) 1602 { 1603 struct fio_client *client; 1604 struct flist_head *entry, *tmp; 1605 struct timeval tv; 1606 int ret = 0; 1607 1608 fio_gettime(&tv, NULL); 1609 1610 flist_for_each_safe(entry, tmp, &client_list) { 1611 client = flist_entry(entry, struct fio_client, list); 1612 1613 if (flist_empty(&client->cmd_list)) 1614 continue; 1615 1616 if (!client_check_cmd_timeout(client, &tv)) 1617 continue; 1618 1619 if (client->ops->timed_out) 1620 client->ops->timed_out(client); 1621 else 1622 log_err("fio: client %s timed out\n", client->hostname); 1623 1624 client->error = ETIMEDOUT; 1625 remove_client(client); 1626 ret = 1; 1627 } 1628 1629 return ret; 1630 } 1631 1632 int fio_handle_clients(struct client_ops *ops) 1633 { 1634 struct pollfd *pfds; 1635 int i, ret = 0, retval = 0; 1636 1637 fio_gettime(&eta_tv, NULL); 1638 1639 pfds = malloc(nr_clients * sizeof(struct pollfd)); 1640 1641 init_thread_stat(&client_ts); 1642 init_group_run_stat(&client_gs); 1643 1644 while (!exit_backend && nr_clients) { 1645 struct flist_head *entry, *tmp; 1646 struct fio_client *client; 1647 1648 i = 0; 1649 flist_for_each_safe(entry, tmp, &client_list) { 1650 client = flist_entry(entry, struct fio_client, list); 1651 1652 if (!client->sent_job && !client->ops->stay_connected && 1653 flist_empty(&client->cmd_list)) { 1654 remove_client(client); 1655 continue; 1656 } 1657 1658 pfds[i].fd = client->fd; 1659 pfds[i].events = POLLIN; 1660 i++; 1661 } 1662 1663 if (!nr_clients) 1664 break; 1665 1666 assert(i == nr_clients); 1667 1668 do { 1669 struct timeval tv; 1670 int timeout; 1671 1672 fio_gettime(&tv, NULL); 1673 if (mtime_since(&eta_tv, &tv) >= 900) { 1674 request_client_etas(ops); 1675 memcpy(&eta_tv, &tv, sizeof(tv)); 1676 1677 if (fio_check_clients_timed_out()) 1678 break; 1679 } 1680 1681 check_trigger_file(); 1682 1683 timeout = min(100u, ops->eta_msec); 1684 1685 ret = poll(pfds, nr_clients, timeout); 1686 if (ret < 0) { 1687 if (errno == EINTR) 1688 continue; 1689 log_err("fio: poll clients: %s\n", strerror(errno)); 1690 break; 1691 } else if (!ret) 1692 continue; 1693 } while (ret <= 0); 1694 1695 for (i = 0; i < nr_clients; i++) { 1696 if (!(pfds[i].revents & POLLIN)) 1697 continue; 1698 1699 client = find_client_by_fd(pfds[i].fd); 1700 if (!client) { 1701 log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd); 1702 continue; 1703 } 1704 if (!fio_handle_client(client)) { 1705 log_info("client: host=%s disconnected\n", 1706 client->hostname); 1707 remove_client(client); 1708 retval = 1; 1709 } else if (client->error) 1710 retval = 1; 1711 fio_put_client(client); 1712 } 1713 } 1714 1715 fio_client_json_fini(); 1716 1717 free(pfds); 1718 return retval || error_clients; 1719 } 1720