Home | History | Annotate | Download | only in fio
      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <unistd.h>
      4 #include <limits.h>
      5 #include <errno.h>
      6 #include <fcntl.h>
      7 #include <sys/poll.h>
      8 #include <sys/types.h>
      9 #include <sys/stat.h>
     10 #include <sys/wait.h>
     11 #include <sys/socket.h>
     12 #include <sys/un.h>
     13 #include <netinet/in.h>
     14 #include <arpa/inet.h>
     15 #include <netdb.h>
     16 #include <signal.h>
     17 #ifdef CONFIG_ZLIB
     18 #include <zlib.h>
     19 #endif
     20 
     21 #include "fio.h"
     22 #include "client.h"
     23 #include "server.h"
     24 #include "flist.h"
     25 #include "hash.h"
     26 
     27 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
     28 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
     29 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
     30 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
     31 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
     32 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
     33 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
     34 
     35 struct client_ops fio_client_ops = {
     36 	.text		= handle_text,
     37 	.disk_util	= handle_du,
     38 	.thread_status	= handle_ts,
     39 	.group_stats	= handle_gs,
     40 	.stop		= handle_stop,
     41 	.start		= handle_start,
     42 	.eta		= display_thread_status,
     43 	.probe		= handle_probe,
     44 	.eta_msec	= FIO_CLIENT_DEF_ETA_MSEC,
     45 	.client_type	= FIO_CLIENT_TYPE_CLI,
     46 };
     47 
     48 static struct timeval eta_tv;
     49 
     50 static FLIST_HEAD(client_list);
     51 static FLIST_HEAD(eta_list);
     52 
     53 static FLIST_HEAD(arg_list);
     54 
     55 struct thread_stat client_ts;
     56 struct group_run_stats client_gs;
     57 int sum_stat_clients;
     58 
     59 static int sum_stat_nr;
     60 static struct json_object *root = NULL;
     61 static struct json_array *clients_array = NULL;
     62 static struct json_array *du_array = NULL;
     63 static int do_output_all_clients;
     64 
     65 #define FIO_CLIENT_HASH_BITS	7
     66 #define FIO_CLIENT_HASH_SZ	(1 << FIO_CLIENT_HASH_BITS)
     67 #define FIO_CLIENT_HASH_MASK	(FIO_CLIENT_HASH_SZ - 1)
     68 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
     69 
     70 static void fio_client_add_hash(struct fio_client *client)
     71 {
     72 	int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
     73 
     74 	bucket &= FIO_CLIENT_HASH_MASK;
     75 	flist_add(&client->hash_list, &client_hash[bucket]);
     76 }
     77 
     78 static void fio_client_remove_hash(struct fio_client *client)
     79 {
     80 	if (!flist_empty(&client->hash_list))
     81 		flist_del_init(&client->hash_list);
     82 }
     83 
     84 static void fio_init fio_client_hash_init(void)
     85 {
     86 	int i;
     87 
     88 	for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
     89 		INIT_FLIST_HEAD(&client_hash[i]);
     90 }
     91 
     92 static void fio_client_json_init(void)
     93 {
     94 	if (output_format != FIO_OUTPUT_JSON)
     95 		return;
     96 	root = json_create_object();
     97 	json_object_add_value_string(root, "fio version", fio_version_string);
     98 	clients_array = json_create_array();
     99 	json_object_add_value_array(root, "client_stats", clients_array);
    100 	du_array = json_create_array();
    101 	json_object_add_value_array(root, "disk_util", du_array);
    102 }
    103 
    104 static void fio_client_json_fini(void)
    105 {
    106 	if (output_format != FIO_OUTPUT_JSON)
    107 		return;
    108 	json_print_object(root);
    109 	log_info("\n");
    110 	json_free_object(root);
    111 	root = NULL;
    112 	clients_array = NULL;
    113 	du_array = NULL;
    114 }
    115 
    116 static struct fio_client *find_client_by_fd(int fd)
    117 {
    118 	int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
    119 	struct fio_client *client;
    120 	struct flist_head *entry;
    121 
    122 	flist_for_each(entry, &client_hash[bucket]) {
    123 		client = flist_entry(entry, struct fio_client, hash_list);
    124 
    125 		if (client->fd == fd) {
    126 			client->refs++;
    127 			return client;
    128 		}
    129 	}
    130 
    131 	return NULL;
    132 }
    133 
    134 void fio_put_client(struct fio_client *client)
    135 {
    136 	if (--client->refs)
    137 		return;
    138 
    139 	free(client->hostname);
    140 	if (client->argv)
    141 		free(client->argv);
    142 	if (client->name)
    143 		free(client->name);
    144 	while (client->nr_ini_file)
    145 		free(client->ini_file[--client->nr_ini_file]);
    146 	if (client->ini_file)
    147 		free(client->ini_file);
    148 
    149 	if (!client->did_stat)
    150 		sum_stat_clients -= client->nr_stat;
    151 
    152 	free(client);
    153 }
    154 
    155 static void remove_client(struct fio_client *client)
    156 {
    157 	assert(client->refs);
    158 
    159 	dprint(FD_NET, "client: removed <%s>\n", client->hostname);
    160 
    161 	if (!flist_empty(&client->list))
    162 		flist_del_init(&client->list);
    163 
    164 	fio_client_remove_hash(client);
    165 
    166 	if (!flist_empty(&client->eta_list)) {
    167 		flist_del_init(&client->eta_list);
    168 		fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
    169 	}
    170 
    171 	close(client->fd);
    172 	client->fd = -1;
    173 
    174 	if (client->ops->removed)
    175 		client->ops->removed(client);
    176 
    177 	nr_clients--;
    178 	fio_put_client(client);
    179 }
    180 
    181 struct fio_client *fio_get_client(struct fio_client *client)
    182 {
    183 	client->refs++;
    184 	return client;
    185 }
    186 
    187 static void __fio_client_add_cmd_option(struct fio_client *client,
    188 					const char *opt)
    189 {
    190 	int index;
    191 
    192 	index = client->argc++;
    193 	client->argv = realloc(client->argv, sizeof(char *) * client->argc);
    194 	client->argv[index] = strdup(opt);
    195 	dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
    196 }
    197 
    198 void fio_client_add_cmd_option(void *cookie, const char *opt)
    199 {
    200 	struct fio_client *client = cookie;
    201 	struct flist_head *entry;
    202 
    203 	if (!client || !opt)
    204 		return;
    205 
    206 	__fio_client_add_cmd_option(client, opt);
    207 
    208 	/*
    209 	 * Duplicate arguments to shared client group
    210 	 */
    211 	flist_for_each(entry, &arg_list) {
    212 		client = flist_entry(entry, struct fio_client, arg_list);
    213 
    214 		__fio_client_add_cmd_option(client, opt);
    215 	}
    216 }
    217 
    218 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
    219 					   const char *hostname, int type,
    220 					   int port)
    221 {
    222 	struct fio_client *client;
    223 
    224 	client = malloc(sizeof(*client));
    225 	memset(client, 0, sizeof(*client));
    226 
    227 	INIT_FLIST_HEAD(&client->list);
    228 	INIT_FLIST_HEAD(&client->hash_list);
    229 	INIT_FLIST_HEAD(&client->arg_list);
    230 	INIT_FLIST_HEAD(&client->eta_list);
    231 	INIT_FLIST_HEAD(&client->cmd_list);
    232 
    233 	client->hostname = strdup(hostname);
    234 
    235 	if (type == Fio_client_socket)
    236 		client->is_sock = 1;
    237 	else {
    238 		int ipv6;
    239 
    240 		ipv6 = type == Fio_client_ipv6;
    241 		if (fio_server_parse_host(hostname, ipv6,
    242 						&client->addr.sin_addr,
    243 						&client->addr6.sin6_addr))
    244 			goto err;
    245 
    246 		client->port = port;
    247 	}
    248 
    249 	client->fd = -1;
    250 	client->ops = ops;
    251 	client->refs = 1;
    252 	client->type = ops->client_type;
    253 
    254 	__fio_client_add_cmd_option(client, "fio");
    255 
    256 	flist_add(&client->list, &client_list);
    257 	nr_clients++;
    258 	dprint(FD_NET, "client: added <%s>\n", client->hostname);
    259 	return client;
    260 err:
    261 	free(client);
    262 	return NULL;
    263 }
    264 
    265 void fio_client_add_ini_file(void *cookie, const char *ini_file)
    266 {
    267 	struct fio_client *client = cookie;
    268 	size_t new_size;
    269 
    270 	dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file);
    271 
    272 	new_size = (client->nr_ini_file + 1) * sizeof(char *);
    273 	client->ini_file = realloc(client->ini_file, new_size);
    274 	client->ini_file[client->nr_ini_file] = strdup(ini_file);
    275 	client->nr_ini_file++;
    276 }
    277 
    278 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
    279 {
    280 	struct fio_client *existing = *cookie;
    281 	struct fio_client *client;
    282 
    283 	if (existing) {
    284 		/*
    285 		 * We always add our "exec" name as the option, hence 1
    286 		 * means empty.
    287 		 */
    288 		if (existing->argc == 1)
    289 			flist_add_tail(&existing->arg_list, &arg_list);
    290 		else {
    291 			while (!flist_empty(&arg_list))
    292 				flist_del_init(arg_list.next);
    293 		}
    294 	}
    295 
    296 	client = malloc(sizeof(*client));
    297 	memset(client, 0, sizeof(*client));
    298 
    299 	INIT_FLIST_HEAD(&client->list);
    300 	INIT_FLIST_HEAD(&client->hash_list);
    301 	INIT_FLIST_HEAD(&client->arg_list);
    302 	INIT_FLIST_HEAD(&client->eta_list);
    303 	INIT_FLIST_HEAD(&client->cmd_list);
    304 
    305 	if (fio_server_parse_string(hostname, &client->hostname,
    306 					&client->is_sock, &client->port,
    307 					&client->addr.sin_addr,
    308 					&client->addr6.sin6_addr,
    309 					&client->ipv6))
    310 		return -1;
    311 
    312 	client->fd = -1;
    313 	client->ops = ops;
    314 	client->refs = 1;
    315 	client->type = ops->client_type;
    316 
    317 	__fio_client_add_cmd_option(client, "fio");
    318 
    319 	flist_add(&client->list, &client_list);
    320 	nr_clients++;
    321 	dprint(FD_NET, "client: added <%s>\n", client->hostname);
    322 	*cookie = client;
    323 	return 0;
    324 }
    325 
    326 static void probe_client(struct fio_client *client)
    327 {
    328 	struct cmd_client_probe_pdu pdu;
    329 	uint64_t tag;
    330 
    331 	dprint(FD_NET, "client: send probe\n");
    332 
    333 #ifdef CONFIG_ZLIB
    334 	pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB);
    335 #else
    336 	pdu.flags = 0;
    337 #endif
    338 
    339 	fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
    340 }
    341 
    342 static int fio_client_connect_ip(struct fio_client *client)
    343 {
    344 	struct sockaddr *addr;
    345 	socklen_t socklen;
    346 	int fd, domain;
    347 
    348 	if (client->ipv6) {
    349 		client->addr6.sin6_family = AF_INET6;
    350 		client->addr6.sin6_port = htons(client->port);
    351 		domain = AF_INET6;
    352 		addr = (struct sockaddr *) &client->addr6;
    353 		socklen = sizeof(client->addr6);
    354 	} else {
    355 		client->addr.sin_family = AF_INET;
    356 		client->addr.sin_port = htons(client->port);
    357 		domain = AF_INET;
    358 		addr = (struct sockaddr *) &client->addr;
    359 		socklen = sizeof(client->addr);
    360 	}
    361 
    362 	fd = socket(domain, SOCK_STREAM, 0);
    363 	if (fd < 0) {
    364 		int ret = -errno;
    365 
    366 		log_err("fio: socket: %s\n", strerror(errno));
    367 		return ret;
    368 	}
    369 
    370 	if (connect(fd, addr, socklen) < 0) {
    371 		int ret = -errno;
    372 
    373 		log_err("fio: connect: %s\n", strerror(errno));
    374 		log_err("fio: failed to connect to %s:%u\n", client->hostname,
    375 								client->port);
    376 		close(fd);
    377 		return ret;
    378 	}
    379 
    380 	return fd;
    381 }
    382 
    383 static int fio_client_connect_sock(struct fio_client *client)
    384 {
    385 	struct sockaddr_un *addr = &client->addr_un;
    386 	socklen_t len;
    387 	int fd;
    388 
    389 	memset(addr, 0, sizeof(*addr));
    390 	addr->sun_family = AF_UNIX;
    391 	strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1);
    392 
    393 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
    394 	if (fd < 0) {
    395 		int ret = -errno;
    396 
    397 		log_err("fio: socket: %s\n", strerror(errno));
    398 		return ret;
    399 	}
    400 
    401 	len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
    402 	if (connect(fd, (struct sockaddr *) addr, len) < 0) {
    403 		int ret = -errno;
    404 
    405 		log_err("fio: connect; %s\n", strerror(errno));
    406 		close(fd);
    407 		return ret;
    408 	}
    409 
    410 	return fd;
    411 }
    412 
    413 int fio_client_connect(struct fio_client *client)
    414 {
    415 	int fd;
    416 
    417 	dprint(FD_NET, "client: connect to host %s\n", client->hostname);
    418 
    419 	if (client->is_sock)
    420 		fd = fio_client_connect_sock(client);
    421 	else
    422 		fd = fio_client_connect_ip(client);
    423 
    424 	dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
    425 
    426 	if (fd < 0)
    427 		return fd;
    428 
    429 	client->fd = fd;
    430 	fio_client_add_hash(client);
    431 	client->state = Client_connected;
    432 
    433 	probe_client(client);
    434 	return 0;
    435 }
    436 
    437 int fio_client_terminate(struct fio_client *client)
    438 {
    439 	return fio_net_send_quit(client->fd);
    440 }
    441 
    442 void fio_clients_terminate(void)
    443 {
    444 	struct flist_head *entry;
    445 	struct fio_client *client;
    446 
    447 	dprint(FD_NET, "client: terminate clients\n");
    448 
    449 	flist_for_each(entry, &client_list) {
    450 		client = flist_entry(entry, struct fio_client, list);
    451 		fio_client_terminate(client);
    452 	}
    453 }
    454 
    455 static void sig_int(int sig)
    456 {
    457 	dprint(FD_NET, "client: got signal %d\n", sig);
    458 	fio_clients_terminate();
    459 }
    460 
    461 static void sig_show_status(int sig)
    462 {
    463 	show_running_run_stats();
    464 }
    465 
    466 static void client_signal_handler(void)
    467 {
    468 	struct sigaction act;
    469 
    470 	memset(&act, 0, sizeof(act));
    471 	act.sa_handler = sig_int;
    472 	act.sa_flags = SA_RESTART;
    473 	sigaction(SIGINT, &act, NULL);
    474 
    475 	memset(&act, 0, sizeof(act));
    476 	act.sa_handler = sig_int;
    477 	act.sa_flags = SA_RESTART;
    478 	sigaction(SIGTERM, &act, NULL);
    479 
    480 /* Windows uses SIGBREAK as a quit signal from other applications */
    481 #ifdef WIN32
    482 	memset(&act, 0, sizeof(act));
    483 	act.sa_handler = sig_int;
    484 	act.sa_flags = SA_RESTART;
    485 	sigaction(SIGBREAK, &act, NULL);
    486 #endif
    487 
    488 	memset(&act, 0, sizeof(act));
    489 	act.sa_handler = sig_show_status;
    490 	act.sa_flags = SA_RESTART;
    491 	sigaction(SIGUSR1, &act, NULL);
    492 }
    493 
    494 static int send_client_cmd_line(struct fio_client *client)
    495 {
    496 	struct cmd_single_line_pdu *cslp;
    497 	struct cmd_line_pdu *clp;
    498 	unsigned long offset;
    499 	unsigned int *lens;
    500 	void *pdu;
    501 	size_t mem;
    502 	int i, ret;
    503 
    504 	dprint(FD_NET, "client: send cmdline %d\n", client->argc);
    505 
    506 	lens = malloc(client->argc * sizeof(unsigned int));
    507 
    508 	/*
    509 	 * Find out how much mem we need
    510 	 */
    511 	for (i = 0, mem = 0; i < client->argc; i++) {
    512 		lens[i] = strlen(client->argv[i]) + 1;
    513 		mem += lens[i];
    514 	}
    515 
    516 	/*
    517 	 * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
    518 	 */
    519 	mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
    520 
    521 	pdu = malloc(mem);
    522 	clp = pdu;
    523 	offset = sizeof(*clp);
    524 
    525 	for (i = 0; i < client->argc; i++) {
    526 		uint16_t arg_len = lens[i];
    527 
    528 		cslp = pdu + offset;
    529 		strcpy((char *) cslp->text, client->argv[i]);
    530 		cslp->len = cpu_to_le16(arg_len);
    531 		offset += sizeof(*cslp) + arg_len;
    532 	}
    533 
    534 	free(lens);
    535 	clp->lines = cpu_to_le16(client->argc);
    536 	clp->client_type = __cpu_to_le16(client->type);
    537 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
    538 	free(pdu);
    539 	return ret;
    540 }
    541 
    542 int fio_clients_connect(void)
    543 {
    544 	struct fio_client *client;
    545 	struct flist_head *entry, *tmp;
    546 	int ret;
    547 
    548 #ifdef WIN32
    549 	WSADATA wsd;
    550 	WSAStartup(MAKEWORD(2, 2), &wsd);
    551 #endif
    552 
    553 	dprint(FD_NET, "client: connect all\n");
    554 
    555 	client_signal_handler();
    556 
    557 	flist_for_each_safe(entry, tmp, &client_list) {
    558 		client = flist_entry(entry, struct fio_client, list);
    559 
    560 		ret = fio_client_connect(client);
    561 		if (ret) {
    562 			remove_client(client);
    563 			continue;
    564 		}
    565 
    566 		if (client->argc > 1)
    567 			send_client_cmd_line(client);
    568 	}
    569 
    570 	return !nr_clients;
    571 }
    572 
    573 int fio_start_client(struct fio_client *client)
    574 {
    575 	dprint(FD_NET, "client: start %s\n", client->hostname);
    576 	return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
    577 }
    578 
    579 int fio_start_all_clients(void)
    580 {
    581 	struct fio_client *client;
    582 	struct flist_head *entry, *tmp;
    583 	int ret;
    584 
    585 	dprint(FD_NET, "client: start all\n");
    586 
    587 	fio_client_json_init();
    588 
    589 	flist_for_each_safe(entry, tmp, &client_list) {
    590 		client = flist_entry(entry, struct fio_client, list);
    591 
    592 		ret = fio_start_client(client);
    593 		if (ret) {
    594 			remove_client(client);
    595 			continue;
    596 		}
    597 	}
    598 
    599 	return flist_empty(&client_list);
    600 }
    601 
    602 /*
    603  * Send file contents to server backend. We could use sendfile(), but to remain
    604  * more portable lets just read/write the darn thing.
    605  */
    606 static int __fio_client_send_ini(struct fio_client *client, const char *filename)
    607 {
    608 	struct cmd_job_pdu *pdu;
    609 	size_t p_size;
    610 	struct stat sb;
    611 	char *p;
    612 	void *buf;
    613 	off_t len;
    614 	int fd, ret;
    615 
    616 	dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
    617 
    618 	fd = open(filename, O_RDONLY);
    619 	if (fd < 0) {
    620 		int ret = -errno;
    621 
    622 		log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
    623 		return ret;
    624 	}
    625 
    626 	if (fstat(fd, &sb) < 0) {
    627 		int ret = -errno;
    628 
    629 		log_err("fio: job file stat: %s\n", strerror(errno));
    630 		close(fd);
    631 		return ret;
    632 	}
    633 
    634 	p_size = sb.st_size + sizeof(*pdu);
    635 	pdu = malloc(p_size);
    636 	buf = pdu->buf;
    637 
    638 	len = sb.st_size;
    639 	p = buf;
    640 	do {
    641 		ret = read(fd, p, len);
    642 		if (ret > 0) {
    643 			len -= ret;
    644 			if (!len)
    645 				break;
    646 			p += ret;
    647 			continue;
    648 		} else if (!ret)
    649 			break;
    650 		else if (errno == EAGAIN || errno == EINTR)
    651 			continue;
    652 	} while (1);
    653 
    654 	if (len) {
    655 		log_err("fio: failed reading job file %s\n", filename);
    656 		close(fd);
    657 		free(pdu);
    658 		return 1;
    659 	}
    660 
    661 	pdu->buf_len = __cpu_to_le32(sb.st_size);
    662 	pdu->client_type = cpu_to_le32(client->type);
    663 
    664 	client->sent_job = 1;
    665 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
    666 	free(pdu);
    667 	close(fd);
    668 	return ret;
    669 }
    670 
    671 int fio_client_send_ini(struct fio_client *client, const char *filename)
    672 {
    673 	int ret;
    674 
    675 	ret = __fio_client_send_ini(client, filename);
    676 	if (!ret)
    677 		client->sent_job = 1;
    678 
    679 	return ret;
    680 }
    681 
    682 int fio_clients_send_ini(const char *filename)
    683 {
    684 	struct fio_client *client;
    685 	struct flist_head *entry, *tmp;
    686 
    687 	flist_for_each_safe(entry, tmp, &client_list) {
    688 		client = flist_entry(entry, struct fio_client, list);
    689 
    690 		if (client->nr_ini_file) {
    691 			int i;
    692 
    693 			for (i = 0; i < client->nr_ini_file; i++) {
    694 				const char *ini = client->ini_file[i];
    695 
    696 				if (fio_client_send_ini(client, ini)) {
    697 					remove_client(client);
    698 					break;
    699 				}
    700 			}
    701 		} else if (!filename || fio_client_send_ini(client, filename))
    702 			remove_client(client);
    703 	}
    704 
    705 	return !nr_clients;
    706 }
    707 
    708 int fio_client_update_options(struct fio_client *client,
    709 			      struct thread_options *o, uint64_t *tag)
    710 {
    711 	struct cmd_add_job_pdu pdu;
    712 
    713 	pdu.thread_number = cpu_to_le32(client->thread_number);
    714 	pdu.groupid = cpu_to_le32(client->groupid);
    715 	convert_thread_options_to_net(&pdu.top, o);
    716 
    717 	return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
    718 }
    719 
    720 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
    721 {
    722 	dst->max_val	= le64_to_cpu(src->max_val);
    723 	dst->min_val	= le64_to_cpu(src->min_val);
    724 	dst->samples	= le64_to_cpu(src->samples);
    725 
    726 	/*
    727 	 * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
    728 	 */
    729 	dst->mean.u.f	= fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
    730 	dst->S.u.f	= fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
    731 }
    732 
    733 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
    734 {
    735 	int i, j;
    736 
    737 	dst->error		= le32_to_cpu(src->error);
    738 	dst->thread_number	= le32_to_cpu(src->thread_number);
    739 	dst->groupid		= le32_to_cpu(src->groupid);
    740 	dst->pid		= le32_to_cpu(src->pid);
    741 	dst->members		= le32_to_cpu(src->members);
    742 	dst->unified_rw_rep	= le32_to_cpu(src->unified_rw_rep);
    743 
    744 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    745 		convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
    746 		convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
    747 		convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
    748 		convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
    749 	}
    750 
    751 	dst->usr_time		= le64_to_cpu(src->usr_time);
    752 	dst->sys_time		= le64_to_cpu(src->sys_time);
    753 	dst->ctx		= le64_to_cpu(src->ctx);
    754 	dst->minf		= le64_to_cpu(src->minf);
    755 	dst->majf		= le64_to_cpu(src->majf);
    756 	dst->clat_percentiles	= le64_to_cpu(src->clat_percentiles);
    757 
    758 	for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
    759 		fio_fp64_t *fps = &src->percentile_list[i];
    760 		fio_fp64_t *fpd = &dst->percentile_list[i];
    761 
    762 		fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
    763 	}
    764 
    765 	for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
    766 		dst->io_u_map[i]	= le32_to_cpu(src->io_u_map[i]);
    767 		dst->io_u_submit[i]	= le32_to_cpu(src->io_u_submit[i]);
    768 		dst->io_u_complete[i]	= le32_to_cpu(src->io_u_complete[i]);
    769 	}
    770 
    771 	for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
    772 		dst->io_u_lat_u[i]	= le32_to_cpu(src->io_u_lat_u[i]);
    773 		dst->io_u_lat_m[i]	= le32_to_cpu(src->io_u_lat_m[i]);
    774 	}
    775 
    776 	for (i = 0; i < DDIR_RWDIR_CNT; i++)
    777 		for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
    778 			dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
    779 
    780 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    781 		dst->total_io_u[i]	= le64_to_cpu(src->total_io_u[i]);
    782 		dst->short_io_u[i]	= le64_to_cpu(src->short_io_u[i]);
    783 	}
    784 
    785 	dst->total_submit	= le64_to_cpu(src->total_submit);
    786 	dst->total_complete	= le64_to_cpu(src->total_complete);
    787 
    788 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    789 		dst->io_bytes[i]	= le64_to_cpu(src->io_bytes[i]);
    790 		dst->runtime[i]		= le64_to_cpu(src->runtime[i]);
    791 	}
    792 
    793 	dst->total_run_time	= le64_to_cpu(src->total_run_time);
    794 	dst->continue_on_error	= le16_to_cpu(src->continue_on_error);
    795 	dst->total_err_count	= le64_to_cpu(src->total_err_count);
    796 	dst->first_error	= le32_to_cpu(src->first_error);
    797 	dst->kb_base		= le32_to_cpu(src->kb_base);
    798 	dst->unit_base		= le32_to_cpu(src->unit_base);
    799 
    800 	dst->latency_depth	= le32_to_cpu(src->latency_depth);
    801 	dst->latency_target	= le64_to_cpu(src->latency_target);
    802 	dst->latency_window	= le64_to_cpu(src->latency_window);
    803 	dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i));
    804 }
    805 
    806 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
    807 {
    808 	int i;
    809 
    810 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    811 		dst->max_run[i]		= le64_to_cpu(src->max_run[i]);
    812 		dst->min_run[i]		= le64_to_cpu(src->min_run[i]);
    813 		dst->max_bw[i]		= le64_to_cpu(src->max_bw[i]);
    814 		dst->min_bw[i]		= le64_to_cpu(src->min_bw[i]);
    815 		dst->io_kb[i]		= le64_to_cpu(src->io_kb[i]);
    816 		dst->agg[i]		= le64_to_cpu(src->agg[i]);
    817 	}
    818 
    819 	dst->kb_base	= le32_to_cpu(src->kb_base);
    820 	dst->unit_base	= le32_to_cpu(src->unit_base);
    821 	dst->groupid	= le32_to_cpu(src->groupid);
    822 	dst->unified_rw_rep	= le32_to_cpu(src->unified_rw_rep);
    823 }
    824 
    825 static void json_object_add_client_info(struct json_object *obj,
    826 struct fio_client *client)
    827 {
    828 	json_object_add_value_string(obj, "hostname", client->hostname);
    829 	json_object_add_value_int(obj, "port", client->port);
    830 }
    831 
    832 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
    833 {
    834 	struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
    835 	struct json_object *tsobj;
    836 
    837 	tsobj = show_thread_status(&p->ts, &p->rs);
    838 	client->did_stat = 1;
    839 	if (tsobj) {
    840 		json_object_add_client_info(tsobj, client);
    841 		json_array_add_value_object(clients_array, tsobj);
    842 	}
    843 
    844 	if (!do_output_all_clients)
    845 		return;
    846 
    847 	sum_thread_stats(&client_ts, &p->ts, sum_stat_nr);
    848 	sum_group_stats(&client_gs, &p->rs);
    849 
    850 	client_ts.members++;
    851 	client_ts.thread_number = p->ts.thread_number;
    852 	client_ts.groupid = p->ts.groupid;
    853 	client_ts.unified_rw_rep = p->ts.unified_rw_rep;
    854 
    855 	if (++sum_stat_nr == sum_stat_clients) {
    856 		strcpy(client_ts.name, "All clients");
    857 		tsobj = show_thread_status(&client_ts, &client_gs);
    858 		if (tsobj) {
    859 			json_object_add_client_info(tsobj, client);
    860 			json_array_add_value_object(clients_array, tsobj);
    861 		}
    862 	}
    863 }
    864 
    865 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
    866 {
    867 	struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
    868 
    869 	show_group_stats(gs);
    870 }
    871 
    872 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
    873 {
    874 	struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
    875 	const char *buf = (const char *) pdu->buf;
    876 	const char *name;
    877 	int fio_unused ret;
    878 
    879 	name = client->name ? client->name : client->hostname;
    880 
    881 	if (!client->skip_newline)
    882 		fprintf(f_out, "<%s> ", name);
    883 	ret = fwrite(buf, pdu->buf_len, 1, f_out);
    884 	fflush(f_out);
    885 	client->skip_newline = strchr(buf, '\n') == NULL;
    886 }
    887 
    888 static void convert_agg(struct disk_util_agg *agg)
    889 {
    890 	int i;
    891 
    892 	for (i = 0; i < 2; i++) {
    893 		agg->ios[i]	= le32_to_cpu(agg->ios[i]);
    894 		agg->merges[i]	= le32_to_cpu(agg->merges[i]);
    895 		agg->sectors[i]	= le64_to_cpu(agg->sectors[i]);
    896 		agg->ticks[i]	= le32_to_cpu(agg->ticks[i]);
    897 	}
    898 
    899 	agg->io_ticks		= le32_to_cpu(agg->io_ticks);
    900 	agg->time_in_queue	= le32_to_cpu(agg->time_in_queue);
    901 	agg->slavecount		= le32_to_cpu(agg->slavecount);
    902 	agg->max_util.u.f	= fio_uint64_to_double(__le64_to_cpu(agg->max_util.u.i));
    903 }
    904 
    905 static void convert_dus(struct disk_util_stat *dus)
    906 {
    907 	int i;
    908 
    909 	for (i = 0; i < 2; i++) {
    910 		dus->s.ios[i]		= le32_to_cpu(dus->s.ios[i]);
    911 		dus->s.merges[i]	= le32_to_cpu(dus->s.merges[i]);
    912 		dus->s.sectors[i]	= le64_to_cpu(dus->s.sectors[i]);
    913 		dus->s.ticks[i]		= le32_to_cpu(dus->s.ticks[i]);
    914 	}
    915 
    916 	dus->s.io_ticks		= le32_to_cpu(dus->s.io_ticks);
    917 	dus->s.time_in_queue	= le32_to_cpu(dus->s.time_in_queue);
    918 	dus->s.msec		= le64_to_cpu(dus->s.msec);
    919 }
    920 
    921 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
    922 {
    923 	struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
    924 
    925 	if (!client->disk_stats_shown) {
    926 		client->disk_stats_shown = 1;
    927 		log_info("\nDisk stats (read/write):\n");
    928 	}
    929 
    930 	if (output_format == FIO_OUTPUT_JSON) {
    931 		struct json_object *duobj;
    932 		json_array_add_disk_util(&du->dus, &du->agg, du_array);
    933 		duobj = json_array_last_value_object(du_array);
    934 		json_object_add_client_info(duobj, client);
    935 	} else
    936 		print_disk_util(&du->dus, &du->agg, output_format == FIO_OUTPUT_TERSE);
    937 }
    938 
    939 static void convert_jobs_eta(struct jobs_eta *je)
    940 {
    941 	int i;
    942 
    943 	je->nr_running		= le32_to_cpu(je->nr_running);
    944 	je->nr_ramp		= le32_to_cpu(je->nr_ramp);
    945 	je->nr_pending		= le32_to_cpu(je->nr_pending);
    946 	je->nr_setting_up	= le32_to_cpu(je->nr_setting_up);
    947 	je->files_open		= le32_to_cpu(je->files_open);
    948 
    949 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    950 		je->m_rate[i]	= le32_to_cpu(je->m_rate[i]);
    951 		je->t_rate[i]	= le32_to_cpu(je->t_rate[i]);
    952 		je->m_iops[i]	= le32_to_cpu(je->m_iops[i]);
    953 		je->t_iops[i]	= le32_to_cpu(je->t_iops[i]);
    954 		je->rate[i]	= le32_to_cpu(je->rate[i]);
    955 		je->iops[i]	= le32_to_cpu(je->iops[i]);
    956 	}
    957 
    958 	je->elapsed_sec		= le64_to_cpu(je->elapsed_sec);
    959 	je->eta_sec		= le64_to_cpu(je->eta_sec);
    960 	je->nr_threads		= le32_to_cpu(je->nr_threads);
    961 	je->is_pow2		= le32_to_cpu(je->is_pow2);
    962 	je->unit_base		= le32_to_cpu(je->unit_base);
    963 }
    964 
    965 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
    966 {
    967 	int i;
    968 
    969 	dst->nr_running		+= je->nr_running;
    970 	dst->nr_ramp		+= je->nr_ramp;
    971 	dst->nr_pending		+= je->nr_pending;
    972 	dst->nr_setting_up	+= je->nr_setting_up;
    973 	dst->files_open		+= je->files_open;
    974 
    975 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    976 		dst->m_rate[i]	+= je->m_rate[i];
    977 		dst->t_rate[i]	+= je->t_rate[i];
    978 		dst->m_iops[i]	+= je->m_iops[i];
    979 		dst->t_iops[i]	+= je->t_iops[i];
    980 		dst->rate[i]	+= je->rate[i];
    981 		dst->iops[i]	+= je->iops[i];
    982 	}
    983 
    984 	dst->elapsed_sec	+= je->elapsed_sec;
    985 
    986 	if (je->eta_sec > dst->eta_sec)
    987 		dst->eta_sec = je->eta_sec;
    988 
    989 	dst->nr_threads		+= je->nr_threads;
    990 	/* we need to handle je->run_str too ... */
    991 }
    992 
    993 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
    994 {
    995 	if (!--eta->pending) {
    996 		eta_fn(&eta->eta);
    997 		free(eta);
    998 	}
    999 }
   1000 
   1001 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
   1002 {
   1003 	struct fio_net_cmd_reply *reply = NULL;
   1004 	struct flist_head *entry;
   1005 
   1006 	flist_for_each(entry, &client->cmd_list) {
   1007 		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1008 
   1009 		if (cmd->tag == (uintptr_t) reply)
   1010 			break;
   1011 
   1012 		reply = NULL;
   1013 	}
   1014 
   1015 	if (!reply) {
   1016 		log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag);
   1017 		return;
   1018 	}
   1019 
   1020 	flist_del(&reply->list);
   1021 	cmd->tag = reply->saved_tag;
   1022 	free(reply);
   1023 }
   1024 
   1025 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
   1026 {
   1027 	do {
   1028 		struct fio_net_cmd_reply *reply = NULL;
   1029 		struct flist_head *entry;
   1030 
   1031 		flist_for_each(entry, &client->cmd_list) {
   1032 			reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1033 
   1034 			if (tag == (uintptr_t) reply)
   1035 				break;
   1036 
   1037 			reply = NULL;
   1038 		}
   1039 
   1040 		if (!reply)
   1041 			break;
   1042 
   1043 		usleep(1000);
   1044 	} while (1);
   1045 
   1046 	return 0;
   1047 }
   1048 
   1049 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
   1050 {
   1051 	struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
   1052 	struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
   1053 
   1054 	dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
   1055 
   1056 	assert(client->eta_in_flight == eta);
   1057 
   1058 	client->eta_in_flight = NULL;
   1059 	flist_del_init(&client->eta_list);
   1060 
   1061 	if (client->ops->jobs_eta)
   1062 		client->ops->jobs_eta(client, je);
   1063 
   1064 	fio_client_sum_jobs_eta(&eta->eta, je);
   1065 	fio_client_dec_jobs_eta(eta, client->ops->eta);
   1066 }
   1067 
   1068 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
   1069 {
   1070 	struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
   1071 	const char *os, *arch;
   1072 	char bit[16];
   1073 
   1074 	os = fio_get_os_string(probe->os);
   1075 	if (!os)
   1076 		os = "unknown";
   1077 
   1078 	arch = fio_get_arch_string(probe->arch);
   1079 	if (!arch)
   1080 		os = "unknown";
   1081 
   1082 	sprintf(bit, "%d-bit", probe->bpp * 8);
   1083 	probe->flags = le64_to_cpu(probe->flags);
   1084 
   1085 	log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
   1086 		probe->hostname, probe->bigendian, bit, os, arch,
   1087 		probe->fio_version, (unsigned long) probe->flags);
   1088 
   1089 	if (!client->name)
   1090 		client->name = strdup((char *) probe->hostname);
   1091 }
   1092 
   1093 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
   1094 {
   1095 	struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
   1096 
   1097 	client->state = Client_started;
   1098 	client->jobs = le32_to_cpu(pdu->jobs);
   1099 	client->nr_stat = le32_to_cpu(pdu->stat_outputs);
   1100 
   1101 	if (sum_stat_clients > 1)
   1102 		do_output_all_clients = 1;
   1103 
   1104 	sum_stat_clients += client->nr_stat;
   1105 }
   1106 
   1107 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
   1108 {
   1109 	if (client->error)
   1110 		log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
   1111 }
   1112 
   1113 static void convert_stop(struct fio_net_cmd *cmd)
   1114 {
   1115 	struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
   1116 
   1117 	pdu->error = le32_to_cpu(pdu->error);
   1118 }
   1119 
   1120 static void convert_text(struct fio_net_cmd *cmd)
   1121 {
   1122 	struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
   1123 
   1124 	pdu->level	= le32_to_cpu(pdu->level);
   1125 	pdu->buf_len	= le32_to_cpu(pdu->buf_len);
   1126 	pdu->log_sec	= le64_to_cpu(pdu->log_sec);
   1127 	pdu->log_usec	= le64_to_cpu(pdu->log_usec);
   1128 }
   1129 
   1130 static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
   1131 					      struct cmd_iolog_pdu *pdu)
   1132 {
   1133 #ifdef CONFIG_ZLIB
   1134 	struct cmd_iolog_pdu *ret;
   1135 	z_stream stream;
   1136 	uint32_t nr_samples;
   1137 	size_t total;
   1138 	void *p;
   1139 
   1140 	stream.zalloc = Z_NULL;
   1141 	stream.zfree = Z_NULL;
   1142 	stream.opaque = Z_NULL;
   1143 	stream.avail_in = 0;
   1144 	stream.next_in = Z_NULL;
   1145 
   1146 	if (inflateInit(&stream) != Z_OK)
   1147 		return NULL;
   1148 
   1149 	/*
   1150 	 * Get header first, it's not compressed
   1151 	 */
   1152 	nr_samples = le32_to_cpu(pdu->nr_samples);
   1153 
   1154 	total = nr_samples * sizeof(struct io_sample);
   1155 	ret = malloc(total + sizeof(*pdu));
   1156 	ret->nr_samples = nr_samples;
   1157 
   1158 	memcpy(ret, pdu, sizeof(*pdu));
   1159 
   1160 	p = (void *) ret + sizeof(*pdu);
   1161 
   1162 	stream.avail_in = cmd->pdu_len - sizeof(*pdu);
   1163 	stream.next_in = (void *) pdu + sizeof(*pdu);
   1164 	while (stream.avail_in) {
   1165 		unsigned int this_chunk = 65536;
   1166 		unsigned int this_len;
   1167 		int err;
   1168 
   1169 		if (this_chunk > total)
   1170 			this_chunk = total;
   1171 
   1172 		stream.avail_out = this_chunk;
   1173 		stream.next_out = p;
   1174 		err = inflate(&stream, Z_NO_FLUSH);
   1175 		/* may be Z_OK, or Z_STREAM_END */
   1176 		if (err < 0) {
   1177 			log_err("fio: inflate error %d\n", err);
   1178 			free(ret);
   1179 			ret = NULL;
   1180 			goto err;
   1181 		}
   1182 
   1183 		this_len = this_chunk - stream.avail_out;
   1184 		p += this_len;
   1185 		total -= this_len;
   1186 	}
   1187 
   1188 err:
   1189 	inflateEnd(&stream);
   1190 	return ret;
   1191 #else
   1192 	return NULL;
   1193 #endif
   1194 }
   1195 
   1196 /*
   1197  * This has been compressed on the server side, since it can be big.
   1198  * Uncompress here.
   1199  */
   1200 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
   1201 {
   1202 	struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
   1203 	struct cmd_iolog_pdu *ret;
   1204 	int i;
   1205 
   1206 	/*
   1207 	 * Convert if compressed and we support it. If it's not
   1208 	 * compressed, we need not do anything.
   1209 	 */
   1210 	if (le32_to_cpu(pdu->compressed)) {
   1211 #ifndef CONFIG_ZLIB
   1212 		log_err("fio: server sent compressed data by mistake\n");
   1213 		return NULL;
   1214 #endif
   1215 		ret = convert_iolog_gz(cmd, pdu);
   1216 		if (!ret) {
   1217 			log_err("fio: failed decompressing log\n");
   1218 			return NULL;
   1219 		}
   1220 	} else
   1221 		ret = pdu;
   1222 
   1223 	ret->thread_number	= le32_to_cpu(ret->thread_number);
   1224 	ret->nr_samples		= le32_to_cpu(ret->nr_samples);
   1225 	ret->log_type		= le32_to_cpu(ret->log_type);
   1226 	ret->compressed		= le32_to_cpu(ret->compressed);
   1227 
   1228 	for (i = 0; i < ret->nr_samples; i++) {
   1229 		struct io_sample *s = &ret->samples[i];
   1230 
   1231 		s->time	= le64_to_cpu(s->time);
   1232 		s->val	= le64_to_cpu(s->val);
   1233 		s->ddir	= le32_to_cpu(s->ddir);
   1234 		s->bs	= le32_to_cpu(s->bs);
   1235 	}
   1236 
   1237 	return ret;
   1238 }
   1239 
   1240 int fio_handle_client(struct fio_client *client)
   1241 {
   1242 	struct client_ops *ops = client->ops;
   1243 	struct fio_net_cmd *cmd;
   1244 
   1245 	dprint(FD_NET, "client: handle %s\n", client->hostname);
   1246 
   1247 	cmd = fio_net_recv_cmd(client->fd);
   1248 	if (!cmd)
   1249 		return 0;
   1250 
   1251 	dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
   1252 		fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
   1253 
   1254 	switch (cmd->opcode) {
   1255 	case FIO_NET_CMD_QUIT:
   1256 		if (ops->quit)
   1257 			ops->quit(client, cmd);
   1258 		remove_client(client);
   1259 		free(cmd);
   1260 		break;
   1261 	case FIO_NET_CMD_TEXT:
   1262 		convert_text(cmd);
   1263 		ops->text(client, cmd);
   1264 		free(cmd);
   1265 		break;
   1266 	case FIO_NET_CMD_DU: {
   1267 		struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
   1268 
   1269 		convert_dus(&du->dus);
   1270 		convert_agg(&du->agg);
   1271 
   1272 		ops->disk_util(client, cmd);
   1273 		free(cmd);
   1274 		break;
   1275 		}
   1276 	case FIO_NET_CMD_TS: {
   1277 		struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
   1278 
   1279 		convert_ts(&p->ts, &p->ts);
   1280 		convert_gs(&p->rs, &p->rs);
   1281 
   1282 		ops->thread_status(client, cmd);
   1283 		free(cmd);
   1284 		break;
   1285 		}
   1286 	case FIO_NET_CMD_GS: {
   1287 		struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
   1288 
   1289 		convert_gs(gs, gs);
   1290 
   1291 		ops->group_stats(client, cmd);
   1292 		free(cmd);
   1293 		break;
   1294 		}
   1295 	case FIO_NET_CMD_ETA: {
   1296 		struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
   1297 
   1298 		remove_reply_cmd(client, cmd);
   1299 		convert_jobs_eta(je);
   1300 		handle_eta(client, cmd);
   1301 		free(cmd);
   1302 		break;
   1303 		}
   1304 	case FIO_NET_CMD_PROBE:
   1305 		remove_reply_cmd(client, cmd);
   1306 		ops->probe(client, cmd);
   1307 		free(cmd);
   1308 		break;
   1309 	case FIO_NET_CMD_SERVER_START:
   1310 		client->state = Client_running;
   1311 		if (ops->job_start)
   1312 			ops->job_start(client, cmd);
   1313 		free(cmd);
   1314 		break;
   1315 	case FIO_NET_CMD_START: {
   1316 		struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
   1317 
   1318 		pdu->jobs = le32_to_cpu(pdu->jobs);
   1319 		ops->start(client, cmd);
   1320 		free(cmd);
   1321 		break;
   1322 		}
   1323 	case FIO_NET_CMD_STOP: {
   1324 		struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
   1325 
   1326 		convert_stop(cmd);
   1327 		client->state = Client_stopped;
   1328 		client->error = le32_to_cpu(pdu->error);
   1329 		client->signal = le32_to_cpu(pdu->signal);
   1330 		ops->stop(client, cmd);
   1331 		free(cmd);
   1332 		break;
   1333 		}
   1334 	case FIO_NET_CMD_ADD_JOB: {
   1335 		struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
   1336 
   1337 		client->thread_number = le32_to_cpu(pdu->thread_number);
   1338 		client->groupid = le32_to_cpu(pdu->groupid);
   1339 
   1340 		if (ops->add_job)
   1341 			ops->add_job(client, cmd);
   1342 		free(cmd);
   1343 		break;
   1344 		}
   1345 	case FIO_NET_CMD_IOLOG:
   1346 		if (ops->iolog) {
   1347 			struct cmd_iolog_pdu *pdu;
   1348 
   1349 			pdu = convert_iolog(cmd);
   1350 			ops->iolog(client, pdu);
   1351 		}
   1352 		free(cmd);
   1353 		break;
   1354 	case FIO_NET_CMD_UPDATE_JOB:
   1355 		ops->update_job(client, cmd);
   1356 		remove_reply_cmd(client, cmd);
   1357 		free(cmd);
   1358 		break;
   1359 	default:
   1360 		log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
   1361 		free(cmd);
   1362 		break;
   1363 	}
   1364 
   1365 	return 1;
   1366 }
   1367 
   1368 static void request_client_etas(struct client_ops *ops)
   1369 {
   1370 	struct fio_client *client;
   1371 	struct flist_head *entry;
   1372 	struct client_eta *eta;
   1373 	int skipped = 0;
   1374 
   1375 	dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
   1376 
   1377 	eta = malloc(sizeof(*eta));
   1378 	memset(&eta->eta, 0, sizeof(eta->eta));
   1379 	eta->pending = nr_clients;
   1380 
   1381 	flist_for_each(entry, &client_list) {
   1382 		client = flist_entry(entry, struct fio_client, list);
   1383 
   1384 		if (!flist_empty(&client->eta_list)) {
   1385 			skipped++;
   1386 			continue;
   1387 		}
   1388 		if (client->state != Client_running)
   1389 			continue;
   1390 
   1391 		assert(!client->eta_in_flight);
   1392 		flist_add_tail(&client->eta_list, &eta_list);
   1393 		client->eta_in_flight = eta;
   1394 		fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
   1395 					(uintptr_t) eta, &client->cmd_list);
   1396 	}
   1397 
   1398 	while (skipped--)
   1399 		fio_client_dec_jobs_eta(eta, ops->eta);
   1400 
   1401 	dprint(FD_NET, "client: requested eta tag %p\n", eta);
   1402 }
   1403 
   1404 static int client_check_cmd_timeout(struct fio_client *client,
   1405 				    struct timeval *now)
   1406 {
   1407 	struct fio_net_cmd_reply *reply;
   1408 	struct flist_head *entry, *tmp;
   1409 	int ret = 0;
   1410 
   1411 	flist_for_each_safe(entry, tmp, &client->cmd_list) {
   1412 		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1413 
   1414 		if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT)
   1415 			continue;
   1416 
   1417 		log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
   1418 						fio_server_op(reply->opcode));
   1419 		flist_del(&reply->list);
   1420 		free(reply);
   1421 		ret = 1;
   1422 	}
   1423 
   1424 	return flist_empty(&client->cmd_list) && ret;
   1425 }
   1426 
   1427 static int fio_check_clients_timed_out(void)
   1428 {
   1429 	struct fio_client *client;
   1430 	struct flist_head *entry, *tmp;
   1431 	struct timeval tv;
   1432 	int ret = 0;
   1433 
   1434 	fio_gettime(&tv, NULL);
   1435 
   1436 	flist_for_each_safe(entry, tmp, &client_list) {
   1437 		client = flist_entry(entry, struct fio_client, list);
   1438 
   1439 		if (flist_empty(&client->cmd_list))
   1440 			continue;
   1441 
   1442 		if (!client_check_cmd_timeout(client, &tv))
   1443 			continue;
   1444 
   1445 		if (client->ops->timed_out)
   1446 			client->ops->timed_out(client);
   1447 		else
   1448 			log_err("fio: client %s timed out\n", client->hostname);
   1449 
   1450 		remove_client(client);
   1451 		ret = 1;
   1452 	}
   1453 
   1454 	return ret;
   1455 }
   1456 
   1457 int fio_handle_clients(struct client_ops *ops)
   1458 {
   1459 	struct pollfd *pfds;
   1460 	int i, ret = 0, retval = 0;
   1461 
   1462 	fio_gettime(&eta_tv, NULL);
   1463 
   1464 	pfds = malloc(nr_clients * sizeof(struct pollfd));
   1465 
   1466 	init_thread_stat(&client_ts);
   1467 	init_group_run_stat(&client_gs);
   1468 
   1469 	while (!exit_backend && nr_clients) {
   1470 		struct flist_head *entry, *tmp;
   1471 		struct fio_client *client;
   1472 
   1473 		i = 0;
   1474 		flist_for_each_safe(entry, tmp, &client_list) {
   1475 			client = flist_entry(entry, struct fio_client, list);
   1476 
   1477 			if (!client->sent_job && !client->ops->stay_connected &&
   1478 			    flist_empty(&client->cmd_list)) {
   1479 				remove_client(client);
   1480 				continue;
   1481 			}
   1482 
   1483 			pfds[i].fd = client->fd;
   1484 			pfds[i].events = POLLIN;
   1485 			i++;
   1486 		}
   1487 
   1488 		if (!nr_clients)
   1489 			break;
   1490 
   1491 		assert(i == nr_clients);
   1492 
   1493 		do {
   1494 			struct timeval tv;
   1495 
   1496 			fio_gettime(&tv, NULL);
   1497 			if (mtime_since(&eta_tv, &tv) >= 900) {
   1498 				request_client_etas(ops);
   1499 				memcpy(&eta_tv, &tv, sizeof(tv));
   1500 
   1501 				if (fio_check_clients_timed_out())
   1502 					break;
   1503 			}
   1504 
   1505 			ret = poll(pfds, nr_clients, ops->eta_msec);
   1506 			if (ret < 0) {
   1507 				if (errno == EINTR)
   1508 					continue;
   1509 				log_err("fio: poll clients: %s\n", strerror(errno));
   1510 				break;
   1511 			} else if (!ret)
   1512 				continue;
   1513 		} while (ret <= 0);
   1514 
   1515 		for (i = 0; i < nr_clients; i++) {
   1516 			if (!(pfds[i].revents & POLLIN))
   1517 				continue;
   1518 
   1519 			client = find_client_by_fd(pfds[i].fd);
   1520 			if (!client) {
   1521 				log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd);
   1522 				continue;
   1523 			}
   1524 			if (!fio_handle_client(client)) {
   1525 				log_info("client: host=%s disconnected\n",
   1526 						client->hostname);
   1527 				remove_client(client);
   1528 				retval = 1;
   1529 			} else if (client->error)
   1530 				retval = 1;
   1531 			fio_put_client(client);
   1532 		}
   1533 	}
   1534 
   1535 	fio_client_json_fini();
   1536 
   1537 	free(pfds);
   1538 	return retval;
   1539 }
   1540