Home | History | Annotate | Download | only in fio
      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <unistd.h>
      4 #include <limits.h>
      5 #include <errno.h>
      6 #include <fcntl.h>
      7 #include <sys/poll.h>
      8 #include <sys/types.h>
      9 #include <sys/stat.h>
     10 #include <sys/wait.h>
     11 #include <sys/socket.h>
     12 #include <sys/un.h>
     13 #include <netinet/in.h>
     14 #include <arpa/inet.h>
     15 #include <netdb.h>
     16 #include <signal.h>
     17 #ifdef CONFIG_ZLIB
     18 #include <zlib.h>
     19 #endif
     20 
     21 #include "fio.h"
     22 #include "client.h"
     23 #include "server.h"
     24 #include "flist.h"
     25 #include "hash.h"
     26 #include "verify.h"
     27 
/*
 * Forward declarations of the static callbacks referenced by fio_client_ops
 * below. Being static, they are defined later in this translation unit.
 */
     28 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
     29 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
     30 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
     31 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
     32 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
     33 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
     34 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
     35 
/*
 * Applied to FIO_NET_CMD_TEXT commands before ops->text is called (see
 * fio_drain_client_text()); presumably converts the payload from wire byte
 * order — TODO confirm against its definition.
 */
     36 static void convert_text(struct fio_net_cmd *cmd);
     37 
/*
 * Default callback table for the command-line client (client type
 * FIO_CLIENT_TYPE_CLI). Members not listed here, such as .removed, are
 * NULL; remove_client() checks .removed before calling it. .eta is also
 * handed to fio_client_dec_jobs_eta() when a client that still has an ETA
 * request queued is removed.
 */
     38 struct client_ops fio_client_ops = {
     39 	.text		= handle_text,
     40 	.disk_util	= handle_du,
     41 	.thread_status	= handle_ts,
     42 	.group_stats	= handle_gs,
     43 	.stop		= handle_stop,
     44 	.start		= handle_start,
     45 	.eta		= display_thread_status,
     46 	.probe		= handle_probe,
     47 	.eta_msec	= FIO_CLIENT_DEF_ETA_MSEC,
     48 	.client_type	= FIO_CLIENT_TYPE_CLI,
     49 };
     50 
/* NOTE(review): not referenced in this part of the file; presumably records ETA request timing — confirm. */
     51 static struct timeval eta_tv;
     52 
/* Every client added by fio_client_add()/fio_client_add_explicit(), linked through client->list. */
     53 static FLIST_HEAD(client_list);
/* NOTE(review): not referenced in this part of the file; likely tracks outstanding ETA requests — confirm. */
     54 static FLIST_HEAD(eta_list);
     55 
/*
 * Clients sharing one set of command-line options: fio_client_add() queues
 * a client here while it has no options of its own (argc == 1), and
 * fio_client_add_cmd_option() copies every new option to each queued client.
 */
     56 static FLIST_HEAD(arg_list);
     57 
/* Aggregate "All clients" stats accumulated by handle_ts() when more than one client reports. */
     58 struct thread_stat client_ts;
     59 struct group_run_stats client_gs;
/* Clients expected to report stats; fio_put_client() decrements it for clients that never did. */
     60 int sum_stat_clients;
     61 
/* How many clients have been folded into client_ts/client_gs so far (see handle_ts()). */
     62 static int sum_stat_nr;
/* JSON output tree: built by fio_client_json_init(), printed and freed by fio_client_json_fini(). */
     63 static struct json_object *root = NULL;
     64 static struct json_object *job_opt_object = NULL;
     65 static struct json_array *clients_array = NULL;
     66 static struct json_array *du_array = NULL;
     67 
/* Number of released clients that had client->error set (incremented in fio_put_client()). */
     68 static int error_clients;
     69 
/* Hash table of connected clients keyed by socket fd; see find_client_by_fd(). */
     70 #define FIO_CLIENT_HASH_BITS	7
     71 #define FIO_CLIENT_HASH_SZ	(1 << FIO_CLIENT_HASH_BITS)
     72 #define FIO_CLIENT_HASH_MASK	(FIO_CLIENT_HASH_SZ - 1)
     73 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
     74 
/* Defined later in this file; the meaning of the bool * out-parameter is not visible here. */
     75 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
     75 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
     76 
     77 static void fio_client_add_hash(struct fio_client *client)
     78 {
     79 	int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
     80 
     81 	bucket &= FIO_CLIENT_HASH_MASK;
     82 	flist_add(&client->hash_list, &client_hash[bucket]);
     83 }
     84 
     85 static void fio_client_remove_hash(struct fio_client *client)
     86 {
     87 	if (!flist_empty(&client->hash_list))
     88 		flist_del_init(&client->hash_list);
     89 }
     90 
     91 static void fio_init fio_client_hash_init(void)
     92 {
     93 	int i;
     94 
     95 	for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
     96 		INIT_FLIST_HEAD(&client_hash[i]);
     97 }
     98 
     99 static int read_data(int fd, void *data, size_t size)
    100 {
    101 	ssize_t ret;
    102 
    103 	while (size) {
    104 		ret = read(fd, data, size);
    105 		if (ret < 0) {
    106 			if (errno == EAGAIN || errno == EINTR)
    107 				continue;
    108 			break;
    109 		} else if (!ret)
    110 			break;
    111 		else {
    112 			data += ret;
    113 			size -= ret;
    114 		}
    115 	}
    116 
    117 	if (size)
    118 		return EAGAIN;
    119 
    120 	return 0;
    121 }
    122 
    123 static void fio_client_json_init(void)
    124 {
    125 	char time_buf[32];
    126 	time_t time_p;
    127 
    128 	if (!(output_format & FIO_OUTPUT_JSON))
    129 		return;
    130 
    131 	time(&time_p);
    132 	os_ctime_r((const time_t *) &time_p, time_buf, sizeof(time_buf));
    133 	time_buf[strlen(time_buf) - 1] = '\0';
    134 
    135 	root = json_create_object();
    136 	json_object_add_value_string(root, "fio version", fio_version_string);
    137 	json_object_add_value_int(root, "timestamp", time_p);
    138 	json_object_add_value_string(root, "time", time_buf);
    139 
    140 	job_opt_object = json_create_object();
    141 	json_object_add_value_object(root, "global options", job_opt_object);
    142 	clients_array = json_create_array();
    143 	json_object_add_value_array(root, "client_stats", clients_array);
    144 	du_array = json_create_array();
    145 	json_object_add_value_array(root, "disk_util", du_array);
    146 }
    147 
    148 static void fio_client_json_fini(void)
    149 {
    150 	if (!(output_format & FIO_OUTPUT_JSON))
    151 		return;
    152 
    153 	log_info("\n");
    154 	json_print_object(root, NULL);
    155 	log_info("\n");
    156 	json_free_object(root);
    157 	root = NULL;
    158 	clients_array = NULL;
    159 	du_array = NULL;
    160 }
    161 
    162 static struct fio_client *find_client_by_fd(int fd)
    163 {
    164 	int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
    165 	struct fio_client *client;
    166 	struct flist_head *entry;
    167 
    168 	flist_for_each(entry, &client_hash[bucket]) {
    169 		client = flist_entry(entry, struct fio_client, hash_list);
    170 
    171 		if (client->fd == fd) {
    172 			client->refs++;
    173 			return client;
    174 		}
    175 	}
    176 
    177 	return NULL;
    178 }
    179 
    180 void fio_put_client(struct fio_client *client)
    181 {
    182 	if (--client->refs)
    183 		return;
    184 
    185 	free(client->hostname);
    186 	if (client->argv)
    187 		free(client->argv);
    188 	if (client->name)
    189 		free(client->name);
    190 	while (client->nr_files) {
    191 		struct client_file *cf = &client->files[--client->nr_files];
    192 
    193 		free(cf->file);
    194 	}
    195 	if (client->files)
    196 		free(client->files);
    197 	if (client->opt_lists)
    198 		free(client->opt_lists);
    199 
    200 	if (!client->did_stat)
    201 		sum_stat_clients--;
    202 
    203 	if (client->error)
    204 		error_clients++;
    205 
    206 	free(client);
    207 }
    208 
    209 static int fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
    210 {
    211 	if (!--eta->pending) {
    212 		eta_fn(&eta->eta);
    213 		free(eta);
    214 		return 0;
    215 	}
    216 
    217 	return 1;
    218 }
    219 
    220 static void fio_drain_client_text(struct fio_client *client)
    221 {
    222 	do {
    223 		struct fio_net_cmd *cmd;
    224 
    225 		cmd = fio_net_recv_cmd(client->fd, false);
    226 		if (!cmd)
    227 			break;
    228 
    229 		if (cmd->opcode == FIO_NET_CMD_TEXT) {
    230 			convert_text(cmd);
    231 			client->ops->text(client, cmd);
    232 		}
    233 
    234 		free(cmd);
    235 	} while (1);
    236 }
    237 
/*
 * Tear down @client. In order, this:
 *   - drains pending text output;
 *   - unlinks the client from client_list, the fd hash and (if queued) the
 *     ETA list;
 *   - closes its socket and calls ops->removed if set;
 *   - decrements nr_clients and drops one reference via fio_put_client().
 *
 * The caller must hold a reference (asserted below). The statement order
 * matters: text is drained while client->fd is still open, and
 * fio_put_client() runs last because it may free @client.
 */
    238 static void remove_client(struct fio_client *client)
    239 {
    240 	assert(client->refs);
    241 
    242 	dprint(FD_NET, "client: removed <%s>\n", client->hostname);
    243 
    244 	fio_drain_client_text(client);
    245 
    246 	if (!flist_empty(&client->list))
    247 		flist_del_init(&client->list);
    248 
    249 	fio_client_remove_hash(client);
    250 
    251 	if (!flist_empty(&client->eta_list)) {
    252 		flist_del_init(&client->eta_list);
		/*
		 * Count this client as done for its in-flight ETA request so the
		 * aggregate can still complete without it.
		 */
    253 		fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
    254 	}
    255 
    256 	close(client->fd);
    257 	client->fd = -1;
    258 
    259 	if (client->ops->removed)
    260 		client->ops->removed(client);
    261 
    262 	nr_clients--;
    263 	fio_put_client(client);
    264 }
    265 
    266 struct fio_client *fio_get_client(struct fio_client *client)
    267 {
    268 	client->refs++;
    269 	return client;
    270 }
    271 
    272 static void __fio_client_add_cmd_option(struct fio_client *client,
    273 					const char *opt)
    274 {
    275 	int index;
    276 
    277 	index = client->argc++;
    278 	client->argv = realloc(client->argv, sizeof(char *) * client->argc);
    279 	client->argv[index] = strdup(opt);
    280 	dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
    281 }
    282 
    283 void fio_client_add_cmd_option(void *cookie, const char *opt)
    284 {
    285 	struct fio_client *client = cookie;
    286 	struct flist_head *entry;
    287 
    288 	if (!client || !opt)
    289 		return;
    290 
    291 	__fio_client_add_cmd_option(client, opt);
    292 
    293 	/*
    294 	 * Duplicate arguments to shared client group
    295 	 */
    296 	flist_for_each(entry, &arg_list) {
    297 		client = flist_entry(entry, struct fio_client, arg_list);
    298 
    299 		__fio_client_add_cmd_option(client, opt);
    300 	}
    301 }
    302 
    303 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
    304 					   const char *hostname, int type,
    305 					   int port)
    306 {
    307 	struct fio_client *client;
    308 
    309 	client = malloc(sizeof(*client));
    310 	memset(client, 0, sizeof(*client));
    311 
    312 	INIT_FLIST_HEAD(&client->list);
    313 	INIT_FLIST_HEAD(&client->hash_list);
    314 	INIT_FLIST_HEAD(&client->arg_list);
    315 	INIT_FLIST_HEAD(&client->eta_list);
    316 	INIT_FLIST_HEAD(&client->cmd_list);
    317 
    318 	client->hostname = strdup(hostname);
    319 
    320 	if (type == Fio_client_socket)
    321 		client->is_sock = 1;
    322 	else {
    323 		int ipv6;
    324 
    325 		ipv6 = type == Fio_client_ipv6;
    326 		if (fio_server_parse_host(hostname, ipv6,
    327 						&client->addr.sin_addr,
    328 						&client->addr6.sin6_addr))
    329 			goto err;
    330 
    331 		client->port = port;
    332 	}
    333 
    334 	client->fd = -1;
    335 	client->ops = ops;
    336 	client->refs = 1;
    337 	client->type = ops->client_type;
    338 
    339 	__fio_client_add_cmd_option(client, "fio");
    340 
    341 	flist_add(&client->list, &client_list);
    342 	nr_clients++;
    343 	dprint(FD_NET, "client: added <%s>\n", client->hostname);
    344 	return client;
    345 err:
    346 	free(client);
    347 	return NULL;
    348 }
    349 
    350 int fio_client_add_ini_file(void *cookie, const char *ini_file, bool remote)
    351 {
    352 	struct fio_client *client = cookie;
    353 	struct client_file *cf;
    354 	size_t new_size;
    355 	void *new_files;
    356 
    357 	if (!client)
    358 		return 1;
    359 
    360 	dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file);
    361 
    362 	new_size = (client->nr_files + 1) * sizeof(struct client_file);
    363 	new_files = realloc(client->files, new_size);
    364 	if (!new_files)
    365 		return 1;
    366 
    367 	client->files = new_files;
    368 	cf = &client->files[client->nr_files];
    369 	cf->file = strdup(ini_file);
    370 	cf->remote = remote;
    371 	client->nr_files++;
    372 	return 0;
    373 }
    374 
    375 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
    376 {
    377 	struct fio_client *existing = *cookie;
    378 	struct fio_client *client;
    379 
    380 	if (existing) {
    381 		/*
    382 		 * We always add our "exec" name as the option, hence 1
    383 		 * means empty.
    384 		 */
    385 		if (existing->argc == 1)
    386 			flist_add_tail(&existing->arg_list, &arg_list);
    387 		else {
    388 			while (!flist_empty(&arg_list))
    389 				flist_del_init(arg_list.next);
    390 		}
    391 	}
    392 
    393 	client = malloc(sizeof(*client));
    394 	memset(client, 0, sizeof(*client));
    395 
    396 	INIT_FLIST_HEAD(&client->list);
    397 	INIT_FLIST_HEAD(&client->hash_list);
    398 	INIT_FLIST_HEAD(&client->arg_list);
    399 	INIT_FLIST_HEAD(&client->eta_list);
    400 	INIT_FLIST_HEAD(&client->cmd_list);
    401 
    402 	if (fio_server_parse_string(hostname, &client->hostname,
    403 					&client->is_sock, &client->port,
    404 					&client->addr.sin_addr,
    405 					&client->addr6.sin6_addr,
    406 					&client->ipv6))
    407 		return -1;
    408 
    409 	client->fd = -1;
    410 	client->ops = ops;
    411 	client->refs = 1;
    412 	client->type = ops->client_type;
    413 
    414 	__fio_client_add_cmd_option(client, "fio");
    415 
    416 	flist_add(&client->list, &client_list);
    417 	nr_clients++;
    418 	dprint(FD_NET, "client: added <%s>\n", client->hostname);
    419 	*cookie = client;
    420 	return 0;
    421 }
    422 
    423 static const char *server_name(struct fio_client *client, char *buf,
    424 			       size_t bufsize)
    425 {
    426 	const char *from;
    427 
    428 	if (client->ipv6)
    429 		from = inet_ntop(AF_INET6, (struct sockaddr *) &client->addr6.sin6_addr, buf, bufsize);
    430 	else if (client->is_sock)
    431 		from = "sock";
    432 	else
    433 		from = inet_ntop(AF_INET, (struct sockaddr *) &client->addr.sin_addr, buf, bufsize);
    434 
    435 	return from;
    436 }
    437 
    438 static void probe_client(struct fio_client *client)
    439 {
    440 	struct cmd_client_probe_pdu pdu;
    441 	const char *sname;
    442 	uint64_t tag;
    443 	char buf[64];
    444 
    445 	dprint(FD_NET, "client: send probe\n");
    446 
    447 #ifdef CONFIG_ZLIB
    448 	pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB);
    449 #else
    450 	pdu.flags = 0;
    451 #endif
    452 
    453 	sname = server_name(client, buf, sizeof(buf));
    454 	memset(pdu.server, 0, sizeof(pdu.server));
    455 	strncpy((char *) pdu.server, sname, sizeof(pdu.server) - 1);
    456 
    457 	fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
    458 }
    459 
    460 static int fio_client_connect_ip(struct fio_client *client)
    461 {
    462 	struct sockaddr *addr;
    463 	socklen_t socklen;
    464 	int fd, domain;
    465 
    466 	if (client->ipv6) {
    467 		client->addr6.sin6_family = AF_INET6;
    468 		client->addr6.sin6_port = htons(client->port);
    469 		domain = AF_INET6;
    470 		addr = (struct sockaddr *) &client->addr6;
    471 		socklen = sizeof(client->addr6);
    472 	} else {
    473 		client->addr.sin_family = AF_INET;
    474 		client->addr.sin_port = htons(client->port);
    475 		domain = AF_INET;
    476 		addr = (struct sockaddr *) &client->addr;
    477 		socklen = sizeof(client->addr);
    478 	}
    479 
    480 	fd = socket(domain, SOCK_STREAM, 0);
    481 	if (fd < 0) {
    482 		int ret = -errno;
    483 
    484 		log_err("fio: socket: %s\n", strerror(errno));
    485 		return ret;
    486 	}
    487 
    488 	if (connect(fd, addr, socklen) < 0) {
    489 		int ret = -errno;
    490 
    491 		log_err("fio: connect: %s\n", strerror(errno));
    492 		log_err("fio: failed to connect to %s:%u\n", client->hostname,
    493 								client->port);
    494 		close(fd);
    495 		return ret;
    496 	}
    497 
    498 	return fd;
    499 }
    500 
    501 static int fio_client_connect_sock(struct fio_client *client)
    502 {
    503 	struct sockaddr_un *addr = &client->addr_un;
    504 	socklen_t len;
    505 	int fd;
    506 
    507 	memset(addr, 0, sizeof(*addr));
    508 	addr->sun_family = AF_UNIX;
    509 	strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1);
    510 
    511 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
    512 	if (fd < 0) {
    513 		int ret = -errno;
    514 
    515 		log_err("fio: socket: %s\n", strerror(errno));
    516 		return ret;
    517 	}
    518 
    519 	len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
    520 	if (connect(fd, (struct sockaddr *) addr, len) < 0) {
    521 		int ret = -errno;
    522 
    523 		log_err("fio: connect; %s\n", strerror(errno));
    524 		close(fd);
    525 		return ret;
    526 	}
    527 
    528 	return fd;
    529 }
    530 
    531 int fio_client_connect(struct fio_client *client)
    532 {
    533 	int fd;
    534 
    535 	dprint(FD_NET, "client: connect to host %s\n", client->hostname);
    536 
    537 	if (client->is_sock)
    538 		fd = fio_client_connect_sock(client);
    539 	else
    540 		fd = fio_client_connect_ip(client);
    541 
    542 	dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
    543 
    544 	if (fd < 0)
    545 		return fd;
    546 
    547 	client->fd = fd;
    548 	fio_client_add_hash(client);
    549 	client->state = Client_connected;
    550 
    551 	probe_client(client);
    552 	return 0;
    553 }
    554 
    555 int fio_client_terminate(struct fio_client *client)
    556 {
    557 	return fio_net_send_quit(client->fd);
    558 }
    559 
    560 static void fio_clients_terminate(void)
    561 {
    562 	struct flist_head *entry;
    563 	struct fio_client *client;
    564 
    565 	dprint(FD_NET, "client: terminate clients\n");
    566 
    567 	flist_for_each(entry, &client_list) {
    568 		client = flist_entry(entry, struct fio_client, list);
    569 		fio_client_terminate(client);
    570 	}
    571 }
    572 
    573 static void sig_int(int sig)
    574 {
    575 	dprint(FD_NET, "client: got signal %d\n", sig);
    576 	fio_clients_terminate();
    577 }
    578 
    579 static void client_signal_handler(void)
    580 {
    581 	struct sigaction act;
    582 
    583 	memset(&act, 0, sizeof(act));
    584 	act.sa_handler = sig_int;
    585 	act.sa_flags = SA_RESTART;
    586 	sigaction(SIGINT, &act, NULL);
    587 
    588 	memset(&act, 0, sizeof(act));
    589 	act.sa_handler = sig_int;
    590 	act.sa_flags = SA_RESTART;
    591 	sigaction(SIGTERM, &act, NULL);
    592 
    593 /* Windows uses SIGBREAK as a quit signal from other applications */
    594 #ifdef WIN32
    595 	memset(&act, 0, sizeof(act));
    596 	act.sa_handler = sig_int;
    597 	act.sa_flags = SA_RESTART;
    598 	sigaction(SIGBREAK, &act, NULL);
    599 #endif
    600 
    601 	memset(&act, 0, sizeof(act));
    602 	act.sa_handler = sig_show_status;
    603 	act.sa_flags = SA_RESTART;
    604 	sigaction(SIGUSR1, &act, NULL);
    605 }
    606 
    607 static int send_client_cmd_line(struct fio_client *client)
    608 {
    609 	struct cmd_single_line_pdu *cslp;
    610 	struct cmd_line_pdu *clp;
    611 	unsigned long offset;
    612 	unsigned int *lens;
    613 	void *pdu;
    614 	size_t mem;
    615 	int i, ret;
    616 
    617 	dprint(FD_NET, "client: send cmdline %d\n", client->argc);
    618 
    619 	lens = malloc(client->argc * sizeof(unsigned int));
    620 
    621 	/*
    622 	 * Find out how much mem we need
    623 	 */
    624 	for (i = 0, mem = 0; i < client->argc; i++) {
    625 		lens[i] = strlen(client->argv[i]) + 1;
    626 		mem += lens[i];
    627 	}
    628 
    629 	/*
    630 	 * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
    631 	 */
    632 	mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
    633 
    634 	pdu = malloc(mem);
    635 	clp = pdu;
    636 	offset = sizeof(*clp);
    637 
    638 	for (i = 0; i < client->argc; i++) {
    639 		uint16_t arg_len = lens[i];
    640 
    641 		cslp = pdu + offset;
    642 		strcpy((char *) cslp->text, client->argv[i]);
    643 		cslp->len = cpu_to_le16(arg_len);
    644 		offset += sizeof(*cslp) + arg_len;
    645 	}
    646 
    647 	free(lens);
    648 	clp->lines = cpu_to_le16(client->argc);
    649 	clp->client_type = __cpu_to_le16(client->type);
    650 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
    651 	free(pdu);
    652 	return ret;
    653 }
    654 
    655 int fio_clients_connect(void)
    656 {
    657 	struct fio_client *client;
    658 	struct flist_head *entry, *tmp;
    659 	int ret;
    660 
    661 #ifdef WIN32
    662 	WSADATA wsd;
    663 	WSAStartup(MAKEWORD(2, 2), &wsd);
    664 #endif
    665 
    666 	dprint(FD_NET, "client: connect all\n");
    667 
    668 	client_signal_handler();
    669 
    670 	flist_for_each_safe(entry, tmp, &client_list) {
    671 		client = flist_entry(entry, struct fio_client, list);
    672 
    673 		ret = fio_client_connect(client);
    674 		if (ret) {
    675 			remove_client(client);
    676 			continue;
    677 		}
    678 
    679 		if (client->argc > 1)
    680 			send_client_cmd_line(client);
    681 	}
    682 
    683 	return !nr_clients;
    684 }
    685 
    686 int fio_start_client(struct fio_client *client)
    687 {
    688 	dprint(FD_NET, "client: start %s\n", client->hostname);
    689 	return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
    690 }
    691 
    692 int fio_start_all_clients(void)
    693 {
    694 	struct fio_client *client;
    695 	struct flist_head *entry, *tmp;
    696 	int ret;
    697 
    698 	dprint(FD_NET, "client: start all\n");
    699 
    700 	fio_client_json_init();
    701 
    702 	flist_for_each_safe(entry, tmp, &client_list) {
    703 		client = flist_entry(entry, struct fio_client, list);
    704 
    705 		ret = fio_start_client(client);
    706 		if (ret) {
    707 			remove_client(client);
    708 			continue;
    709 		}
    710 	}
    711 
    712 	return flist_empty(&client_list);
    713 }
    714 
    715 static int __fio_client_send_remote_ini(struct fio_client *client,
    716 					const char *filename)
    717 {
    718 	struct cmd_load_file_pdu *pdu;
    719 	size_t p_size;
    720 	int ret;
    721 
    722 	dprint(FD_NET, "send remote ini %s to %s\n", filename, client->hostname);
    723 
    724 	p_size = sizeof(*pdu) + strlen(filename) + 1;
    725 	pdu = malloc(p_size);
    726 	memset(pdu, 0, p_size);
    727 	pdu->name_len = strlen(filename);
    728 	strcpy((char *) pdu->file, filename);
    729 	pdu->client_type = cpu_to_le16((uint16_t) client->type);
    730 
    731 	client->sent_job = 1;
    732 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_LOAD_FILE, pdu, p_size,NULL, NULL);
    733 	free(pdu);
    734 	return ret;
    735 }
    736 
    737 /*
    738  * Send file contents to server backend. We could use sendfile(), but to remain
    739  * more portable lets just read/write the darn thing.
    740  */
    741 static int __fio_client_send_local_ini(struct fio_client *client,
    742 				       const char *filename)
    743 {
    744 	struct cmd_job_pdu *pdu;
    745 	size_t p_size;
    746 	struct stat sb;
    747 	char *p;
    748 	void *buf;
    749 	off_t len;
    750 	int fd, ret;
    751 
    752 	dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
    753 
    754 	fd = open(filename, O_RDONLY);
    755 	if (fd < 0) {
    756 		ret = -errno;
    757 		log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
    758 		return ret;
    759 	}
    760 
    761 	if (fstat(fd, &sb) < 0) {
    762 		ret = -errno;
    763 		log_err("fio: job file stat: %s\n", strerror(errno));
    764 		close(fd);
    765 		return ret;
    766 	}
    767 
    768 	p_size = sb.st_size + sizeof(*pdu);
    769 	pdu = malloc(p_size);
    770 	buf = pdu->buf;
    771 
    772 	len = sb.st_size;
    773 	p = buf;
    774 	if (read_data(fd, p, len)) {
    775 		log_err("fio: failed reading job file %s\n", filename);
    776 		close(fd);
    777 		free(pdu);
    778 		return 1;
    779 	}
    780 
    781 	pdu->buf_len = __cpu_to_le32(sb.st_size);
    782 	pdu->client_type = cpu_to_le32(client->type);
    783 
    784 	client->sent_job = 1;
    785 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
    786 	free(pdu);
    787 	close(fd);
    788 	return ret;
    789 }
    790 
    791 int fio_client_send_ini(struct fio_client *client, const char *filename,
    792 			bool remote)
    793 {
    794 	int ret;
    795 
    796 	if (!remote)
    797 		ret = __fio_client_send_local_ini(client, filename);
    798 	else
    799 		ret = __fio_client_send_remote_ini(client, filename);
    800 
    801 	if (!ret)
    802 		client->sent_job = 1;
    803 
    804 	return ret;
    805 }
    806 
    807 static int fio_client_send_cf(struct fio_client *client,
    808 			      struct client_file *cf)
    809 {
    810 	return fio_client_send_ini(client, cf->file, cf->remote);
    811 }
    812 
    813 int fio_clients_send_ini(const char *filename)
    814 {
    815 	struct fio_client *client;
    816 	struct flist_head *entry, *tmp;
    817 
    818 	flist_for_each_safe(entry, tmp, &client_list) {
    819 		bool failed = false;
    820 
    821 		client = flist_entry(entry, struct fio_client, list);
    822 
    823 		if (client->nr_files) {
    824 			int i;
    825 
    826 			for (i = 0; i < client->nr_files; i++) {
    827 				struct client_file *cf;
    828 
    829 				cf = &client->files[i];
    830 
    831 				if (fio_client_send_cf(client, cf)) {
    832 					failed = true;
    833 					remove_client(client);
    834 					break;
    835 				}
    836 			}
    837 		}
    838 		if (client->sent_job || failed)
    839 			continue;
    840 		if (!filename || fio_client_send_ini(client, filename, 0))
    841 			remove_client(client);
    842 	}
    843 
    844 	return !nr_clients;
    845 }
    846 
    847 int fio_client_update_options(struct fio_client *client,
    848 			      struct thread_options *o, uint64_t *tag)
    849 {
    850 	struct cmd_add_job_pdu pdu;
    851 
    852 	pdu.thread_number = cpu_to_le32(client->thread_number);
    853 	pdu.groupid = cpu_to_le32(client->groupid);
    854 	convert_thread_options_to_net(&pdu.top, o);
    855 
    856 	return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
    857 }
    858 
    859 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
    860 {
    861 	dst->max_val	= le64_to_cpu(src->max_val);
    862 	dst->min_val	= le64_to_cpu(src->min_val);
    863 	dst->samples	= le64_to_cpu(src->samples);
    864 
    865 	/*
    866 	 * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
    867 	 */
    868 	dst->mean.u.f	= fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
    869 	dst->S.u.f	= fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
    870 }
    871 
/*
 * Convert a thread_stat received over the network from little-endian wire
 * order into host order, field by field, storing the result in @dst.
 *
 * Floating-point fields are transported as IEEE 754 bit patterns in
 * uint64_t and are turned back into doubles with fio_uint64_to_double().
 */
    872 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
    873 {
    874 	int i, j;
    875 
    876 	dst->error		= le32_to_cpu(src->error);
    877 	dst->thread_number	= le32_to_cpu(src->thread_number);
    878 	dst->groupid		= le32_to_cpu(src->groupid);
    879 	dst->pid		= le32_to_cpu(src->pid);
    880 	dst->members		= le32_to_cpu(src->members);
    881 	dst->unified_rw_rep	= le32_to_cpu(src->unified_rw_rep);
    882 
	/* Per-direction latency and bandwidth distributions. */
    883 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    884 		convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
    885 		convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
    886 		convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
    887 		convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
    888 	}
    889 
    890 	dst->usr_time		= le64_to_cpu(src->usr_time);
    891 	dst->sys_time		= le64_to_cpu(src->sys_time);
    892 	dst->ctx		= le64_to_cpu(src->ctx);
    893 	dst->minf		= le64_to_cpu(src->minf);
    894 	dst->majf		= le64_to_cpu(src->majf);
    895 	dst->clat_percentiles	= le64_to_cpu(src->clat_percentiles);
    896 	dst->percentile_precision = le64_to_cpu(src->percentile_precision);
    897 
	/* Percentile list entries are doubles sent as raw uint64_t bit patterns. */
    898 	for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
    899 		fio_fp64_t *fps = &src->percentile_list[i];
    900 		fio_fp64_t *fpd = &dst->percentile_list[i];
    901 
    902 		fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
    903 	}
    904 
    905 	for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
    906 		dst->io_u_map[i]	= le32_to_cpu(src->io_u_map[i]);
    907 		dst->io_u_submit[i]	= le32_to_cpu(src->io_u_submit[i]);
    908 		dst->io_u_complete[i]	= le32_to_cpu(src->io_u_complete[i]);
    909 	}
    910 
    911 	for (i = 0; i < FIO_IO_U_LAT_U_NR; i++)
    912 		dst->io_u_lat_u[i]	= le32_to_cpu(src->io_u_lat_u[i]);
    913 	for (i = 0; i < FIO_IO_U_LAT_M_NR; i++)
    914 		dst->io_u_lat_m[i]	= le32_to_cpu(src->io_u_lat_m[i]);
    915 
    916 	for (i = 0; i < DDIR_RWDIR_CNT; i++)
    917 		for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
    918 			dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
    919 
    920 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    921 		dst->total_io_u[i]	= le64_to_cpu(src->total_io_u[i]);
    922 		dst->short_io_u[i]	= le64_to_cpu(src->short_io_u[i]);
    923 		dst->drop_io_u[i]	= le64_to_cpu(src->drop_io_u[i]);
    924 	}
    925 
    926 	dst->total_submit	= le64_to_cpu(src->total_submit);
    927 	dst->total_complete	= le64_to_cpu(src->total_complete);
    928 
    929 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    930 		dst->io_bytes[i]	= le64_to_cpu(src->io_bytes[i]);
    931 		dst->runtime[i]		= le64_to_cpu(src->runtime[i]);
    932 	}
    933 
    934 	dst->total_run_time	= le64_to_cpu(src->total_run_time);
    935 	dst->continue_on_error	= le16_to_cpu(src->continue_on_error);
    936 	dst->total_err_count	= le64_to_cpu(src->total_err_count);
    937 	dst->first_error	= le32_to_cpu(src->first_error);
    938 	dst->kb_base		= le32_to_cpu(src->kb_base);
    939 	dst->unit_base		= le32_to_cpu(src->unit_base);
    940 
    941 	dst->latency_depth	= le32_to_cpu(src->latency_depth);
    942 	dst->latency_target	= le64_to_cpu(src->latency_target);
    943 	dst->latency_window	= le64_to_cpu(src->latency_window);
    944 	dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i));
    945 
	/*
	 * NOTE(review): nr_block_infos comes off the wire and bounds this loop
	 * without a check against the capacity of block_infos (and int i is
	 * compared with a 64-bit count). Confirm the sender guarantees it.
	 */
    946 	dst->nr_block_infos	= le64_to_cpu(src->nr_block_infos);
    947 	for (i = 0; i < dst->nr_block_infos; i++)
    948 		dst->block_infos[i] = le32_to_cpu(src->block_infos[i]);
    949 
	/* Steady-state tracking parameters and results. */
    950 	dst->ss_dur		= le64_to_cpu(src->ss_dur);
    951 	dst->ss_state		= le32_to_cpu(src->ss_state);
    952 	dst->ss_head		= le32_to_cpu(src->ss_head);
    953 	dst->ss_limit.u.f 	= fio_uint64_to_double(le64_to_cpu(src->ss_limit.u.i));
    954 	dst->ss_slope.u.f 	= fio_uint64_to_double(le64_to_cpu(src->ss_slope.u.i));
    955 	dst->ss_deviation.u.f 	= fio_uint64_to_double(le64_to_cpu(src->ss_deviation.u.i));
    956 	dst->ss_criterion.u.f 	= fio_uint64_to_double(le64_to_cpu(src->ss_criterion.u.i));
    957 
	/*
	 * Sample arrays are only converted when __FIO_SS_DATA is set in
	 * ss_state. NOTE(review): the loop is bounded by the wire value ss_dur;
	 * confirm it cannot exceed the storage behind ss_iops_data/ss_bw_data.
	 */
    958 	if (dst->ss_state & __FIO_SS_DATA) {
    959 		for (i = 0; i < dst->ss_dur; i++ ) {
    960 			dst->ss_iops_data[i] = le64_to_cpu(src->ss_iops_data[i]);
    961 			dst->ss_bw_data[i] = le64_to_cpu(src->ss_bw_data[i]);
    962 		}
    963 	}
    964 }
    965 
    966 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
    967 {
    968 	int i;
    969 
    970 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    971 		dst->max_run[i]		= le64_to_cpu(src->max_run[i]);
    972 		dst->min_run[i]		= le64_to_cpu(src->min_run[i]);
    973 		dst->max_bw[i]		= le64_to_cpu(src->max_bw[i]);
    974 		dst->min_bw[i]		= le64_to_cpu(src->min_bw[i]);
    975 		dst->iobytes[i]		= le64_to_cpu(src->iobytes[i]);
    976 		dst->agg[i]		= le64_to_cpu(src->agg[i]);
    977 	}
    978 
    979 	dst->kb_base	= le32_to_cpu(src->kb_base);
    980 	dst->unit_base	= le32_to_cpu(src->unit_base);
    981 	dst->groupid	= le32_to_cpu(src->groupid);
    982 	dst->unified_rw_rep	= le32_to_cpu(src->unified_rw_rep);
    983 }
    984 
    985 static void json_object_add_client_info(struct json_object *obj,
    986 					struct fio_client *client)
    987 {
    988 	const char *hostname = client->hostname ? client->hostname : "";
    989 
    990 	json_object_add_value_string(obj, "hostname", hostname);
    991 	json_object_add_value_int(obj, "port", client->port);
    992 }
    993 
    994 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
    995 {
    996 	struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
    997 	struct flist_head *opt_list = NULL;
    998 	struct json_object *tsobj;
    999 
   1000 	if (client->opt_lists && p->ts.thread_number <= client->jobs)
   1001 		opt_list = &client->opt_lists[p->ts.thread_number - 1];
   1002 
   1003 	tsobj = show_thread_status(&p->ts, &p->rs, opt_list, NULL);
   1004 	client->did_stat = 1;
   1005 	if (tsobj) {
   1006 		json_object_add_client_info(tsobj, client);
   1007 		json_array_add_value_object(clients_array, tsobj);
   1008 	}
   1009 
   1010 	if (sum_stat_clients <= 1)
   1011 		return;
   1012 
   1013 	sum_thread_stats(&client_ts, &p->ts, sum_stat_nr == 1);
   1014 	sum_group_stats(&client_gs, &p->rs);
   1015 
   1016 	client_ts.members++;
   1017 	client_ts.thread_number = p->ts.thread_number;
   1018 	client_ts.groupid = p->ts.groupid;
   1019 	client_ts.unified_rw_rep = p->ts.unified_rw_rep;
   1020 
   1021 	if (++sum_stat_nr == sum_stat_clients) {
   1022 		strcpy(client_ts.name, "All clients");
   1023 		tsobj = show_thread_status(&client_ts, &client_gs, NULL, NULL);
   1024 		if (tsobj) {
   1025 			json_object_add_client_info(tsobj, client);
   1026 			json_array_add_value_object(clients_array, tsobj);
   1027 		}
   1028 	}
   1029 }
   1030 
   1031 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
   1032 {
   1033 	struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
   1034 
   1035 	if (output_format & FIO_OUTPUT_NORMAL)
   1036 		show_group_stats(gs, NULL);
   1037 }
   1038 
   1039 static void handle_job_opt(struct fio_client *client, struct fio_net_cmd *cmd)
   1040 {
   1041 	struct cmd_job_option *pdu = (struct cmd_job_option *) cmd->payload;
   1042 	struct print_option *p;
   1043 
   1044 	if (!job_opt_object)
   1045 		return;
   1046 
   1047 	pdu->global = le16_to_cpu(pdu->global);
   1048 	pdu->truncated = le16_to_cpu(pdu->truncated);
   1049 	pdu->groupid = le32_to_cpu(pdu->groupid);
   1050 
   1051 	p = malloc(sizeof(*p));
   1052 	p->name = strdup((char *) pdu->name);
   1053 	if (pdu->value[0] != '\0')
   1054 		p->value = strdup((char *) pdu->value);
   1055 	else
   1056 		p->value = NULL;
   1057 
   1058 	if (pdu->global) {
   1059 		const char *pos = "";
   1060 
   1061 		if (p->value)
   1062 			pos = p->value;
   1063 
   1064 		json_object_add_value_string(job_opt_object, p->name, pos);
   1065 	} else if (client->opt_lists) {
   1066 		struct flist_head *opt_list = &client->opt_lists[pdu->groupid];
   1067 
   1068 		flist_add_tail(&p->list, opt_list);
   1069 	}
   1070 }
   1071 
   1072 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
   1073 {
   1074 	struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
   1075 	const char *buf = (const char *) pdu->buf;
   1076 	const char *name;
   1077 	int fio_unused ret;
   1078 
   1079 	name = client->name ? client->name : client->hostname;
   1080 
   1081 	if (!client->skip_newline)
   1082 		fprintf(f_out, "<%s> ", name);
   1083 	ret = fwrite(buf, pdu->buf_len, 1, f_out);
   1084 	fflush(f_out);
   1085 	client->skip_newline = strchr(buf, '\n') == NULL;
   1086 }
   1087 
   1088 static void convert_agg(struct disk_util_agg *agg)
   1089 {
   1090 	int i;
   1091 
   1092 	for (i = 0; i < 2; i++) {
   1093 		agg->ios[i]	= le64_to_cpu(agg->ios[i]);
   1094 		agg->merges[i]	= le64_to_cpu(agg->merges[i]);
   1095 		agg->sectors[i]	= le64_to_cpu(agg->sectors[i]);
   1096 		agg->ticks[i]	= le64_to_cpu(agg->ticks[i]);
   1097 	}
   1098 
   1099 	agg->io_ticks		= le64_to_cpu(agg->io_ticks);
   1100 	agg->time_in_queue	= le64_to_cpu(agg->time_in_queue);
   1101 	agg->slavecount		= le32_to_cpu(agg->slavecount);
   1102 	agg->max_util.u.f	= fio_uint64_to_double(le64_to_cpu(agg->max_util.u.i));
   1103 }
   1104 
   1105 static void convert_dus(struct disk_util_stat *dus)
   1106 {
   1107 	int i;
   1108 
   1109 	for (i = 0; i < 2; i++) {
   1110 		dus->s.ios[i]		= le64_to_cpu(dus->s.ios[i]);
   1111 		dus->s.merges[i]	= le64_to_cpu(dus->s.merges[i]);
   1112 		dus->s.sectors[i]	= le64_to_cpu(dus->s.sectors[i]);
   1113 		dus->s.ticks[i]		= le64_to_cpu(dus->s.ticks[i]);
   1114 	}
   1115 
   1116 	dus->s.io_ticks		= le64_to_cpu(dus->s.io_ticks);
   1117 	dus->s.time_in_queue	= le64_to_cpu(dus->s.time_in_queue);
   1118 	dus->s.msec		= le64_to_cpu(dus->s.msec);
   1119 }
   1120 
   1121 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
   1122 {
   1123 	struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
   1124 
   1125 	if (!client->disk_stats_shown) {
   1126 		client->disk_stats_shown = 1;
   1127 		log_info("\nDisk stats (read/write):\n");
   1128 	}
   1129 
   1130 	if (output_format & FIO_OUTPUT_JSON) {
   1131 		struct json_object *duobj;
   1132 		json_array_add_disk_util(&du->dus, &du->agg, du_array);
   1133 		duobj = json_array_last_value_object(du_array);
   1134 		json_object_add_client_info(duobj, client);
   1135 	}
   1136 	if (output_format & FIO_OUTPUT_TERSE)
   1137 		print_disk_util(&du->dus, &du->agg, 1, NULL);
   1138 	if (output_format & FIO_OUTPUT_NORMAL)
   1139 		print_disk_util(&du->dus, &du->agg, 0, NULL);
   1140 }
   1141 
   1142 static void convert_jobs_eta(struct jobs_eta *je)
   1143 {
   1144 	int i;
   1145 
   1146 	je->nr_running		= le32_to_cpu(je->nr_running);
   1147 	je->nr_ramp		= le32_to_cpu(je->nr_ramp);
   1148 	je->nr_pending		= le32_to_cpu(je->nr_pending);
   1149 	je->nr_setting_up	= le32_to_cpu(je->nr_setting_up);
   1150 	je->files_open		= le32_to_cpu(je->files_open);
   1151 
   1152 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1153 		je->m_rate[i]	= le64_to_cpu(je->m_rate[i]);
   1154 		je->t_rate[i]	= le64_to_cpu(je->t_rate[i]);
   1155 		je->m_iops[i]	= le32_to_cpu(je->m_iops[i]);
   1156 		je->t_iops[i]	= le32_to_cpu(je->t_iops[i]);
   1157 		je->rate[i]	= le64_to_cpu(je->rate[i]);
   1158 		je->iops[i]	= le32_to_cpu(je->iops[i]);
   1159 	}
   1160 
   1161 	je->elapsed_sec		= le64_to_cpu(je->elapsed_sec);
   1162 	je->eta_sec		= le64_to_cpu(je->eta_sec);
   1163 	je->nr_threads		= le32_to_cpu(je->nr_threads);
   1164 	je->is_pow2		= le32_to_cpu(je->is_pow2);
   1165 	je->unit_base		= le32_to_cpu(je->unit_base);
   1166 }
   1167 
   1168 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
   1169 {
   1170 	int i;
   1171 
   1172 	dst->nr_running		+= je->nr_running;
   1173 	dst->nr_ramp		+= je->nr_ramp;
   1174 	dst->nr_pending		+= je->nr_pending;
   1175 	dst->nr_setting_up	+= je->nr_setting_up;
   1176 	dst->files_open		+= je->files_open;
   1177 
   1178 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1179 		dst->m_rate[i]	+= je->m_rate[i];
   1180 		dst->t_rate[i]	+= je->t_rate[i];
   1181 		dst->m_iops[i]	+= je->m_iops[i];
   1182 		dst->t_iops[i]	+= je->t_iops[i];
   1183 		dst->rate[i]	+= je->rate[i];
   1184 		dst->iops[i]	+= je->iops[i];
   1185 	}
   1186 
   1187 	dst->elapsed_sec	+= je->elapsed_sec;
   1188 
   1189 	if (je->eta_sec > dst->eta_sec)
   1190 		dst->eta_sec = je->eta_sec;
   1191 
   1192 	dst->nr_threads		+= je->nr_threads;
   1193 
   1194 	/*
   1195 	 * This wont be correct for multiple strings, but at least it
   1196 	 * works for the basic cases.
   1197 	 */
   1198 	strcpy((char *) dst->run_str, (char *) je->run_str);
   1199 }
   1200 
   1201 static bool remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
   1202 {
   1203 	struct fio_net_cmd_reply *reply = NULL;
   1204 	struct flist_head *entry;
   1205 
   1206 	flist_for_each(entry, &client->cmd_list) {
   1207 		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1208 
   1209 		if (cmd->tag == (uintptr_t) reply)
   1210 			break;
   1211 
   1212 		reply = NULL;
   1213 	}
   1214 
   1215 	if (!reply) {
   1216 		log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag);
   1217 		return false;
   1218 	}
   1219 
   1220 	flist_del(&reply->list);
   1221 	cmd->tag = reply->saved_tag;
   1222 	free(reply);
   1223 	return true;
   1224 }
   1225 
   1226 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
   1227 {
   1228 	do {
   1229 		struct fio_net_cmd_reply *reply = NULL;
   1230 		struct flist_head *entry;
   1231 
   1232 		flist_for_each(entry, &client->cmd_list) {
   1233 			reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1234 
   1235 			if (tag == (uintptr_t) reply)
   1236 				break;
   1237 
   1238 			reply = NULL;
   1239 		}
   1240 
   1241 		if (!reply)
   1242 			break;
   1243 
   1244 		usleep(1000);
   1245 	} while (1);
   1246 
   1247 	return 0;
   1248 }
   1249 
   1250 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
   1251 {
   1252 	struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
   1253 	struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
   1254 
   1255 	dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
   1256 
   1257 	assert(client->eta_in_flight == eta);
   1258 
   1259 	client->eta_in_flight = NULL;
   1260 	flist_del_init(&client->eta_list);
   1261 	client->eta_timeouts = 0;
   1262 
   1263 	if (client->ops->jobs_eta)
   1264 		client->ops->jobs_eta(client, je);
   1265 
   1266 	fio_client_sum_jobs_eta(&eta->eta, je);
   1267 	fio_client_dec_jobs_eta(eta, client->ops->eta);
   1268 }
   1269 
/*
 * Write histogram (IO_LOG_TYPE_HIST) log samples to f as text.
 *
 * Output is one line per sample: "time, ddir, bs, " followed by the latency
 * buckets summed in groups of 2^hist_coarseness (via hist_sum()),
 * comma-separated. The last group ends the line.
 *
 * @f:               destination stream
 * @hist_coarseness: log2 of how many adjacent buckets to merge per column
 * @samples:         sample buffer; each io_sample is followed by an
 *                   io_u_plat_entry (see the offset computation below)
 * @sample_size:     byte size used to derive the sample count; does nothing
 *                   when 0
 */
static void client_flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
				      uint64_t sample_size)
{
	struct io_sample *s;
	int log_offset;
	uint64_t i, j, nr_samples;
	struct io_u_plat_entry *entry;
	unsigned int *io_u_plat;

	int stride = 1 << hist_coarseness;

	if (!sample_size)
		return;

	/* Whether offsets are logged is taken from the first sample's __ddir. */
	s = __get_sample(samples, 0, 0);
	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;

	/*
	 * NOTE(review): the caller passes nr_samples * sizeof(struct io_sample)
	 * as sample_size; if __log_entry_sz(log_offset) differs from
	 * sizeof(struct io_sample) this count will differ from nr_samples.
	 * Confirm.
	 */
	nr_samples = sample_size / __log_entry_sz(log_offset);

	for (i = 0; i < nr_samples; i++) {

		/*
		 * Every preceding sample is followed by one io_u_plat_entry, so
		 * sample i sits i plat entries past its plain-log position.
		 */
		s = (struct io_sample *)((char *)__get_sample(samples, log_offset, i) +
			i * sizeof(struct io_u_plat_entry));

		entry = s->data.plat_entry;
		io_u_plat = entry->io_u_plat;

		fprintf(f, "%lu, %u, %u, ", (unsigned long) s->time,
						io_sample_ddir(s), s->bs);
		for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
			fprintf(f, "%lu, ", hist_sum(j, stride, io_u_plat, NULL));
		}
		fprintf(f, "%lu\n", (unsigned long)
			hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat, NULL));

	}
}
   1307 
   1308 static int fio_client_handle_iolog(struct fio_client *client,
   1309 				   struct fio_net_cmd *cmd)
   1310 {
   1311 	struct cmd_iolog_pdu *pdu;
   1312 	bool store_direct;
   1313 	char *log_pathname;
   1314 
   1315 	pdu = convert_iolog(cmd, &store_direct);
   1316 	if (!pdu) {
   1317 		log_err("fio: failed converting IO log\n");
   1318 		return 1;
   1319 	}
   1320 
   1321         /* allocate buffer big enough for next sprintf() call */
   1322 	log_pathname = malloc(10 + strlen((char *)pdu->name) +
   1323 			strlen(client->hostname));
   1324 	if (!log_pathname) {
   1325 		log_err("fio: memory allocation of unique pathname failed\n");
   1326 		return -1;
   1327 	}
   1328 	/* generate a unique pathname for the log file using hostname */
   1329 	sprintf(log_pathname, "%s.%s", pdu->name, client->hostname);
   1330 
   1331 	if (store_direct) {
   1332 		ssize_t ret;
   1333 		size_t sz;
   1334 		int fd;
   1335 
   1336 		fd = open((const char *) log_pathname,
   1337 				O_WRONLY | O_CREAT | O_TRUNC, 0644);
   1338 		if (fd < 0) {
   1339 			log_err("fio: open log %s: %s\n",
   1340 				log_pathname, strerror(errno));
   1341 			return 1;
   1342 		}
   1343 
   1344 		sz = cmd->pdu_len - sizeof(*pdu);
   1345 		ret = write(fd, pdu->samples, sz);
   1346 		close(fd);
   1347 
   1348 		if (ret != sz) {
   1349 			log_err("fio: short write on compressed log\n");
   1350 			return 1;
   1351 		}
   1352 
   1353 		return 0;
   1354 	} else {
   1355 		FILE *f;
   1356 		f = fopen((const char *) log_pathname, "w");
   1357 		if (!f) {
   1358 			log_err("fio: fopen log %s : %s\n",
   1359 				log_pathname, strerror(errno));
   1360 			return 1;
   1361 		}
   1362 
   1363 		if (pdu->log_type == IO_LOG_TYPE_HIST) {
   1364 			client_flush_hist_samples(f, pdu->log_hist_coarseness, pdu->samples,
   1365 					   pdu->nr_samples * sizeof(struct io_sample));
   1366 		} else {
   1367 			flush_samples(f, pdu->samples,
   1368 					pdu->nr_samples * sizeof(struct io_sample));
   1369 		}
   1370 		fclose(f);
   1371 		return 0;
   1372 	}
   1373 }
   1374 
   1375 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
   1376 {
   1377 	struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
   1378 	const char *os, *arch;
   1379 	char bit[16];
   1380 
   1381 	os = fio_get_os_string(probe->os);
   1382 	if (!os)
   1383 		os = "unknown";
   1384 
   1385 	arch = fio_get_arch_string(probe->arch);
   1386 	if (!arch)
   1387 		os = "unknown";
   1388 
   1389 	sprintf(bit, "%d-bit", probe->bpp * 8);
   1390 	probe->flags = le64_to_cpu(probe->flags);
   1391 
   1392 	log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
   1393 		probe->hostname, probe->bigendian, bit, os, arch,
   1394 		probe->fio_version, (unsigned long) probe->flags);
   1395 
   1396 	if (!client->name)
   1397 		client->name = strdup((char *) probe->hostname);
   1398 }
   1399 
   1400 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
   1401 {
   1402 	struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
   1403 
   1404 	client->state = Client_started;
   1405 	client->jobs = le32_to_cpu(pdu->jobs);
   1406 	client->nr_stat = le32_to_cpu(pdu->stat_outputs);
   1407 
   1408 	if (client->jobs) {
   1409 		int i;
   1410 
   1411 		if (client->opt_lists)
   1412 			free(client->opt_lists);
   1413 
   1414 		client->opt_lists = malloc(client->jobs * sizeof(struct flist_head));
   1415 		for (i = 0; i < client->jobs; i++)
   1416 			INIT_FLIST_HEAD(&client->opt_lists[i]);
   1417 	}
   1418 
   1419 	sum_stat_clients += client->nr_stat;
   1420 }
   1421 
   1422 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
   1423 {
   1424 	if (client->error)
   1425 		log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
   1426 }
   1427 
   1428 static void convert_stop(struct fio_net_cmd *cmd)
   1429 {
   1430 	struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
   1431 
   1432 	pdu->error = le32_to_cpu(pdu->error);
   1433 }
   1434 
   1435 static void convert_text(struct fio_net_cmd *cmd)
   1436 {
   1437 	struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
   1438 
   1439 	pdu->level	= le32_to_cpu(pdu->level);
   1440 	pdu->buf_len	= le32_to_cpu(pdu->buf_len);
   1441 	pdu->log_sec	= le64_to_cpu(pdu->log_sec);
   1442 	pdu->log_usec	= le64_to_cpu(pdu->log_usec);
   1443 }
   1444 
   1445 static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
   1446 					      struct cmd_iolog_pdu *pdu)
   1447 {
   1448 #ifdef CONFIG_ZLIB
   1449 	struct cmd_iolog_pdu *ret;
   1450 	z_stream stream;
   1451 	uint32_t nr_samples;
   1452 	size_t total;
   1453 	void *p;
   1454 
   1455 	stream.zalloc = Z_NULL;
   1456 	stream.zfree = Z_NULL;
   1457 	stream.opaque = Z_NULL;
   1458 	stream.avail_in = 0;
   1459 	stream.next_in = Z_NULL;
   1460 
   1461 	if (inflateInit(&stream) != Z_OK)
   1462 		return NULL;
   1463 
   1464 	/*
   1465 	 * Get header first, it's not compressed
   1466 	 */
   1467 	nr_samples = le64_to_cpu(pdu->nr_samples);
   1468 
   1469 	if (pdu->log_type == IO_LOG_TYPE_HIST)
   1470 		total = nr_samples * (__log_entry_sz(le32_to_cpu(pdu->log_offset)) +
   1471 					sizeof(struct io_u_plat_entry));
   1472 	else
   1473 		total = nr_samples * __log_entry_sz(le32_to_cpu(pdu->log_offset));
   1474 	ret = malloc(total + sizeof(*pdu));
   1475 	ret->nr_samples = nr_samples;
   1476 
   1477 	memcpy(ret, pdu, sizeof(*pdu));
   1478 
   1479 	p = (void *) ret + sizeof(*pdu);
   1480 
   1481 	stream.avail_in = cmd->pdu_len - sizeof(*pdu);
   1482 	stream.next_in = (void *) pdu + sizeof(*pdu);
   1483 	while (stream.avail_in) {
   1484 		unsigned int this_chunk = 65536;
   1485 		unsigned int this_len;
   1486 		int err;
   1487 
   1488 		if (this_chunk > total)
   1489 			this_chunk = total;
   1490 
   1491 		stream.avail_out = this_chunk;
   1492 		stream.next_out = p;
   1493 		err = inflate(&stream, Z_NO_FLUSH);
   1494 		/* may be Z_OK, or Z_STREAM_END */
   1495 		if (err < 0) {
   1496 			log_err("fio: inflate error %d\n", err);
   1497 			free(ret);
   1498 			ret = NULL;
   1499 			goto err;
   1500 		}
   1501 
   1502 		this_len = this_chunk - stream.avail_out;
   1503 		p += this_len;
   1504 		total -= this_len;
   1505 	}
   1506 
   1507 err:
   1508 	inflateEnd(&stream);
   1509 	return ret;
   1510 #else
   1511 	return NULL;
   1512 #endif
   1513 }
   1514 
   1515 /*
   1516  * This has been compressed on the server side, since it can be big.
   1517  * Uncompress here.
   1518  */
   1519 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd,
   1520 					   bool *store_direct)
   1521 {
   1522 	struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
   1523 	struct cmd_iolog_pdu *ret;
   1524 	uint64_t i;
   1525 	int compressed;
   1526 	void *samples;
   1527 
   1528 	*store_direct = false;
   1529 
   1530 	/*
   1531 	 * Convert if compressed and we support it. If it's not
   1532 	 * compressed, we need not do anything.
   1533 	 */
   1534 	compressed = le32_to_cpu(pdu->compressed);
   1535 	if (compressed == XMIT_COMPRESSED) {
   1536 #ifndef CONFIG_ZLIB
   1537 		log_err("fio: server sent compressed data by mistake\n");
   1538 		return NULL;
   1539 #endif
   1540 		ret = convert_iolog_gz(cmd, pdu);
   1541 		if (!ret) {
   1542 			log_err("fio: failed decompressing log\n");
   1543 			return NULL;
   1544 		}
   1545 	} else if (compressed == STORE_COMPRESSED) {
   1546 		*store_direct = true;
   1547 		ret = pdu;
   1548 	} else
   1549 		ret = pdu;
   1550 
   1551 	ret->nr_samples		= le64_to_cpu(ret->nr_samples);
   1552 	ret->thread_number	= le32_to_cpu(ret->thread_number);
   1553 	ret->log_type		= le32_to_cpu(ret->log_type);
   1554 	ret->compressed		= le32_to_cpu(ret->compressed);
   1555 	ret->log_offset		= le32_to_cpu(ret->log_offset);
   1556 	ret->log_hist_coarseness = le32_to_cpu(ret->log_hist_coarseness);
   1557 
   1558 	if (*store_direct)
   1559 		return ret;
   1560 
   1561 	samples = &ret->samples[0];
   1562 	for (i = 0; i < ret->nr_samples; i++) {
   1563 		struct io_sample *s;
   1564 
   1565 		s = __get_sample(samples, ret->log_offset, i);
   1566 		if (ret->log_type == IO_LOG_TYPE_HIST)
   1567 			s = (struct io_sample *)((void *)s + sizeof(struct io_u_plat_entry) * i);
   1568 
   1569 		s->time		= le64_to_cpu(s->time);
   1570 		s->data.val	= le64_to_cpu(s->data.val);
   1571 		s->__ddir	= le32_to_cpu(s->__ddir);
   1572 		s->bs		= le32_to_cpu(s->bs);
   1573 
   1574 		if (ret->log_offset) {
   1575 			struct io_sample_offset *so = (void *) s;
   1576 
   1577 			so->offset = le64_to_cpu(so->offset);
   1578 		}
   1579 
   1580 		if (ret->log_type == IO_LOG_TYPE_HIST) {
   1581 			s->data.plat_entry = (struct io_u_plat_entry *)(((void *)s) + sizeof(*s));
   1582 			s->data.plat_entry->list.next = NULL;
   1583 			s->data.plat_entry->list.prev = NULL;
   1584 		}
   1585 	}
   1586 
   1587 	return ret;
   1588 }
   1589 
   1590 static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep,
   1591 			   size_t size, uint64_t tag)
   1592 {
   1593 	rep->error = cpu_to_le32(rep->error);
   1594 	fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL);
   1595 }
   1596 
   1597 static int fio_send_file(struct fio_client *client, struct cmd_sendfile *pdu,
   1598 			 uint64_t tag)
   1599 {
   1600 	struct cmd_sendfile_reply *rep;
   1601 	struct stat sb;
   1602 	size_t size;
   1603 	int fd;
   1604 
   1605 	size = sizeof(*rep);
   1606 	rep = malloc(size);
   1607 
   1608 	if (stat((char *)pdu->path, &sb) < 0) {
   1609 fail:
   1610 		rep->error = errno;
   1611 		sendfile_reply(client->fd, rep, size, tag);
   1612 		free(rep);
   1613 		return 1;
   1614 	}
   1615 
   1616 	size += sb.st_size;
   1617 	rep = realloc(rep, size);
   1618 	rep->size = cpu_to_le32((uint32_t) sb.st_size);
   1619 
   1620 	fd = open((char *)pdu->path, O_RDONLY);
   1621 	if (fd == -1 )
   1622 		goto fail;
   1623 
   1624 	rep->error = read_data(fd, &rep->data, sb.st_size);
   1625 	sendfile_reply(client->fd, rep, size, tag);
   1626 	free(rep);
   1627 	close(fd);
   1628 	return 0;
   1629 }
   1630 
   1631 int fio_handle_client(struct fio_client *client)
   1632 {
   1633 	struct client_ops *ops = client->ops;
   1634 	struct fio_net_cmd *cmd;
   1635 	int size;
   1636 
   1637 	dprint(FD_NET, "client: handle %s\n", client->hostname);
   1638 
   1639 	cmd = fio_net_recv_cmd(client->fd, true);
   1640 	if (!cmd)
   1641 		return 0;
   1642 
   1643 	dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
   1644 		fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
   1645 
   1646 	switch (cmd->opcode) {
   1647 	case FIO_NET_CMD_QUIT:
   1648 		if (ops->quit)
   1649 			ops->quit(client, cmd);
   1650 		remove_client(client);
   1651 		break;
   1652 	case FIO_NET_CMD_TEXT:
   1653 		convert_text(cmd);
   1654 		ops->text(client, cmd);
   1655 		break;
   1656 	case FIO_NET_CMD_DU: {
   1657 		struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
   1658 
   1659 		convert_dus(&du->dus);
   1660 		convert_agg(&du->agg);
   1661 
   1662 		ops->disk_util(client, cmd);
   1663 		break;
   1664 		}
   1665 	case FIO_NET_CMD_TS: {
   1666 		struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
   1667 
   1668 		dprint(FD_NET, "client: ts->ss_state = %u\n", (unsigned int) le32_to_cpu(p->ts.ss_state));
   1669 		if (le32_to_cpu(p->ts.ss_state) & __FIO_SS_DATA) {
   1670 			dprint(FD_NET, "client: received steadystate ring buffers\n");
   1671 
   1672 			size = le64_to_cpu(p->ts.ss_dur);
   1673 			p->ts.ss_iops_data = (uint64_t *) ((struct cmd_ts_pdu *)cmd->payload + 1);
   1674 			p->ts.ss_bw_data = p->ts.ss_iops_data + size;
   1675 		}
   1676 
   1677 		convert_ts(&p->ts, &p->ts);
   1678 		convert_gs(&p->rs, &p->rs);
   1679 
   1680 		ops->thread_status(client, cmd);
   1681 		break;
   1682 		}
   1683 	case FIO_NET_CMD_GS: {
   1684 		struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
   1685 
   1686 		convert_gs(gs, gs);
   1687 
   1688 		ops->group_stats(client, cmd);
   1689 		break;
   1690 		}
   1691 	case FIO_NET_CMD_ETA: {
   1692 		struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
   1693 
   1694 		if (!remove_reply_cmd(client, cmd))
   1695 			break;
   1696 		convert_jobs_eta(je);
   1697 		handle_eta(client, cmd);
   1698 		break;
   1699 		}
   1700 	case FIO_NET_CMD_PROBE:
   1701 		remove_reply_cmd(client, cmd);
   1702 		ops->probe(client, cmd);
   1703 		break;
   1704 	case FIO_NET_CMD_SERVER_START:
   1705 		client->state = Client_running;
   1706 		if (ops->job_start)
   1707 			ops->job_start(client, cmd);
   1708 		break;
   1709 	case FIO_NET_CMD_START: {
   1710 		struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
   1711 
   1712 		pdu->jobs = le32_to_cpu(pdu->jobs);
   1713 		ops->start(client, cmd);
   1714 		break;
   1715 		}
   1716 	case FIO_NET_CMD_STOP: {
   1717 		struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
   1718 
   1719 		convert_stop(cmd);
   1720 		client->state = Client_stopped;
   1721 		client->error = le32_to_cpu(pdu->error);
   1722 		client->signal = le32_to_cpu(pdu->signal);
   1723 		ops->stop(client, cmd);
   1724 		break;
   1725 		}
   1726 	case FIO_NET_CMD_ADD_JOB: {
   1727 		struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
   1728 
   1729 		client->thread_number = le32_to_cpu(pdu->thread_number);
   1730 		client->groupid = le32_to_cpu(pdu->groupid);
   1731 
   1732 		if (ops->add_job)
   1733 			ops->add_job(client, cmd);
   1734 		break;
   1735 		}
   1736 	case FIO_NET_CMD_IOLOG:
   1737 		fio_client_handle_iolog(client, cmd);
   1738 		break;
   1739 	case FIO_NET_CMD_UPDATE_JOB:
   1740 		ops->update_job(client, cmd);
   1741 		remove_reply_cmd(client, cmd);
   1742 		break;
   1743 	case FIO_NET_CMD_VTRIGGER: {
   1744 		struct all_io_list *pdu = (struct all_io_list *) cmd->payload;
   1745 		char buf[128];
   1746 		int off = 0;
   1747 
   1748 		if (aux_path) {
   1749 			strcpy(buf, aux_path);
   1750 			off = strlen(buf);
   1751 		}
   1752 
   1753 		__verify_save_state(pdu, server_name(client, &buf[off], sizeof(buf) - off));
   1754 		exec_trigger(trigger_cmd);
   1755 		break;
   1756 		}
   1757 	case FIO_NET_CMD_SENDFILE: {
   1758 		struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload;
   1759 		fio_send_file(client, pdu, cmd->tag);
   1760 		break;
   1761 		}
   1762 	case FIO_NET_CMD_JOB_OPT: {
   1763 		handle_job_opt(client, cmd);
   1764 		break;
   1765 	}
   1766 	default:
   1767 		log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
   1768 		break;
   1769 	}
   1770 
   1771 	free(cmd);
   1772 	return 1;
   1773 }
   1774 
   1775 int fio_clients_send_trigger(const char *cmd)
   1776 {
   1777 	struct flist_head *entry;
   1778 	struct fio_client *client;
   1779 	size_t slen;
   1780 
   1781 	dprint(FD_NET, "client: send vtrigger: %s\n", cmd);
   1782 
   1783 	if (!cmd)
   1784 		slen = 0;
   1785 	else
   1786 		slen = strlen(cmd);
   1787 
   1788 	flist_for_each(entry, &client_list) {
   1789 		struct cmd_vtrigger_pdu *pdu;
   1790 
   1791 		client = flist_entry(entry, struct fio_client, list);
   1792 
   1793 		pdu = malloc(sizeof(*pdu) + slen);
   1794 		pdu->len = cpu_to_le16((uint16_t) slen);
   1795 		if (slen)
   1796 			memcpy(pdu->cmd, cmd, slen);
   1797 		fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu,
   1798 					sizeof(*pdu) + slen, NULL, NULL);
   1799 		free(pdu);
   1800 	}
   1801 
   1802 	return 0;
   1803 }
   1804 
/*
 * Start a new ETA collection round.
 *
 * A zeroed client_eta aggregate is allocated (with room for the run
 * string) and its pending count is set to nr_clients. SEND_ETA is sent,
 * tagged with the aggregate's address, to every client in Client_running
 * state that has no ETA request outstanding.
 *
 * Clients that still have a request outstanding (non-empty eta_list) are
 * counted as skipped. The pending count is dropped once per skipped client
 * via fio_client_dec_jobs_eta(), which also receives ops->eta; the loop
 * stops early if that call returns 0.
 *
 * NOTE(review): clients that are not Client_running are neither sent a
 * request nor counted as skipped, yet are included in pending. Confirm that
 * pending can still reach zero (and the aggregate be released) in that case.
 */
static void request_client_etas(struct client_ops *ops)
{
	struct fio_client *client;
	struct flist_head *entry;
	struct client_eta *eta;
	int skipped = 0;

	dprint(FD_NET, "client: request eta (%d)\n", nr_clients);

	eta = calloc(1, sizeof(*eta) + __THREAD_RUNSTR_SZ(REAL_MAX_JOBS));
	eta->pending = nr_clients;

	flist_for_each(entry, &client_list) {
		client = flist_entry(entry, struct fio_client, list);

		/* A previous request is still outstanding for this client. */
		if (!flist_empty(&client->eta_list)) {
			skipped++;
			continue;
		}
		if (client->state != Client_running)
			continue;

		/* Track the request so handle_eta()/timeouts can resolve it. */
		assert(!client->eta_in_flight);
		flist_add_tail(&client->eta_list, &eta_list);
		client->eta_in_flight = eta;
		fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
					(uintptr_t) eta, &client->cmd_list);
	}

	while (skipped--) {
		if (!fio_client_dec_jobs_eta(eta, ops->eta))
			break;
	}

	dprint(FD_NET, "client: requested eta tag %p\n", eta);
}
   1841 
   1842 /*
   1843  * A single SEND_ETA timeout isn't fatal. Attempt to recover.
   1844  */
   1845 static int handle_cmd_timeout(struct fio_client *client,
   1846 			      struct fio_net_cmd_reply *reply)
   1847 {
   1848 	flist_del(&reply->list);
   1849 	free(reply);
   1850 
   1851 	if (reply->opcode != FIO_NET_CMD_SEND_ETA)
   1852 		return 1;
   1853 
   1854 	log_info("client <%s>: timeout on SEND_ETA\n", client->hostname);
   1855 
   1856 	flist_del_init(&client->eta_list);
   1857 	if (client->eta_in_flight) {
   1858 		fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
   1859 		client->eta_in_flight = NULL;
   1860 	}
   1861 
   1862 	/*
   1863 	 * If we fail 5 in a row, give up...
   1864 	 */
   1865 	if (client->eta_timeouts++ > 5)
   1866 		return 1;
   1867 
   1868 	return 0;
   1869 }
   1870 
   1871 static int client_check_cmd_timeout(struct fio_client *client,
   1872 				    struct timeval *now)
   1873 {
   1874 	struct fio_net_cmd_reply *reply;
   1875 	struct flist_head *entry, *tmp;
   1876 	int ret = 0;
   1877 
   1878 	flist_for_each_safe(entry, tmp, &client->cmd_list) {
   1879 		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1880 
   1881 		if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT)
   1882 			continue;
   1883 
   1884 		if (!handle_cmd_timeout(client, reply))
   1885 			continue;
   1886 
   1887 		log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
   1888 						fio_server_op(reply->opcode));
   1889 		ret = 1;
   1890 	}
   1891 
   1892 	return flist_empty(&client->cmd_list) && ret;
   1893 }
   1894 
   1895 static int fio_check_clients_timed_out(void)
   1896 {
   1897 	struct fio_client *client;
   1898 	struct flist_head *entry, *tmp;
   1899 	struct timeval tv;
   1900 	int ret = 0;
   1901 
   1902 	fio_gettime(&tv, NULL);
   1903 
   1904 	flist_for_each_safe(entry, tmp, &client_list) {
   1905 		client = flist_entry(entry, struct fio_client, list);
   1906 
   1907 		if (flist_empty(&client->cmd_list))
   1908 			continue;
   1909 
   1910 		if (!client_check_cmd_timeout(client, &tv))
   1911 			continue;
   1912 
   1913 		if (client->ops->timed_out)
   1914 			client->ops->timed_out(client);
   1915 		else
   1916 			log_err("fio: client %s timed out\n", client->hostname);
   1917 
   1918 		client->error = ETIMEDOUT;
   1919 		remove_client(client);
   1920 		ret = 1;
   1921 	}
   1922 
   1923 	return ret;
   1924 }
   1925 
   1926 int fio_handle_clients(struct client_ops *ops)
   1927 {
   1928 	struct pollfd *pfds;
   1929 	int i, ret = 0, retval = 0;
   1930 
   1931 	fio_gettime(&eta_tv, NULL);
   1932 
   1933 	pfds = malloc(nr_clients * sizeof(struct pollfd));
   1934 
   1935 	init_thread_stat(&client_ts);
   1936 	init_group_run_stat(&client_gs);
   1937 
   1938 	while (!exit_backend && nr_clients) {
   1939 		struct flist_head *entry, *tmp;
   1940 		struct fio_client *client;
   1941 
   1942 		i = 0;
   1943 		flist_for_each_safe(entry, tmp, &client_list) {
   1944 			client = flist_entry(entry, struct fio_client, list);
   1945 
   1946 			if (!client->sent_job && !client->ops->stay_connected &&
   1947 			    flist_empty(&client->cmd_list)) {
   1948 				remove_client(client);
   1949 				continue;
   1950 			}
   1951 
   1952 			pfds[i].fd = client->fd;
   1953 			pfds[i].events = POLLIN;
   1954 			i++;
   1955 		}
   1956 
   1957 		if (!nr_clients)
   1958 			break;
   1959 
   1960 		assert(i == nr_clients);
   1961 
   1962 		do {
   1963 			struct timeval tv;
   1964 			int timeout;
   1965 
   1966 			fio_gettime(&tv, NULL);
   1967 			if (mtime_since(&eta_tv, &tv) >= 900) {
   1968 				request_client_etas(ops);
   1969 				memcpy(&eta_tv, &tv, sizeof(tv));
   1970 
   1971 				if (fio_check_clients_timed_out())
   1972 					break;
   1973 			}
   1974 
   1975 			check_trigger_file();
   1976 
   1977 			timeout = min(100u, ops->eta_msec);
   1978 
   1979 			ret = poll(pfds, nr_clients, timeout);
   1980 			if (ret < 0) {
   1981 				if (errno == EINTR)
   1982 					continue;
   1983 				log_err("fio: poll clients: %s\n", strerror(errno));
   1984 				break;
   1985 			} else if (!ret)
   1986 				continue;
   1987 		} while (ret <= 0);
   1988 
   1989 		for (i = 0; i < nr_clients; i++) {
   1990 			if (!(pfds[i].revents & POLLIN))
   1991 				continue;
   1992 
   1993 			client = find_client_by_fd(pfds[i].fd);
   1994 			if (!client) {
   1995 				log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd);
   1996 				continue;
   1997 			}
   1998 			if (!fio_handle_client(client)) {
   1999 				log_info("client: host=%s disconnected\n",
   2000 						client->hostname);
   2001 				remove_client(client);
   2002 				retval = 1;
   2003 			} else if (client->error)
   2004 				retval = 1;
   2005 			fio_put_client(client);
   2006 		}
   2007 	}
   2008 
   2009 	fio_client_json_fini();
   2010 
   2011 	free(pfds);
   2012 	return retval || error_clients;
   2013 }
   2014