1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <limits.h> 5 #include <errno.h> 6 #include <fcntl.h> 7 #include <sys/poll.h> 8 #include <sys/types.h> 9 #include <sys/stat.h> 10 #include <sys/wait.h> 11 #include <sys/socket.h> 12 #include <sys/un.h> 13 #include <netinet/in.h> 14 #include <arpa/inet.h> 15 #include <netdb.h> 16 #include <signal.h> 17 #ifdef CONFIG_ZLIB 18 #include <zlib.h> 19 #endif 20 21 #include "fio.h" 22 #include "client.h" 23 #include "server.h" 24 #include "flist.h" 25 #include "hash.h" 26 27 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd); 28 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd); 29 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd); 30 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd); 31 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd); 32 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd); 33 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd); 34 35 struct client_ops fio_client_ops = { 36 .text = handle_text, 37 .disk_util = handle_du, 38 .thread_status = handle_ts, 39 .group_stats = handle_gs, 40 .stop = handle_stop, 41 .start = handle_start, 42 .eta = display_thread_status, 43 .probe = handle_probe, 44 .eta_msec = FIO_CLIENT_DEF_ETA_MSEC, 45 .client_type = FIO_CLIENT_TYPE_CLI, 46 }; 47 48 static struct timeval eta_tv; 49 50 static FLIST_HEAD(client_list); 51 static FLIST_HEAD(eta_list); 52 53 static FLIST_HEAD(arg_list); 54 55 struct thread_stat client_ts; 56 struct group_run_stats client_gs; 57 int sum_stat_clients; 58 59 static int sum_stat_nr; 60 static struct json_object *root = NULL; 61 static struct json_array *clients_array = NULL; 62 static struct json_array *du_array = NULL; 63 static int do_output_all_clients; 64 65 #define FIO_CLIENT_HASH_BITS 7 66 #define FIO_CLIENT_HASH_SZ (1 << FIO_CLIENT_HASH_BITS) 67 #define FIO_CLIENT_HASH_MASK (FIO_CLIENT_HASH_SZ - 1) 68 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ]; 69 70 static void fio_client_add_hash(struct fio_client *client) 71 { 72 int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS); 73 74 bucket &= FIO_CLIENT_HASH_MASK; 75 flist_add(&client->hash_list, &client_hash[bucket]); 76 } 77 78 static void fio_client_remove_hash(struct fio_client *client) 79 { 80 if (!flist_empty(&client->hash_list)) 81 flist_del_init(&client->hash_list); 82 } 83 84 static void fio_init fio_client_hash_init(void) 85 { 86 int i; 87 88 for (i = 0; i < FIO_CLIENT_HASH_SZ; i++) 89 INIT_FLIST_HEAD(&client_hash[i]); 90 } 91 92 static void fio_client_json_init(void) 93 { 94 if (output_format != FIO_OUTPUT_JSON) 95 return; 96 root = json_create_object(); 97 json_object_add_value_string(root, "fio version", fio_version_string); 98 clients_array = json_create_array(); 99 json_object_add_value_array(root, "client_stats", clients_array); 100 du_array = json_create_array(); 101 json_object_add_value_array(root, "disk_util", du_array); 102 } 103 104 static void fio_client_json_fini(void) 105 { 106 if (output_format != FIO_OUTPUT_JSON) 107 return; 108 json_print_object(root); 109 log_info("\n"); 110 json_free_object(root); 111 root = NULL; 112 clients_array = NULL; 113 du_array = NULL; 114 } 115 116 static struct fio_client *find_client_by_fd(int fd) 117 { 118 int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK; 119 struct fio_client *client; 120 struct flist_head *entry; 121 122 flist_for_each(entry, &client_hash[bucket]) { 123 client = flist_entry(entry, struct fio_client, hash_list); 124 125 if (client->fd == fd) { 126 client->refs++; 127 return client; 128 } 129 } 130 131 return NULL; 132 } 133 134 void fio_put_client(struct fio_client *client) 135 { 136 if (--client->refs) 137 return; 138 139 free(client->hostname); 140 if (client->argv) 141 free(client->argv); 142 if (client->name) 143 free(client->name); 144 while (client->nr_ini_file) 145 free(client->ini_file[--client->nr_ini_file]); 146 if (client->ini_file) 147 free(client->ini_file); 148 149 if (!client->did_stat) 150 sum_stat_clients -= client->nr_stat; 151 152 free(client); 153 } 154 155 static void remove_client(struct fio_client *client) 156 { 157 assert(client->refs); 158 159 dprint(FD_NET, "client: removed <%s>\n", client->hostname); 160 161 if (!flist_empty(&client->list)) 162 flist_del_init(&client->list); 163 164 fio_client_remove_hash(client); 165 166 if (!flist_empty(&client->eta_list)) { 167 flist_del_init(&client->eta_list); 168 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta); 169 } 170 171 close(client->fd); 172 client->fd = -1; 173 174 if (client->ops->removed) 175 client->ops->removed(client); 176 177 nr_clients--; 178 fio_put_client(client); 179 } 180 181 struct fio_client *fio_get_client(struct fio_client *client) 182 { 183 client->refs++; 184 return client; 185 } 186 187 static void __fio_client_add_cmd_option(struct fio_client *client, 188 const char *opt) 189 { 190 int index; 191 192 index = client->argc++; 193 client->argv = realloc(client->argv, sizeof(char *) * client->argc); 194 client->argv[index] = strdup(opt); 195 dprint(FD_NET, "client: add cmd %d: %s\n", index, opt); 196 } 197 198 void fio_client_add_cmd_option(void *cookie, const char *opt) 199 { 200 struct fio_client *client = cookie; 201 struct flist_head *entry; 202 203 if (!client || !opt) 204 return; 205 206 __fio_client_add_cmd_option(client, opt); 207 208 /* 209 * Duplicate arguments to shared client group 210 */ 211 flist_for_each(entry, &arg_list) { 212 client = flist_entry(entry, struct fio_client, arg_list); 213 214 __fio_client_add_cmd_option(client, opt); 215 } 216 } 217 218 struct fio_client *fio_client_add_explicit(struct client_ops *ops, 219 const char *hostname, int type, 220 int port) 221 { 222 struct fio_client *client; 223 224 client = malloc(sizeof(*client)); 225 memset(client, 0, sizeof(*client)); 226 227 INIT_FLIST_HEAD(&client->list); 228 INIT_FLIST_HEAD(&client->hash_list); 229 INIT_FLIST_HEAD(&client->arg_list); 230 INIT_FLIST_HEAD(&client->eta_list); 231 INIT_FLIST_HEAD(&client->cmd_list); 232 233 client->hostname = strdup(hostname); 234 235 if (type == Fio_client_socket) 236 client->is_sock = 1; 237 else { 238 int ipv6; 239 240 ipv6 = type == Fio_client_ipv6; 241 if (fio_server_parse_host(hostname, ipv6, 242 &client->addr.sin_addr, 243 &client->addr6.sin6_addr)) 244 goto err; 245 246 client->port = port; 247 } 248 249 client->fd = -1; 250 client->ops = ops; 251 client->refs = 1; 252 client->type = ops->client_type; 253 254 __fio_client_add_cmd_option(client, "fio"); 255 256 flist_add(&client->list, &client_list); 257 nr_clients++; 258 dprint(FD_NET, "client: added <%s>\n", client->hostname); 259 return client; 260 err: 261 free(client); 262 return NULL; 263 } 264 265 void fio_client_add_ini_file(void *cookie, const char *ini_file) 266 { 267 struct fio_client *client = cookie; 268 size_t new_size; 269 270 dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file); 271 272 new_size = (client->nr_ini_file + 1) * sizeof(char *); 273 client->ini_file = realloc(client->ini_file, new_size); 274 client->ini_file[client->nr_ini_file] = strdup(ini_file); 275 client->nr_ini_file++; 276 } 277 278 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie) 279 { 280 struct fio_client *existing = *cookie; 281 struct fio_client *client; 282 283 if (existing) { 284 /* 285 * We always add our "exec" name as the option, hence 1 286 * means empty. 287 */ 288 if (existing->argc == 1) 289 flist_add_tail(&existing->arg_list, &arg_list); 290 else { 291 while (!flist_empty(&arg_list)) 292 flist_del_init(arg_list.next); 293 } 294 } 295 296 client = malloc(sizeof(*client)); 297 memset(client, 0, sizeof(*client)); 298 299 INIT_FLIST_HEAD(&client->list); 300 INIT_FLIST_HEAD(&client->hash_list); 301 INIT_FLIST_HEAD(&client->arg_list); 302 INIT_FLIST_HEAD(&client->eta_list); 303 INIT_FLIST_HEAD(&client->cmd_list); 304 305 if (fio_server_parse_string(hostname, &client->hostname, 306 &client->is_sock, &client->port, 307 &client->addr.sin_addr, 308 &client->addr6.sin6_addr, 309 &client->ipv6)) 310 return -1; 311 312 client->fd = -1; 313 client->ops = ops; 314 client->refs = 1; 315 client->type = ops->client_type; 316 317 __fio_client_add_cmd_option(client, "fio"); 318 319 flist_add(&client->list, &client_list); 320 nr_clients++; 321 dprint(FD_NET, "client: added <%s>\n", client->hostname); 322 *cookie = client; 323 return 0; 324 } 325 326 static void probe_client(struct fio_client *client) 327 { 328 struct cmd_client_probe_pdu pdu; 329 uint64_t tag; 330 331 dprint(FD_NET, "client: send probe\n"); 332 333 #ifdef CONFIG_ZLIB 334 pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB); 335 #else 336 pdu.flags = 0; 337 #endif 338 339 fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list); 340 } 341 342 static int fio_client_connect_ip(struct fio_client *client) 343 { 344 struct sockaddr *addr; 345 socklen_t socklen; 346 int fd, domain; 347 348 if (client->ipv6) { 349 client->addr6.sin6_family = AF_INET6; 350 client->addr6.sin6_port = htons(client->port); 351 domain = AF_INET6; 352 addr = (struct sockaddr *) &client->addr6; 353 socklen = sizeof(client->addr6); 354 } else { 355 client->addr.sin_family = AF_INET; 356 client->addr.sin_port = htons(client->port); 357 domain = AF_INET; 358 addr = (struct sockaddr *) &client->addr; 359 socklen = sizeof(client->addr); 360 } 361 362 fd = socket(domain, SOCK_STREAM, 0); 363 if (fd < 0) { 364 int ret = -errno; 365 366 log_err("fio: socket: %s\n", strerror(errno)); 367 return ret; 368 } 369 370 if (connect(fd, addr, socklen) < 0) { 371 int ret = -errno; 372 373 log_err("fio: connect: %s\n", strerror(errno)); 374 log_err("fio: failed to connect to %s:%u\n", client->hostname, 375 client->port); 376 close(fd); 377 return ret; 378 } 379 380 return fd; 381 } 382 383 static int fio_client_connect_sock(struct fio_client *client) 384 { 385 struct sockaddr_un *addr = &client->addr_un; 386 socklen_t len; 387 int fd; 388 389 memset(addr, 0, sizeof(*addr)); 390 addr->sun_family = AF_UNIX; 391 strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1); 392 393 fd = socket(AF_UNIX, SOCK_STREAM, 0); 394 if (fd < 0) { 395 int ret = -errno; 396 397 log_err("fio: socket: %s\n", strerror(errno)); 398 return ret; 399 } 400 401 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1; 402 if (connect(fd, (struct sockaddr *) addr, len) < 0) { 403 int ret = -errno; 404 405 log_err("fio: connect; %s\n", strerror(errno)); 406 close(fd); 407 return ret; 408 } 409 410 return fd; 411 } 412 413 int fio_client_connect(struct fio_client *client) 414 { 415 int fd; 416 417 dprint(FD_NET, "client: connect to host %s\n", client->hostname); 418 419 if (client->is_sock) 420 fd = fio_client_connect_sock(client); 421 else 422 fd = fio_client_connect_ip(client); 423 424 dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd); 425 426 if (fd < 0) 427 return fd; 428 429 client->fd = fd; 430 fio_client_add_hash(client); 431 client->state = Client_connected; 432 433 probe_client(client); 434 return 0; 435 } 436 437 int fio_client_terminate(struct fio_client *client) 438 { 439 return fio_net_send_quit(client->fd); 440 } 441 442 void fio_clients_terminate(void) 443 { 444 struct flist_head *entry; 445 struct fio_client *client; 446 447 dprint(FD_NET, "client: terminate clients\n"); 448 449 flist_for_each(entry, &client_list) { 450 client = flist_entry(entry, struct fio_client, list); 451 fio_client_terminate(client); 452 } 453 } 454 455 static void sig_int(int sig) 456 { 457 dprint(FD_NET, "client: got signal %d\n", sig); 458 fio_clients_terminate(); 459 } 460 461 static void sig_show_status(int sig) 462 { 463 show_running_run_stats(); 464 } 465 466 static void client_signal_handler(void) 467 { 468 struct sigaction act; 469 470 memset(&act, 0, sizeof(act)); 471 act.sa_handler = sig_int; 472 act.sa_flags = SA_RESTART; 473 sigaction(SIGINT, &act, NULL); 474 475 memset(&act, 0, sizeof(act)); 476 act.sa_handler = sig_int; 477 act.sa_flags = SA_RESTART; 478 sigaction(SIGTERM, &act, NULL); 479 480 /* Windows uses SIGBREAK as a quit signal from other applications */ 481 #ifdef WIN32 482 memset(&act, 0, sizeof(act)); 483 act.sa_handler = sig_int; 484 act.sa_flags = SA_RESTART; 485 sigaction(SIGBREAK, &act, NULL); 486 #endif 487 488 memset(&act, 0, sizeof(act)); 489 act.sa_handler = sig_show_status; 490 act.sa_flags = SA_RESTART; 491 sigaction(SIGUSR1, &act, NULL); 492 } 493 494 static int send_client_cmd_line(struct fio_client *client) 495 { 496 struct cmd_single_line_pdu *cslp; 497 struct cmd_line_pdu *clp; 498 unsigned long offset; 499 unsigned int *lens; 500 void *pdu; 501 size_t mem; 502 int i, ret; 503 504 dprint(FD_NET, "client: send cmdline %d\n", client->argc); 505 506 lens = malloc(client->argc * sizeof(unsigned int)); 507 508 /* 509 * Find out how much mem we need 510 */ 511 for (i = 0, mem = 0; i < client->argc; i++) { 512 lens[i] = strlen(client->argv[i]) + 1; 513 mem += lens[i]; 514 } 515 516 /* 517 * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu 518 */ 519 mem += sizeof(*clp) + (client->argc * sizeof(*cslp)); 520 521 pdu = malloc(mem); 522 clp = pdu; 523 offset = sizeof(*clp); 524 525 for (i = 0; i < client->argc; i++) { 526 uint16_t arg_len = lens[i]; 527 528 cslp = pdu + offset; 529 strcpy((char *) cslp->text, client->argv[i]); 530 cslp->len = cpu_to_le16(arg_len); 531 offset += sizeof(*cslp) + arg_len; 532 } 533 534 free(lens); 535 clp->lines = cpu_to_le16(client->argc); 536 clp->client_type = __cpu_to_le16(client->type); 537 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL); 538 free(pdu); 539 return ret; 540 } 541 542 int fio_clients_connect(void) 543 { 544 struct fio_client *client; 545 struct flist_head *entry, *tmp; 546 int ret; 547 548 #ifdef WIN32 549 WSADATA wsd; 550 WSAStartup(MAKEWORD(2, 2), &wsd); 551 #endif 552 553 dprint(FD_NET, "client: connect all\n"); 554 555 client_signal_handler(); 556 557 flist_for_each_safe(entry, tmp, &client_list) { 558 client = flist_entry(entry, struct fio_client, list); 559 560 ret = fio_client_connect(client); 561 if (ret) { 562 remove_client(client); 563 continue; 564 } 565 566 if (client->argc > 1) 567 send_client_cmd_line(client); 568 } 569 570 return !nr_clients; 571 } 572 573 int fio_start_client(struct fio_client *client) 574 { 575 dprint(FD_NET, "client: start %s\n", client->hostname); 576 return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL); 577 } 578 579 int fio_start_all_clients(void) 580 { 581 struct fio_client *client; 582 struct flist_head *entry, *tmp; 583 int ret; 584 585 dprint(FD_NET, "client: start all\n"); 586 587 fio_client_json_init(); 588 589 flist_for_each_safe(entry, tmp, &client_list) { 590 client = flist_entry(entry, struct fio_client, list); 591 592 ret = fio_start_client(client); 593 if (ret) { 594 remove_client(client); 595 continue; 596 } 597 } 598 599 return flist_empty(&client_list); 600 } 601 602 /* 603 * Send file contents to server backend. We could use sendfile(), but to remain 604 * more portable lets just read/write the darn thing. 605 */ 606 static int __fio_client_send_ini(struct fio_client *client, const char *filename) 607 { 608 struct cmd_job_pdu *pdu; 609 size_t p_size; 610 struct stat sb; 611 char *p; 612 void *buf; 613 off_t len; 614 int fd, ret; 615 616 dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname); 617 618 fd = open(filename, O_RDONLY); 619 if (fd < 0) { 620 int ret = -errno; 621 622 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno)); 623 return ret; 624 } 625 626 if (fstat(fd, &sb) < 0) { 627 int ret = -errno; 628 629 log_err("fio: job file stat: %s\n", strerror(errno)); 630 close(fd); 631 return ret; 632 } 633 634 p_size = sb.st_size + sizeof(*pdu); 635 pdu = malloc(p_size); 636 buf = pdu->buf; 637 638 len = sb.st_size; 639 p = buf; 640 do { 641 ret = read(fd, p, len); 642 if (ret > 0) { 643 len -= ret; 644 if (!len) 645 break; 646 p += ret; 647 continue; 648 } else if (!ret) 649 break; 650 else if (errno == EAGAIN || errno == EINTR) 651 continue; 652 } while (1); 653 654 if (len) { 655 log_err("fio: failed reading job file %s\n", filename); 656 close(fd); 657 free(pdu); 658 return 1; 659 } 660 661 pdu->buf_len = __cpu_to_le32(sb.st_size); 662 pdu->client_type = cpu_to_le32(client->type); 663 664 client->sent_job = 1; 665 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL); 666 free(pdu); 667 close(fd); 668 return ret; 669 } 670 671 int fio_client_send_ini(struct fio_client *client, const char *filename) 672 { 673 int ret; 674 675 ret = __fio_client_send_ini(client, filename); 676 if (!ret) 677 client->sent_job = 1; 678 679 return ret; 680 } 681 682 int fio_clients_send_ini(const char *filename) 683 { 684 struct fio_client *client; 685 struct flist_head *entry, *tmp; 686 687 flist_for_each_safe(entry, tmp, &client_list) { 688 client = flist_entry(entry, struct fio_client, list); 689 690 if (client->nr_ini_file) { 691 int i; 692 693 for (i = 0; i < client->nr_ini_file; i++) { 694 const char *ini = client->ini_file[i]; 695 696 if (fio_client_send_ini(client, ini)) { 697 remove_client(client); 698 break; 699 } 700 } 701 } else if (!filename || fio_client_send_ini(client, filename)) 702 remove_client(client); 703 } 704 705 return !nr_clients; 706 } 707 708 int fio_client_update_options(struct fio_client *client, 709 struct thread_options *o, uint64_t *tag) 710 { 711 struct cmd_add_job_pdu pdu; 712 713 pdu.thread_number = cpu_to_le32(client->thread_number); 714 pdu.groupid = cpu_to_le32(client->groupid); 715 convert_thread_options_to_net(&pdu.top, o); 716 717 return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list); 718 } 719 720 static void convert_io_stat(struct io_stat *dst, struct io_stat *src) 721 { 722 dst->max_val = le64_to_cpu(src->max_val); 723 dst->min_val = le64_to_cpu(src->min_val); 724 dst->samples = le64_to_cpu(src->samples); 725 726 /* 727 * Floats arrive as IEEE 754 encoded uint64_t, convert back to double 728 */ 729 dst->mean.u.f = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i)); 730 dst->S.u.f = fio_uint64_to_double(le64_to_cpu(dst->S.u.i)); 731 } 732 733 static void convert_ts(struct thread_stat *dst, struct thread_stat *src) 734 { 735 int i, j; 736 737 dst->error = le32_to_cpu(src->error); 738 dst->thread_number = le32_to_cpu(src->thread_number); 739 dst->groupid = le32_to_cpu(src->groupid); 740 dst->pid = le32_to_cpu(src->pid); 741 dst->members = le32_to_cpu(src->members); 742 dst->unified_rw_rep = le32_to_cpu(src->unified_rw_rep); 743 744 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 745 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]); 746 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]); 747 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]); 748 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]); 749 } 750 751 dst->usr_time = le64_to_cpu(src->usr_time); 752 dst->sys_time = le64_to_cpu(src->sys_time); 753 dst->ctx = le64_to_cpu(src->ctx); 754 dst->minf = le64_to_cpu(src->minf); 755 dst->majf = le64_to_cpu(src->majf); 756 dst->clat_percentiles = le64_to_cpu(src->clat_percentiles); 757 758 for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) { 759 fio_fp64_t *fps = &src->percentile_list[i]; 760 fio_fp64_t *fpd = &dst->percentile_list[i]; 761 762 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i)); 763 } 764 765 for (i = 0; i < FIO_IO_U_MAP_NR; i++) { 766 dst->io_u_map[i] = le32_to_cpu(src->io_u_map[i]); 767 dst->io_u_submit[i] = le32_to_cpu(src->io_u_submit[i]); 768 dst->io_u_complete[i] = le32_to_cpu(src->io_u_complete[i]); 769 } 770 771 for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) { 772 dst->io_u_lat_u[i] = le32_to_cpu(src->io_u_lat_u[i]); 773 dst->io_u_lat_m[i] = le32_to_cpu(src->io_u_lat_m[i]); 774 } 775 776 for (i = 0; i < DDIR_RWDIR_CNT; i++) 777 for (j = 0; j < FIO_IO_U_PLAT_NR; j++) 778 dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]); 779 780 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 781 dst->total_io_u[i] = le64_to_cpu(src->total_io_u[i]); 782 dst->short_io_u[i] = le64_to_cpu(src->short_io_u[i]); 783 } 784 785 dst->total_submit = le64_to_cpu(src->total_submit); 786 dst->total_complete = le64_to_cpu(src->total_complete); 787 788 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 789 dst->io_bytes[i] = le64_to_cpu(src->io_bytes[i]); 790 dst->runtime[i] = le64_to_cpu(src->runtime[i]); 791 } 792 793 dst->total_run_time = le64_to_cpu(src->total_run_time); 794 dst->continue_on_error = le16_to_cpu(src->continue_on_error); 795 dst->total_err_count = le64_to_cpu(src->total_err_count); 796 dst->first_error = le32_to_cpu(src->first_error); 797 dst->kb_base = le32_to_cpu(src->kb_base); 798 dst->unit_base = le32_to_cpu(src->unit_base); 799 800 dst->latency_depth = le32_to_cpu(src->latency_depth); 801 dst->latency_target = le64_to_cpu(src->latency_target); 802 dst->latency_window = le64_to_cpu(src->latency_window); 803 dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i)); 804 } 805 806 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) 807 { 808 int i; 809 810 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 811 dst->max_run[i] = le64_to_cpu(src->max_run[i]); 812 dst->min_run[i] = le64_to_cpu(src->min_run[i]); 813 dst->max_bw[i] = le64_to_cpu(src->max_bw[i]); 814 dst->min_bw[i] = le64_to_cpu(src->min_bw[i]); 815 dst->io_kb[i] = le64_to_cpu(src->io_kb[i]); 816 dst->agg[i] = le64_to_cpu(src->agg[i]); 817 } 818 819 dst->kb_base = le32_to_cpu(src->kb_base); 820 dst->unit_base = le32_to_cpu(src->unit_base); 821 dst->groupid = le32_to_cpu(src->groupid); 822 dst->unified_rw_rep = le32_to_cpu(src->unified_rw_rep); 823 } 824 825 static void json_object_add_client_info(struct json_object *obj, 826 struct fio_client *client) 827 { 828 json_object_add_value_string(obj, "hostname", client->hostname); 829 json_object_add_value_int(obj, "port", client->port); 830 } 831 832 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd) 833 { 834 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload; 835 struct json_object *tsobj; 836 837 tsobj = show_thread_status(&p->ts, &p->rs); 838 client->did_stat = 1; 839 if (tsobj) { 840 json_object_add_client_info(tsobj, client); 841 json_array_add_value_object(clients_array, tsobj); 842 } 843 844 if (!do_output_all_clients) 845 return; 846 847 sum_thread_stats(&client_ts, &p->ts, sum_stat_nr); 848 sum_group_stats(&client_gs, &p->rs); 849 850 client_ts.members++; 851 client_ts.thread_number = p->ts.thread_number; 852 client_ts.groupid = p->ts.groupid; 853 client_ts.unified_rw_rep = p->ts.unified_rw_rep; 854 855 if (++sum_stat_nr == sum_stat_clients) { 856 strcpy(client_ts.name, "All clients"); 857 tsobj = show_thread_status(&client_ts, &client_gs); 858 if (tsobj) { 859 json_object_add_client_info(tsobj, client); 860 json_array_add_value_object(clients_array, tsobj); 861 } 862 } 863 } 864 865 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd) 866 { 867 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload; 868 869 show_group_stats(gs); 870 } 871 872 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd) 873 { 874 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload; 875 const char *buf = (const char *) pdu->buf; 876 const char *name; 877 int fio_unused ret; 878 879 name = client->name ? client->name : client->hostname; 880 881 if (!client->skip_newline) 882 fprintf(f_out, "<%s> ", name); 883 ret = fwrite(buf, pdu->buf_len, 1, f_out); 884 fflush(f_out); 885 client->skip_newline = strchr(buf, '\n') == NULL; 886 } 887 888 static void convert_agg(struct disk_util_agg *agg) 889 { 890 int i; 891 892 for (i = 0; i < 2; i++) { 893 agg->ios[i] = le32_to_cpu(agg->ios[i]); 894 agg->merges[i] = le32_to_cpu(agg->merges[i]); 895 agg->sectors[i] = le64_to_cpu(agg->sectors[i]); 896 agg->ticks[i] = le32_to_cpu(agg->ticks[i]); 897 } 898 899 agg->io_ticks = le32_to_cpu(agg->io_ticks); 900 agg->time_in_queue = le32_to_cpu(agg->time_in_queue); 901 agg->slavecount = le32_to_cpu(agg->slavecount); 902 agg->max_util.u.f = fio_uint64_to_double(__le64_to_cpu(agg->max_util.u.i)); 903 } 904 905 static void convert_dus(struct disk_util_stat *dus) 906 { 907 int i; 908 909 for (i = 0; i < 2; i++) { 910 dus->s.ios[i] = le32_to_cpu(dus->s.ios[i]); 911 dus->s.merges[i] = le32_to_cpu(dus->s.merges[i]); 912 dus->s.sectors[i] = le64_to_cpu(dus->s.sectors[i]); 913 dus->s.ticks[i] = le32_to_cpu(dus->s.ticks[i]); 914 } 915 916 dus->s.io_ticks = le32_to_cpu(dus->s.io_ticks); 917 dus->s.time_in_queue = le32_to_cpu(dus->s.time_in_queue); 918 dus->s.msec = le64_to_cpu(dus->s.msec); 919 } 920 921 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd) 922 { 923 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload; 924 925 if (!client->disk_stats_shown) { 926 client->disk_stats_shown = 1; 927 log_info("\nDisk stats (read/write):\n"); 928 } 929 930 if (output_format == FIO_OUTPUT_JSON) { 931 struct json_object *duobj; 932 json_array_add_disk_util(&du->dus, &du->agg, du_array); 933 duobj = json_array_last_value_object(du_array); 934 json_object_add_client_info(duobj, client); 935 } else 936 print_disk_util(&du->dus, &du->agg, output_format == FIO_OUTPUT_TERSE); 937 } 938 939 static void convert_jobs_eta(struct jobs_eta *je) 940 { 941 int i; 942 943 je->nr_running = le32_to_cpu(je->nr_running); 944 je->nr_ramp = le32_to_cpu(je->nr_ramp); 945 je->nr_pending = le32_to_cpu(je->nr_pending); 946 je->nr_setting_up = le32_to_cpu(je->nr_setting_up); 947 je->files_open = le32_to_cpu(je->files_open); 948 949 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 950 je->m_rate[i] = le32_to_cpu(je->m_rate[i]); 951 je->t_rate[i] = le32_to_cpu(je->t_rate[i]); 952 je->m_iops[i] = le32_to_cpu(je->m_iops[i]); 953 je->t_iops[i] = le32_to_cpu(je->t_iops[i]); 954 je->rate[i] = le32_to_cpu(je->rate[i]); 955 je->iops[i] = le32_to_cpu(je->iops[i]); 956 } 957 958 je->elapsed_sec = le64_to_cpu(je->elapsed_sec); 959 je->eta_sec = le64_to_cpu(je->eta_sec); 960 je->nr_threads = le32_to_cpu(je->nr_threads); 961 je->is_pow2 = le32_to_cpu(je->is_pow2); 962 je->unit_base = le32_to_cpu(je->unit_base); 963 } 964 965 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je) 966 { 967 int i; 968 969 dst->nr_running += je->nr_running; 970 dst->nr_ramp += je->nr_ramp; 971 dst->nr_pending += je->nr_pending; 972 dst->nr_setting_up += je->nr_setting_up; 973 dst->files_open += je->files_open; 974 975 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 976 dst->m_rate[i] += je->m_rate[i]; 977 dst->t_rate[i] += je->t_rate[i]; 978 dst->m_iops[i] += je->m_iops[i]; 979 dst->t_iops[i] += je->t_iops[i]; 980 dst->rate[i] += je->rate[i]; 981 dst->iops[i] += je->iops[i]; 982 } 983 984 dst->elapsed_sec += je->elapsed_sec; 985 986 if (je->eta_sec > dst->eta_sec) 987 dst->eta_sec = je->eta_sec; 988 989 dst->nr_threads += je->nr_threads; 990 /* we need to handle je->run_str too ... */ 991 } 992 993 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn) 994 { 995 if (!--eta->pending) { 996 eta_fn(&eta->eta); 997 free(eta); 998 } 999 } 1000 1001 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd) 1002 { 1003 struct fio_net_cmd_reply *reply = NULL; 1004 struct flist_head *entry; 1005 1006 flist_for_each(entry, &client->cmd_list) { 1007 reply = flist_entry(entry, struct fio_net_cmd_reply, list); 1008 1009 if (cmd->tag == (uintptr_t) reply) 1010 break; 1011 1012 reply = NULL; 1013 } 1014 1015 if (!reply) { 1016 log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag); 1017 return; 1018 } 1019 1020 flist_del(&reply->list); 1021 cmd->tag = reply->saved_tag; 1022 free(reply); 1023 } 1024 1025 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag) 1026 { 1027 do { 1028 struct fio_net_cmd_reply *reply = NULL; 1029 struct flist_head *entry; 1030 1031 flist_for_each(entry, &client->cmd_list) { 1032 reply = flist_entry(entry, struct fio_net_cmd_reply, list); 1033 1034 if (tag == (uintptr_t) reply) 1035 break; 1036 1037 reply = NULL; 1038 } 1039 1040 if (!reply) 1041 break; 1042 1043 usleep(1000); 1044 } while (1); 1045 1046 return 0; 1047 } 1048 1049 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd) 1050 { 1051 struct jobs_eta *je = (struct jobs_eta *) cmd->payload; 1052 struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag; 1053 1054 dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending); 1055 1056 assert(client->eta_in_flight == eta); 1057 1058 client->eta_in_flight = NULL; 1059 flist_del_init(&client->eta_list); 1060 1061 if (client->ops->jobs_eta) 1062 client->ops->jobs_eta(client, je); 1063 1064 fio_client_sum_jobs_eta(&eta->eta, je); 1065 fio_client_dec_jobs_eta(eta, client->ops->eta); 1066 } 1067 1068 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd) 1069 { 1070 struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload; 1071 const char *os, *arch; 1072 char bit[16]; 1073 1074 os = fio_get_os_string(probe->os); 1075 if (!os) 1076 os = "unknown"; 1077 1078 arch = fio_get_arch_string(probe->arch); 1079 if (!arch) 1080 os = "unknown"; 1081 1082 sprintf(bit, "%d-bit", probe->bpp * 8); 1083 probe->flags = le64_to_cpu(probe->flags); 1084 1085 log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n", 1086 probe->hostname, probe->bigendian, bit, os, arch, 1087 probe->fio_version, (unsigned long) probe->flags); 1088 1089 if (!client->name) 1090 client->name = strdup((char *) probe->hostname); 1091 } 1092 1093 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd) 1094 { 1095 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload; 1096 1097 client->state = Client_started; 1098 client->jobs = le32_to_cpu(pdu->jobs); 1099 client->nr_stat = le32_to_cpu(pdu->stat_outputs); 1100 1101 if (sum_stat_clients > 1) 1102 do_output_all_clients = 1; 1103 1104 sum_stat_clients += client->nr_stat; 1105 } 1106 1107 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd) 1108 { 1109 if (client->error) 1110 log_info("client <%s>: exited with error %d\n", client->hostname, client->error); 1111 } 1112 1113 static void convert_stop(struct fio_net_cmd *cmd) 1114 { 1115 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload; 1116 1117 pdu->error = le32_to_cpu(pdu->error); 1118 } 1119 1120 static void convert_text(struct fio_net_cmd *cmd) 1121 { 1122 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload; 1123 1124 pdu->level = le32_to_cpu(pdu->level); 1125 pdu->buf_len = le32_to_cpu(pdu->buf_len); 1126 pdu->log_sec = le64_to_cpu(pdu->log_sec); 1127 pdu->log_usec = le64_to_cpu(pdu->log_usec); 1128 } 1129 1130 static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd, 1131 struct cmd_iolog_pdu *pdu) 1132 { 1133 #ifdef CONFIG_ZLIB 1134 struct cmd_iolog_pdu *ret; 1135 z_stream stream; 1136 uint32_t nr_samples; 1137 size_t total; 1138 void *p; 1139 1140 stream.zalloc = Z_NULL; 1141 stream.zfree = Z_NULL; 1142 stream.opaque = Z_NULL; 1143 stream.avail_in = 0; 1144 stream.next_in = Z_NULL; 1145 1146 if (inflateInit(&stream) != Z_OK) 1147 return NULL; 1148 1149 /* 1150 * Get header first, it's not compressed 1151 */ 1152 nr_samples = le32_to_cpu(pdu->nr_samples); 1153 1154 total = nr_samples * sizeof(struct io_sample); 1155 ret = malloc(total + sizeof(*pdu)); 1156 ret->nr_samples = nr_samples; 1157 1158 memcpy(ret, pdu, sizeof(*pdu)); 1159 1160 p = (void *) ret + sizeof(*pdu); 1161 1162 stream.avail_in = cmd->pdu_len - sizeof(*pdu); 1163 stream.next_in = (void *) pdu + sizeof(*pdu); 1164 while (stream.avail_in) { 1165 unsigned int this_chunk = 65536; 1166 unsigned int this_len; 1167 int err; 1168 1169 if (this_chunk > total) 1170 this_chunk = total; 1171 1172 stream.avail_out = this_chunk; 1173 stream.next_out = p; 1174 err = inflate(&stream, Z_NO_FLUSH); 1175 /* may be Z_OK, or Z_STREAM_END */ 1176 if (err < 0) { 1177 log_err("fio: inflate error %d\n", err); 1178 free(ret); 1179 ret = NULL; 1180 goto err; 1181 } 1182 1183 this_len = this_chunk - stream.avail_out; 1184 p += this_len; 1185 total -= this_len; 1186 } 1187 1188 err: 1189 inflateEnd(&stream); 1190 return ret; 1191 #else 1192 return NULL; 1193 #endif 1194 } 1195 1196 /* 1197 * This has been compressed on the server side, since it can be big. 1198 * Uncompress here. 1199 */ 1200 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd) 1201 { 1202 struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload; 1203 struct cmd_iolog_pdu *ret; 1204 int i; 1205 1206 /* 1207 * Convert if compressed and we support it. If it's not 1208 * compressed, we need not do anything. 1209 */ 1210 if (le32_to_cpu(pdu->compressed)) { 1211 #ifndef CONFIG_ZLIB 1212 log_err("fio: server sent compressed data by mistake\n"); 1213 return NULL; 1214 #endif 1215 ret = convert_iolog_gz(cmd, pdu); 1216 if (!ret) { 1217 log_err("fio: failed decompressing log\n"); 1218 return NULL; 1219 } 1220 } else 1221 ret = pdu; 1222 1223 ret->thread_number = le32_to_cpu(ret->thread_number); 1224 ret->nr_samples = le32_to_cpu(ret->nr_samples); 1225 ret->log_type = le32_to_cpu(ret->log_type); 1226 ret->compressed = le32_to_cpu(ret->compressed); 1227 1228 for (i = 0; i < ret->nr_samples; i++) { 1229 struct io_sample *s = &ret->samples[i]; 1230 1231 s->time = le64_to_cpu(s->time); 1232 s->val = le64_to_cpu(s->val); 1233 s->ddir = le32_to_cpu(s->ddir); 1234 s->bs = le32_to_cpu(s->bs); 1235 } 1236 1237 return ret; 1238 } 1239 1240 int fio_handle_client(struct fio_client *client) 1241 { 1242 struct client_ops *ops = client->ops; 1243 struct fio_net_cmd *cmd; 1244 1245 dprint(FD_NET, "client: handle %s\n", client->hostname); 1246 1247 cmd = fio_net_recv_cmd(client->fd); 1248 if (!cmd) 1249 return 0; 1250 1251 dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n", 1252 fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len); 1253 1254 switch (cmd->opcode) { 1255 case FIO_NET_CMD_QUIT: 1256 if (ops->quit) 1257 ops->quit(client, cmd); 1258 remove_client(client); 1259 free(cmd); 1260 break; 1261 case FIO_NET_CMD_TEXT: 1262 convert_text(cmd); 1263 ops->text(client, cmd); 1264 free(cmd); 1265 break; 1266 case FIO_NET_CMD_DU: { 1267 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload; 1268 1269 convert_dus(&du->dus); 1270 convert_agg(&du->agg); 1271 1272 ops->disk_util(client, cmd); 1273 free(cmd); 1274 break; 1275 } 1276 case FIO_NET_CMD_TS: { 1277 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload; 1278 1279 convert_ts(&p->ts, &p->ts); 1280 convert_gs(&p->rs, &p->rs); 1281 1282 ops->thread_status(client, cmd); 1283 free(cmd); 1284 break; 1285 } 1286 case FIO_NET_CMD_GS: { 1287 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload; 1288 1289 convert_gs(gs, gs); 1290 1291 ops->group_stats(client, cmd); 1292 free(cmd); 1293 break; 1294 } 1295 case FIO_NET_CMD_ETA: { 1296 struct jobs_eta *je = (struct jobs_eta *) cmd->payload; 1297 1298 remove_reply_cmd(client, cmd); 1299 convert_jobs_eta(je); 1300 handle_eta(client, cmd); 1301 free(cmd); 1302 break; 1303 } 1304 case FIO_NET_CMD_PROBE: 1305 remove_reply_cmd(client, cmd); 1306 ops->probe(client, cmd); 1307 free(cmd); 1308 break; 1309 case FIO_NET_CMD_SERVER_START: 1310 client->state = Client_running; 1311 if (ops->job_start) 1312 ops->job_start(client, cmd); 1313 free(cmd); 1314 break; 1315 case FIO_NET_CMD_START: { 1316 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload; 1317 1318 pdu->jobs = le32_to_cpu(pdu->jobs); 1319 ops->start(client, cmd); 1320 free(cmd); 1321 break; 1322 } 1323 case FIO_NET_CMD_STOP: { 1324 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload; 1325 1326 convert_stop(cmd); 1327 client->state = Client_stopped; 1328 client->error = le32_to_cpu(pdu->error); 1329 client->signal = le32_to_cpu(pdu->signal); 1330 ops->stop(client, cmd); 1331 free(cmd); 1332 break; 1333 } 1334 case FIO_NET_CMD_ADD_JOB: { 1335 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload; 1336 1337 client->thread_number = le32_to_cpu(pdu->thread_number); 1338 client->groupid = le32_to_cpu(pdu->groupid); 1339 1340 if (ops->add_job) 1341 ops->add_job(client, cmd); 1342 free(cmd); 1343 break; 1344 } 1345 case FIO_NET_CMD_IOLOG: 1346 if (ops->iolog) { 1347 struct cmd_iolog_pdu *pdu; 1348 1349 pdu = convert_iolog(cmd); 1350 ops->iolog(client, pdu); 1351 } 1352 free(cmd); 1353 break; 1354 case FIO_NET_CMD_UPDATE_JOB: 1355 ops->update_job(client, cmd); 1356 remove_reply_cmd(client, cmd); 1357 free(cmd); 1358 break; 1359 default: 1360 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode)); 1361 free(cmd); 1362 break; 1363 } 1364 1365 return 1; 1366 } 1367 1368 static void request_client_etas(struct client_ops *ops) 1369 { 1370 struct fio_client *client; 1371 struct flist_head *entry; 1372 struct client_eta *eta; 1373 int skipped = 0; 1374 1375 dprint(FD_NET, "client: request eta (%d)\n", nr_clients); 1376 1377 eta = malloc(sizeof(*eta)); 1378 memset(&eta->eta, 0, sizeof(eta->eta)); 1379 eta->pending = nr_clients; 1380 1381 flist_for_each(entry, &client_list) { 1382 client = flist_entry(entry, struct fio_client, list); 1383 1384 if (!flist_empty(&client->eta_list)) { 1385 skipped++; 1386 continue; 1387 } 1388 if (client->state != Client_running) 1389 continue; 1390 1391 assert(!client->eta_in_flight); 1392 flist_add_tail(&client->eta_list, &eta_list); 1393 client->eta_in_flight = eta; 1394 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA, 1395 (uintptr_t) eta, &client->cmd_list); 1396 } 1397 1398 while (skipped--) 1399 fio_client_dec_jobs_eta(eta, ops->eta); 1400 1401 dprint(FD_NET, "client: requested eta tag %p\n", eta); 1402 } 1403 1404 static int client_check_cmd_timeout(struct fio_client *client, 1405 struct timeval *now) 1406 { 1407 struct fio_net_cmd_reply *reply; 1408 struct flist_head *entry, *tmp; 1409 int ret = 0; 1410 1411 flist_for_each_safe(entry, tmp, &client->cmd_list) { 1412 reply = flist_entry(entry, struct fio_net_cmd_reply, list); 1413 1414 if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT) 1415 continue; 1416 1417 log_err("fio: client %s, timeout on cmd %s\n", client->hostname, 1418 fio_server_op(reply->opcode)); 1419 flist_del(&reply->list); 1420 free(reply); 1421 ret = 1; 1422 } 1423 1424 return flist_empty(&client->cmd_list) && ret; 1425 } 1426 1427 static int fio_check_clients_timed_out(void) 1428 { 1429 struct fio_client *client; 1430 struct flist_head *entry, *tmp; 1431 struct timeval tv; 1432 int ret = 0; 1433 1434 fio_gettime(&tv, NULL); 1435 1436 flist_for_each_safe(entry, tmp, &client_list) { 1437 client = flist_entry(entry, struct fio_client, list); 1438 1439 if (flist_empty(&client->cmd_list)) 1440 continue; 1441 1442 if (!client_check_cmd_timeout(client, &tv)) 1443 continue; 1444 1445 if (client->ops->timed_out) 1446 client->ops->timed_out(client); 1447 else 1448 log_err("fio: client %s timed out\n", client->hostname); 1449 1450 remove_client(client); 1451 ret = 1; 1452 } 1453 1454 return ret; 1455 } 1456 1457 int fio_handle_clients(struct client_ops *ops) 1458 { 1459 struct pollfd *pfds; 1460 int i, ret = 0, retval = 0; 1461 1462 fio_gettime(&eta_tv, NULL); 1463 1464 pfds = malloc(nr_clients * sizeof(struct pollfd)); 1465 1466 init_thread_stat(&client_ts); 1467 init_group_run_stat(&client_gs); 1468 1469 while (!exit_backend && nr_clients) { 1470 struct flist_head *entry, *tmp; 1471 struct fio_client *client; 1472 1473 i = 0; 1474 flist_for_each_safe(entry, tmp, &client_list) { 1475 client = flist_entry(entry, struct fio_client, list); 1476 1477 if (!client->sent_job && !client->ops->stay_connected && 1478 flist_empty(&client->cmd_list)) { 1479 remove_client(client); 1480 continue; 1481 } 1482 1483 pfds[i].fd = client->fd; 1484 pfds[i].events = POLLIN; 1485 i++; 1486 } 1487 1488 if (!nr_clients) 1489 break; 1490 1491 assert(i == nr_clients); 1492 1493 do { 1494 struct timeval tv; 1495 1496 fio_gettime(&tv, NULL); 1497 if (mtime_since(&eta_tv, &tv) >= 900) { 1498 request_client_etas(ops); 1499 memcpy(&eta_tv, &tv, sizeof(tv)); 1500 1501 if (fio_check_clients_timed_out()) 1502 break; 1503 } 1504 1505 ret = poll(pfds, nr_clients, ops->eta_msec); 1506 if (ret < 0) { 1507 if (errno == EINTR) 1508 continue; 1509 log_err("fio: poll clients: %s\n", strerror(errno)); 1510 break; 1511 } else if (!ret) 1512 continue; 1513 } while (ret <= 0); 1514 1515 for (i = 0; i < nr_clients; i++) { 1516 if (!(pfds[i].revents & POLLIN)) 1517 continue; 1518 1519 client = find_client_by_fd(pfds[i].fd); 1520 if (!client) { 1521 log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd); 1522 continue; 1523 } 1524 if (!fio_handle_client(client)) { 1525 log_info("client: host=%s disconnected\n", 1526 client->hostname); 1527 remove_client(client); 1528 retval = 1; 1529 } else if (client->error) 1530 retval = 1; 1531 fio_put_client(client); 1532 } 1533 } 1534 1535 fio_client_json_fini(); 1536 1537 free(pfds); 1538 return retval; 1539 } 1540