Home | History | Annotate | Download | only in fio
      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <unistd.h>
      4 #include <limits.h>
      5 #include <errno.h>
      6 #include <fcntl.h>
      7 #include <sys/poll.h>
      8 #include <sys/types.h>
      9 #include <sys/stat.h>
     10 #include <sys/wait.h>
     11 #include <sys/socket.h>
     12 #include <sys/un.h>
     13 #include <netinet/in.h>
     14 #include <arpa/inet.h>
     15 #include <netdb.h>
     16 #include <signal.h>
     17 #ifdef CONFIG_ZLIB
     18 #include <zlib.h>
     19 #endif
     20 
     21 #include "fio.h"
     22 #include "client.h"
     23 #include "server.h"
     24 #include "flist.h"
     25 #include "hash.h"
     26 #include "verify.h"
     27 
     28 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
     29 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
     30 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
     31 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
     32 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
     33 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
     34 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
     35 
     36 struct client_ops fio_client_ops = {
     37 	.text		= handle_text,
     38 	.disk_util	= handle_du,
     39 	.thread_status	= handle_ts,
     40 	.group_stats	= handle_gs,
     41 	.stop		= handle_stop,
     42 	.start		= handle_start,
     43 	.eta		= display_thread_status,
     44 	.probe		= handle_probe,
     45 	.eta_msec	= FIO_CLIENT_DEF_ETA_MSEC,
     46 	.client_type	= FIO_CLIENT_TYPE_CLI,
     47 };
     48 
     49 static struct timeval eta_tv;
     50 
     51 static FLIST_HEAD(client_list);
     52 static FLIST_HEAD(eta_list);
     53 
     54 static FLIST_HEAD(arg_list);
     55 
     56 struct thread_stat client_ts;
     57 struct group_run_stats client_gs;
     58 int sum_stat_clients;
     59 
     60 static int sum_stat_nr;
     61 static struct json_object *root = NULL;
     62 static struct json_array *clients_array = NULL;
     63 static struct json_array *du_array = NULL;
     64 
     65 static int error_clients;
     66 
     67 #define FIO_CLIENT_HASH_BITS	7
     68 #define FIO_CLIENT_HASH_SZ	(1 << FIO_CLIENT_HASH_BITS)
     69 #define FIO_CLIENT_HASH_MASK	(FIO_CLIENT_HASH_SZ - 1)
     70 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
     71 
     72 static void fio_client_add_hash(struct fio_client *client)
     73 {
     74 	int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
     75 
     76 	bucket &= FIO_CLIENT_HASH_MASK;
     77 	flist_add(&client->hash_list, &client_hash[bucket]);
     78 }
     79 
     80 static void fio_client_remove_hash(struct fio_client *client)
     81 {
     82 	if (!flist_empty(&client->hash_list))
     83 		flist_del_init(&client->hash_list);
     84 }
     85 
     86 static void fio_init fio_client_hash_init(void)
     87 {
     88 	int i;
     89 
     90 	for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
     91 		INIT_FLIST_HEAD(&client_hash[i]);
     92 }
     93 
     94 static int read_data(int fd, void *data, size_t size)
     95 {
     96 	ssize_t ret;
     97 
     98 	while (size) {
     99 		ret = read(fd, data, size);
    100 		if (ret < 0) {
    101 			if (errno == EAGAIN || errno == EINTR)
    102 				continue;
    103 			break;
    104 		} else if (!ret)
    105 			break;
    106 		else {
    107 			data += ret;
    108 			size -= ret;
    109 		}
    110 	}
    111 
    112 	if (size)
    113 		return EAGAIN;
    114 
    115 	return 0;
    116 }
    117 
    118 static void fio_client_json_init(void)
    119 {
    120 	if (output_format != FIO_OUTPUT_JSON)
    121 		return;
    122 	root = json_create_object();
    123 	json_object_add_value_string(root, "fio version", fio_version_string);
    124 	clients_array = json_create_array();
    125 	json_object_add_value_array(root, "client_stats", clients_array);
    126 	du_array = json_create_array();
    127 	json_object_add_value_array(root, "disk_util", du_array);
    128 }
    129 
    130 static void fio_client_json_fini(void)
    131 {
    132 	if (output_format != FIO_OUTPUT_JSON)
    133 		return;
    134 	json_print_object(root);
    135 	log_info("\n");
    136 	json_free_object(root);
    137 	root = NULL;
    138 	clients_array = NULL;
    139 	du_array = NULL;
    140 }
    141 
    142 static struct fio_client *find_client_by_fd(int fd)
    143 {
    144 	int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
    145 	struct fio_client *client;
    146 	struct flist_head *entry;
    147 
    148 	flist_for_each(entry, &client_hash[bucket]) {
    149 		client = flist_entry(entry, struct fio_client, hash_list);
    150 
    151 		if (client->fd == fd) {
    152 			client->refs++;
    153 			return client;
    154 		}
    155 	}
    156 
    157 	return NULL;
    158 }
    159 
    160 void fio_put_client(struct fio_client *client)
    161 {
    162 	if (--client->refs)
    163 		return;
    164 
    165 	free(client->hostname);
    166 	if (client->argv)
    167 		free(client->argv);
    168 	if (client->name)
    169 		free(client->name);
    170 	while (client->nr_files) {
    171 		struct client_file *cf = &client->files[--client->nr_files];
    172 
    173 		free(cf->file);
    174 	}
    175 	if (client->files)
    176 		free(client->files);
    177 
    178 	if (!client->did_stat)
    179 		sum_stat_clients--;
    180 
    181 	if (client->error)
    182 		error_clients++;
    183 
    184 	free(client);
    185 }
    186 
    187 static void remove_client(struct fio_client *client)
    188 {
    189 	assert(client->refs);
    190 
    191 	dprint(FD_NET, "client: removed <%s>\n", client->hostname);
    192 
    193 	if (!flist_empty(&client->list))
    194 		flist_del_init(&client->list);
    195 
    196 	fio_client_remove_hash(client);
    197 
    198 	if (!flist_empty(&client->eta_list)) {
    199 		flist_del_init(&client->eta_list);
    200 		fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
    201 	}
    202 
    203 	close(client->fd);
    204 	client->fd = -1;
    205 
    206 	if (client->ops->removed)
    207 		client->ops->removed(client);
    208 
    209 	nr_clients--;
    210 	fio_put_client(client);
    211 }
    212 
    213 struct fio_client *fio_get_client(struct fio_client *client)
    214 {
    215 	client->refs++;
    216 	return client;
    217 }
    218 
    219 static void __fio_client_add_cmd_option(struct fio_client *client,
    220 					const char *opt)
    221 {
    222 	int index;
    223 
    224 	index = client->argc++;
    225 	client->argv = realloc(client->argv, sizeof(char *) * client->argc);
    226 	client->argv[index] = strdup(opt);
    227 	dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
    228 }
    229 
    230 void fio_client_add_cmd_option(void *cookie, const char *opt)
    231 {
    232 	struct fio_client *client = cookie;
    233 	struct flist_head *entry;
    234 
    235 	if (!client || !opt)
    236 		return;
    237 
    238 	__fio_client_add_cmd_option(client, opt);
    239 
    240 	/*
    241 	 * Duplicate arguments to shared client group
    242 	 */
    243 	flist_for_each(entry, &arg_list) {
    244 		client = flist_entry(entry, struct fio_client, arg_list);
    245 
    246 		__fio_client_add_cmd_option(client, opt);
    247 	}
    248 }
    249 
    250 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
    251 					   const char *hostname, int type,
    252 					   int port)
    253 {
    254 	struct fio_client *client;
    255 
    256 	client = malloc(sizeof(*client));
    257 	memset(client, 0, sizeof(*client));
    258 
    259 	INIT_FLIST_HEAD(&client->list);
    260 	INIT_FLIST_HEAD(&client->hash_list);
    261 	INIT_FLIST_HEAD(&client->arg_list);
    262 	INIT_FLIST_HEAD(&client->eta_list);
    263 	INIT_FLIST_HEAD(&client->cmd_list);
    264 
    265 	client->hostname = strdup(hostname);
    266 
    267 	if (type == Fio_client_socket)
    268 		client->is_sock = 1;
    269 	else {
    270 		int ipv6;
    271 
    272 		ipv6 = type == Fio_client_ipv6;
    273 		if (fio_server_parse_host(hostname, ipv6,
    274 						&client->addr.sin_addr,
    275 						&client->addr6.sin6_addr))
    276 			goto err;
    277 
    278 		client->port = port;
    279 	}
    280 
    281 	client->fd = -1;
    282 	client->ops = ops;
    283 	client->refs = 1;
    284 	client->type = ops->client_type;
    285 
    286 	__fio_client_add_cmd_option(client, "fio");
    287 
    288 	flist_add(&client->list, &client_list);
    289 	nr_clients++;
    290 	dprint(FD_NET, "client: added <%s>\n", client->hostname);
    291 	return client;
    292 err:
    293 	free(client);
    294 	return NULL;
    295 }
    296 
    297 int fio_client_add_ini_file(void *cookie, const char *ini_file, int remote)
    298 {
    299 	struct fio_client *client = cookie;
    300 	struct client_file *cf;
    301 	size_t new_size;
    302 	void *new_files;
    303 
    304 	if (!client)
    305 		return 1;
    306 
    307 	dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file);
    308 
    309 	new_size = (client->nr_files + 1) * sizeof(struct client_file);
    310 	new_files = realloc(client->files, new_size);
    311 	if (!new_files)
    312 		return 1;
    313 
    314 	client->files = new_files;
    315 	cf = &client->files[client->nr_files];
    316 	cf->file = strdup(ini_file);
    317 	cf->remote = remote;
    318 	client->nr_files++;
    319 	return 0;
    320 }
    321 
    322 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
    323 {
    324 	struct fio_client *existing = *cookie;
    325 	struct fio_client *client;
    326 
    327 	if (existing) {
    328 		/*
    329 		 * We always add our "exec" name as the option, hence 1
    330 		 * means empty.
    331 		 */
    332 		if (existing->argc == 1)
    333 			flist_add_tail(&existing->arg_list, &arg_list);
    334 		else {
    335 			while (!flist_empty(&arg_list))
    336 				flist_del_init(arg_list.next);
    337 		}
    338 	}
    339 
    340 	client = malloc(sizeof(*client));
    341 	memset(client, 0, sizeof(*client));
    342 
    343 	INIT_FLIST_HEAD(&client->list);
    344 	INIT_FLIST_HEAD(&client->hash_list);
    345 	INIT_FLIST_HEAD(&client->arg_list);
    346 	INIT_FLIST_HEAD(&client->eta_list);
    347 	INIT_FLIST_HEAD(&client->cmd_list);
    348 
    349 	if (fio_server_parse_string(hostname, &client->hostname,
    350 					&client->is_sock, &client->port,
    351 					&client->addr.sin_addr,
    352 					&client->addr6.sin6_addr,
    353 					&client->ipv6))
    354 		return -1;
    355 
    356 	client->fd = -1;
    357 	client->ops = ops;
    358 	client->refs = 1;
    359 	client->type = ops->client_type;
    360 
    361 	__fio_client_add_cmd_option(client, "fio");
    362 
    363 	flist_add(&client->list, &client_list);
    364 	nr_clients++;
    365 	dprint(FD_NET, "client: added <%s>\n", client->hostname);
    366 	*cookie = client;
    367 	return 0;
    368 }
    369 
    370 static const char *server_name(struct fio_client *client, char *buf,
    371 			       size_t bufsize)
    372 {
    373 	const char *from;
    374 
    375 	if (client->ipv6)
    376 		from = inet_ntop(AF_INET6, (struct sockaddr *) &client->addr6.sin6_addr, buf, bufsize);
    377 	else if (client->is_sock)
    378 		from = "sock";
    379 	else
    380 		from = inet_ntop(AF_INET, (struct sockaddr *) &client->addr.sin_addr, buf, bufsize);
    381 
    382 	return from;
    383 }
    384 
    385 static void probe_client(struct fio_client *client)
    386 {
    387 	struct cmd_client_probe_pdu pdu;
    388 	const char *sname;
    389 	uint64_t tag;
    390 	char buf[64];
    391 
    392 	dprint(FD_NET, "client: send probe\n");
    393 
    394 #ifdef CONFIG_ZLIB
    395 	pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB);
    396 #else
    397 	pdu.flags = 0;
    398 #endif
    399 
    400 	sname = server_name(client, buf, sizeof(buf));
    401 	memset(pdu.server, 0, sizeof(pdu.server));
    402 	strncpy((char *) pdu.server, sname, sizeof(pdu.server) - 1);
    403 
    404 	fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
    405 }
    406 
    407 static int fio_client_connect_ip(struct fio_client *client)
    408 {
    409 	struct sockaddr *addr;
    410 	socklen_t socklen;
    411 	int fd, domain;
    412 
    413 	if (client->ipv6) {
    414 		client->addr6.sin6_family = AF_INET6;
    415 		client->addr6.sin6_port = htons(client->port);
    416 		domain = AF_INET6;
    417 		addr = (struct sockaddr *) &client->addr6;
    418 		socklen = sizeof(client->addr6);
    419 	} else {
    420 		client->addr.sin_family = AF_INET;
    421 		client->addr.sin_port = htons(client->port);
    422 		domain = AF_INET;
    423 		addr = (struct sockaddr *) &client->addr;
    424 		socklen = sizeof(client->addr);
    425 	}
    426 
    427 	fd = socket(domain, SOCK_STREAM, 0);
    428 	if (fd < 0) {
    429 		int ret = -errno;
    430 
    431 		log_err("fio: socket: %s\n", strerror(errno));
    432 		return ret;
    433 	}
    434 
    435 	if (connect(fd, addr, socklen) < 0) {
    436 		int ret = -errno;
    437 
    438 		log_err("fio: connect: %s\n", strerror(errno));
    439 		log_err("fio: failed to connect to %s:%u\n", client->hostname,
    440 								client->port);
    441 		close(fd);
    442 		return ret;
    443 	}
    444 
    445 	return fd;
    446 }
    447 
    448 static int fio_client_connect_sock(struct fio_client *client)
    449 {
    450 	struct sockaddr_un *addr = &client->addr_un;
    451 	socklen_t len;
    452 	int fd;
    453 
    454 	memset(addr, 0, sizeof(*addr));
    455 	addr->sun_family = AF_UNIX;
    456 	strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1);
    457 
    458 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
    459 	if (fd < 0) {
    460 		int ret = -errno;
    461 
    462 		log_err("fio: socket: %s\n", strerror(errno));
    463 		return ret;
    464 	}
    465 
    466 	len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
    467 	if (connect(fd, (struct sockaddr *) addr, len) < 0) {
    468 		int ret = -errno;
    469 
    470 		log_err("fio: connect; %s\n", strerror(errno));
    471 		close(fd);
    472 		return ret;
    473 	}
    474 
    475 	return fd;
    476 }
    477 
    478 int fio_client_connect(struct fio_client *client)
    479 {
    480 	int fd;
    481 
    482 	dprint(FD_NET, "client: connect to host %s\n", client->hostname);
    483 
    484 	if (client->is_sock)
    485 		fd = fio_client_connect_sock(client);
    486 	else
    487 		fd = fio_client_connect_ip(client);
    488 
    489 	dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
    490 
    491 	if (fd < 0)
    492 		return fd;
    493 
    494 	client->fd = fd;
    495 	fio_client_add_hash(client);
    496 	client->state = Client_connected;
    497 
    498 	probe_client(client);
    499 	return 0;
    500 }
    501 
    502 int fio_client_terminate(struct fio_client *client)
    503 {
    504 	return fio_net_send_quit(client->fd);
    505 }
    506 
    507 void fio_clients_terminate(void)
    508 {
    509 	struct flist_head *entry;
    510 	struct fio_client *client;
    511 
    512 	dprint(FD_NET, "client: terminate clients\n");
    513 
    514 	flist_for_each(entry, &client_list) {
    515 		client = flist_entry(entry, struct fio_client, list);
    516 		fio_client_terminate(client);
    517 	}
    518 }
    519 
    520 static void sig_int(int sig)
    521 {
    522 	dprint(FD_NET, "client: got signal %d\n", sig);
    523 	fio_clients_terminate();
    524 }
    525 
    526 static void sig_show_status(int sig)
    527 {
    528 	show_running_run_stats();
    529 }
    530 
    531 static void client_signal_handler(void)
    532 {
    533 	struct sigaction act;
    534 
    535 	memset(&act, 0, sizeof(act));
    536 	act.sa_handler = sig_int;
    537 	act.sa_flags = SA_RESTART;
    538 	sigaction(SIGINT, &act, NULL);
    539 
    540 	memset(&act, 0, sizeof(act));
    541 	act.sa_handler = sig_int;
    542 	act.sa_flags = SA_RESTART;
    543 	sigaction(SIGTERM, &act, NULL);
    544 
    545 /* Windows uses SIGBREAK as a quit signal from other applications */
    546 #ifdef WIN32
    547 	memset(&act, 0, sizeof(act));
    548 	act.sa_handler = sig_int;
    549 	act.sa_flags = SA_RESTART;
    550 	sigaction(SIGBREAK, &act, NULL);
    551 #endif
    552 
    553 	memset(&act, 0, sizeof(act));
    554 	act.sa_handler = sig_show_status;
    555 	act.sa_flags = SA_RESTART;
    556 	sigaction(SIGUSR1, &act, NULL);
    557 }
    558 
    559 static int send_client_cmd_line(struct fio_client *client)
    560 {
    561 	struct cmd_single_line_pdu *cslp;
    562 	struct cmd_line_pdu *clp;
    563 	unsigned long offset;
    564 	unsigned int *lens;
    565 	void *pdu;
    566 	size_t mem;
    567 	int i, ret;
    568 
    569 	dprint(FD_NET, "client: send cmdline %d\n", client->argc);
    570 
    571 	lens = malloc(client->argc * sizeof(unsigned int));
    572 
    573 	/*
    574 	 * Find out how much mem we need
    575 	 */
    576 	for (i = 0, mem = 0; i < client->argc; i++) {
    577 		lens[i] = strlen(client->argv[i]) + 1;
    578 		mem += lens[i];
    579 	}
    580 
    581 	/*
    582 	 * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
    583 	 */
    584 	mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
    585 
    586 	pdu = malloc(mem);
    587 	clp = pdu;
    588 	offset = sizeof(*clp);
    589 
    590 	for (i = 0; i < client->argc; i++) {
    591 		uint16_t arg_len = lens[i];
    592 
    593 		cslp = pdu + offset;
    594 		strcpy((char *) cslp->text, client->argv[i]);
    595 		cslp->len = cpu_to_le16(arg_len);
    596 		offset += sizeof(*cslp) + arg_len;
    597 	}
    598 
    599 	free(lens);
    600 	clp->lines = cpu_to_le16(client->argc);
    601 	clp->client_type = __cpu_to_le16(client->type);
    602 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
    603 	free(pdu);
    604 	return ret;
    605 }
    606 
    607 int fio_clients_connect(void)
    608 {
    609 	struct fio_client *client;
    610 	struct flist_head *entry, *tmp;
    611 	int ret;
    612 
    613 #ifdef WIN32
    614 	WSADATA wsd;
    615 	WSAStartup(MAKEWORD(2, 2), &wsd);
    616 #endif
    617 
    618 	dprint(FD_NET, "client: connect all\n");
    619 
    620 	client_signal_handler();
    621 
    622 	flist_for_each_safe(entry, tmp, &client_list) {
    623 		client = flist_entry(entry, struct fio_client, list);
    624 
    625 		ret = fio_client_connect(client);
    626 		if (ret) {
    627 			remove_client(client);
    628 			continue;
    629 		}
    630 
    631 		if (client->argc > 1)
    632 			send_client_cmd_line(client);
    633 	}
    634 
    635 	return !nr_clients;
    636 }
    637 
    638 int fio_start_client(struct fio_client *client)
    639 {
    640 	dprint(FD_NET, "client: start %s\n", client->hostname);
    641 	return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
    642 }
    643 
    644 int fio_start_all_clients(void)
    645 {
    646 	struct fio_client *client;
    647 	struct flist_head *entry, *tmp;
    648 	int ret;
    649 
    650 	dprint(FD_NET, "client: start all\n");
    651 
    652 	fio_client_json_init();
    653 
    654 	flist_for_each_safe(entry, tmp, &client_list) {
    655 		client = flist_entry(entry, struct fio_client, list);
    656 
    657 		ret = fio_start_client(client);
    658 		if (ret) {
    659 			remove_client(client);
    660 			continue;
    661 		}
    662 	}
    663 
    664 	return flist_empty(&client_list);
    665 }
    666 
    667 static int __fio_client_send_remote_ini(struct fio_client *client,
    668 					const char *filename)
    669 {
    670 	struct cmd_load_file_pdu *pdu;
    671 	size_t p_size;
    672 	int ret;
    673 
    674 	dprint(FD_NET, "send remote ini %s to %s\n", filename, client->hostname);
    675 
    676 	p_size = sizeof(*pdu) + strlen(filename) + 1;
    677 	pdu = malloc(p_size);
    678 	memset(pdu, 0, p_size);
    679 	pdu->name_len = strlen(filename);
    680 	strcpy((char *) pdu->file, filename);
    681 	pdu->client_type = cpu_to_le16((uint16_t) client->type);
    682 
    683 	client->sent_job = 1;
    684 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_LOAD_FILE, pdu, p_size,NULL, NULL);
    685 	free(pdu);
    686 	return ret;
    687 }
    688 
    689 /*
    690  * Send file contents to server backend. We could use sendfile(), but to remain
    691  * more portable lets just read/write the darn thing.
    692  */
    693 static int __fio_client_send_local_ini(struct fio_client *client,
    694 				       const char *filename)
    695 {
    696 	struct cmd_job_pdu *pdu;
    697 	size_t p_size;
    698 	struct stat sb;
    699 	char *p;
    700 	void *buf;
    701 	off_t len;
    702 	int fd, ret;
    703 
    704 	dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
    705 
    706 	fd = open(filename, O_RDONLY);
    707 	if (fd < 0) {
    708 		ret = -errno;
    709 		log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
    710 		return ret;
    711 	}
    712 
    713 	if (fstat(fd, &sb) < 0) {
    714 		ret = -errno;
    715 		log_err("fio: job file stat: %s\n", strerror(errno));
    716 		close(fd);
    717 		return ret;
    718 	}
    719 
    720 	p_size = sb.st_size + sizeof(*pdu);
    721 	pdu = malloc(p_size);
    722 	buf = pdu->buf;
    723 
    724 	len = sb.st_size;
    725 	p = buf;
    726 	if (read_data(fd, p, len)) {
    727 		log_err("fio: failed reading job file %s\n", filename);
    728 		close(fd);
    729 		free(pdu);
    730 		return 1;
    731 	}
    732 
    733 	pdu->buf_len = __cpu_to_le32(sb.st_size);
    734 	pdu->client_type = cpu_to_le32(client->type);
    735 
    736 	client->sent_job = 1;
    737 	ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
    738 	free(pdu);
    739 	close(fd);
    740 	return ret;
    741 }
    742 
    743 int fio_client_send_ini(struct fio_client *client, const char *filename,
    744 			int remote)
    745 {
    746 	int ret;
    747 
    748 	if (!remote)
    749 		ret = __fio_client_send_local_ini(client, filename);
    750 	else
    751 		ret = __fio_client_send_remote_ini(client, filename);
    752 
    753 	if (!ret)
    754 		client->sent_job = 1;
    755 
    756 	return ret;
    757 }
    758 
    759 static int fio_client_send_cf(struct fio_client *client,
    760 			      struct client_file *cf)
    761 {
    762 	return fio_client_send_ini(client, cf->file, cf->remote);
    763 }
    764 
    765 int fio_clients_send_ini(const char *filename)
    766 {
    767 	struct fio_client *client;
    768 	struct flist_head *entry, *tmp;
    769 
    770 	flist_for_each_safe(entry, tmp, &client_list) {
    771 		client = flist_entry(entry, struct fio_client, list);
    772 
    773 		if (client->nr_files) {
    774 			int i;
    775 
    776 			for (i = 0; i < client->nr_files; i++) {
    777 				struct client_file *cf;
    778 
    779 				cf = &client->files[i];
    780 
    781 				if (fio_client_send_cf(client, cf)) {
    782 					remove_client(client);
    783 					break;
    784 				}
    785 			}
    786 		}
    787 		if (client->sent_job)
    788 			continue;
    789 		if (!filename || fio_client_send_ini(client, filename, 0))
    790 			remove_client(client);
    791 	}
    792 
    793 	return !nr_clients;
    794 }
    795 
    796 int fio_client_update_options(struct fio_client *client,
    797 			      struct thread_options *o, uint64_t *tag)
    798 {
    799 	struct cmd_add_job_pdu pdu;
    800 
    801 	pdu.thread_number = cpu_to_le32(client->thread_number);
    802 	pdu.groupid = cpu_to_le32(client->groupid);
    803 	convert_thread_options_to_net(&pdu.top, o);
    804 
    805 	return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
    806 }
    807 
    808 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
    809 {
    810 	dst->max_val	= le64_to_cpu(src->max_val);
    811 	dst->min_val	= le64_to_cpu(src->min_val);
    812 	dst->samples	= le64_to_cpu(src->samples);
    813 
    814 	/*
    815 	 * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
    816 	 */
    817 	dst->mean.u.f	= fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
    818 	dst->S.u.f	= fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
    819 }
    820 
    821 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
    822 {
    823 	int i, j;
    824 
    825 	dst->error		= le32_to_cpu(src->error);
    826 	dst->thread_number	= le32_to_cpu(src->thread_number);
    827 	dst->groupid		= le32_to_cpu(src->groupid);
    828 	dst->pid		= le32_to_cpu(src->pid);
    829 	dst->members		= le32_to_cpu(src->members);
    830 	dst->unified_rw_rep	= le32_to_cpu(src->unified_rw_rep);
    831 
    832 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    833 		convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
    834 		convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
    835 		convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
    836 		convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
    837 	}
    838 
    839 	dst->usr_time		= le64_to_cpu(src->usr_time);
    840 	dst->sys_time		= le64_to_cpu(src->sys_time);
    841 	dst->ctx		= le64_to_cpu(src->ctx);
    842 	dst->minf		= le64_to_cpu(src->minf);
    843 	dst->majf		= le64_to_cpu(src->majf);
    844 	dst->clat_percentiles	= le64_to_cpu(src->clat_percentiles);
    845 	dst->percentile_precision = le64_to_cpu(src->percentile_precision);
    846 
    847 	for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
    848 		fio_fp64_t *fps = &src->percentile_list[i];
    849 		fio_fp64_t *fpd = &dst->percentile_list[i];
    850 
    851 		fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
    852 	}
    853 
    854 	for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
    855 		dst->io_u_map[i]	= le32_to_cpu(src->io_u_map[i]);
    856 		dst->io_u_submit[i]	= le32_to_cpu(src->io_u_submit[i]);
    857 		dst->io_u_complete[i]	= le32_to_cpu(src->io_u_complete[i]);
    858 	}
    859 
    860 	for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
    861 		dst->io_u_lat_u[i]	= le32_to_cpu(src->io_u_lat_u[i]);
    862 		dst->io_u_lat_m[i]	= le32_to_cpu(src->io_u_lat_m[i]);
    863 	}
    864 
    865 	for (i = 0; i < DDIR_RWDIR_CNT; i++)
    866 		for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
    867 			dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
    868 
    869 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    870 		dst->total_io_u[i]	= le64_to_cpu(src->total_io_u[i]);
    871 		dst->short_io_u[i]	= le64_to_cpu(src->short_io_u[i]);
    872 		dst->drop_io_u[i]	= le64_to_cpu(src->drop_io_u[i]);
    873 	}
    874 
    875 	dst->total_submit	= le64_to_cpu(src->total_submit);
    876 	dst->total_complete	= le64_to_cpu(src->total_complete);
    877 
    878 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    879 		dst->io_bytes[i]	= le64_to_cpu(src->io_bytes[i]);
    880 		dst->runtime[i]		= le64_to_cpu(src->runtime[i]);
    881 	}
    882 
    883 	dst->total_run_time	= le64_to_cpu(src->total_run_time);
    884 	dst->continue_on_error	= le16_to_cpu(src->continue_on_error);
    885 	dst->total_err_count	= le64_to_cpu(src->total_err_count);
    886 	dst->first_error	= le32_to_cpu(src->first_error);
    887 	dst->kb_base		= le32_to_cpu(src->kb_base);
    888 	dst->unit_base		= le32_to_cpu(src->unit_base);
    889 
    890 	dst->latency_depth	= le32_to_cpu(src->latency_depth);
    891 	dst->latency_target	= le64_to_cpu(src->latency_target);
    892 	dst->latency_window	= le64_to_cpu(src->latency_window);
    893 	dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i));
    894 }
    895 
    896 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
    897 {
    898 	int i;
    899 
    900 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    901 		dst->max_run[i]		= le64_to_cpu(src->max_run[i]);
    902 		dst->min_run[i]		= le64_to_cpu(src->min_run[i]);
    903 		dst->max_bw[i]		= le64_to_cpu(src->max_bw[i]);
    904 		dst->min_bw[i]		= le64_to_cpu(src->min_bw[i]);
    905 		dst->io_kb[i]		= le64_to_cpu(src->io_kb[i]);
    906 		dst->agg[i]		= le64_to_cpu(src->agg[i]);
    907 	}
    908 
    909 	dst->kb_base	= le32_to_cpu(src->kb_base);
    910 	dst->unit_base	= le32_to_cpu(src->unit_base);
    911 	dst->groupid	= le32_to_cpu(src->groupid);
    912 	dst->unified_rw_rep	= le32_to_cpu(src->unified_rw_rep);
    913 }
    914 
    915 static void json_object_add_client_info(struct json_object *obj,
    916 					struct fio_client *client)
    917 {
    918 	const char *hostname = client->hostname ? client->hostname : "";
    919 
    920 	json_object_add_value_string(obj, "hostname", hostname);
    921 	json_object_add_value_int(obj, "port", client->port);
    922 }
    923 
    924 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
    925 {
    926 	struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
    927 	struct json_object *tsobj;
    928 
    929 	tsobj = show_thread_status(&p->ts, &p->rs);
    930 	client->did_stat = 1;
    931 	if (tsobj) {
    932 		json_object_add_client_info(tsobj, client);
    933 		json_array_add_value_object(clients_array, tsobj);
    934 	}
    935 
    936 	if (sum_stat_clients <= 1)
    937 		return;
    938 
    939 	sum_thread_stats(&client_ts, &p->ts, sum_stat_nr);
    940 	sum_group_stats(&client_gs, &p->rs);
    941 
    942 	client_ts.members++;
    943 	client_ts.thread_number = p->ts.thread_number;
    944 	client_ts.groupid = p->ts.groupid;
    945 	client_ts.unified_rw_rep = p->ts.unified_rw_rep;
    946 
    947 	if (++sum_stat_nr == sum_stat_clients) {
    948 		strcpy(client_ts.name, "All clients");
    949 		tsobj = show_thread_status(&client_ts, &client_gs);
    950 		if (tsobj) {
    951 			json_object_add_client_info(tsobj, client);
    952 			json_array_add_value_object(clients_array, tsobj);
    953 		}
    954 	}
    955 }
    956 
    957 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
    958 {
    959 	struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
    960 
    961 	show_group_stats(gs);
    962 }
    963 
    964 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
    965 {
    966 	struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
    967 	const char *buf = (const char *) pdu->buf;
    968 	const char *name;
    969 	int fio_unused ret;
    970 
    971 	name = client->name ? client->name : client->hostname;
    972 
    973 	if (!client->skip_newline)
    974 		fprintf(f_out, "<%s> ", name);
    975 	ret = fwrite(buf, pdu->buf_len, 1, f_out);
    976 	fflush(f_out);
    977 	client->skip_newline = strchr(buf, '\n') == NULL;
    978 }
    979 
    980 static void convert_agg(struct disk_util_agg *agg)
    981 {
    982 	int i;
    983 
    984 	for (i = 0; i < 2; i++) {
    985 		agg->ios[i]	= le64_to_cpu(agg->ios[i]);
    986 		agg->merges[i]	= le64_to_cpu(agg->merges[i]);
    987 		agg->sectors[i]	= le64_to_cpu(agg->sectors[i]);
    988 		agg->ticks[i]	= le64_to_cpu(agg->ticks[i]);
    989 	}
    990 
    991 	agg->io_ticks		= le64_to_cpu(agg->io_ticks);
    992 	agg->time_in_queue	= le64_to_cpu(agg->time_in_queue);
    993 	agg->slavecount		= le32_to_cpu(agg->slavecount);
    994 	agg->max_util.u.f	= fio_uint64_to_double(le64_to_cpu(agg->max_util.u.i));
    995 }
    996 
    997 static void convert_dus(struct disk_util_stat *dus)
    998 {
    999 	int i;
   1000 
   1001 	for (i = 0; i < 2; i++) {
   1002 		dus->s.ios[i]		= le64_to_cpu(dus->s.ios[i]);
   1003 		dus->s.merges[i]	= le64_to_cpu(dus->s.merges[i]);
   1004 		dus->s.sectors[i]	= le64_to_cpu(dus->s.sectors[i]);
   1005 		dus->s.ticks[i]		= le64_to_cpu(dus->s.ticks[i]);
   1006 	}
   1007 
   1008 	dus->s.io_ticks		= le64_to_cpu(dus->s.io_ticks);
   1009 	dus->s.time_in_queue	= le64_to_cpu(dus->s.time_in_queue);
   1010 	dus->s.msec		= le64_to_cpu(dus->s.msec);
   1011 }
   1012 
   1013 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
   1014 {
   1015 	struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
   1016 
   1017 	if (!client->disk_stats_shown) {
   1018 		client->disk_stats_shown = 1;
   1019 		log_info("\nDisk stats (read/write):\n");
   1020 	}
   1021 
   1022 	if (output_format == FIO_OUTPUT_JSON) {
   1023 		struct json_object *duobj;
   1024 		json_array_add_disk_util(&du->dus, &du->agg, du_array);
   1025 		duobj = json_array_last_value_object(du_array);
   1026 		json_object_add_client_info(duobj, client);
   1027 	} else
   1028 		print_disk_util(&du->dus, &du->agg, output_format == FIO_OUTPUT_TERSE);
   1029 }
   1030 
   1031 static void convert_jobs_eta(struct jobs_eta *je)
   1032 {
   1033 	int i;
   1034 
   1035 	je->nr_running		= le32_to_cpu(je->nr_running);
   1036 	je->nr_ramp		= le32_to_cpu(je->nr_ramp);
   1037 	je->nr_pending		= le32_to_cpu(je->nr_pending);
   1038 	je->nr_setting_up	= le32_to_cpu(je->nr_setting_up);
   1039 	je->files_open		= le32_to_cpu(je->files_open);
   1040 
   1041 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1042 		je->m_rate[i]	= le32_to_cpu(je->m_rate[i]);
   1043 		je->t_rate[i]	= le32_to_cpu(je->t_rate[i]);
   1044 		je->m_iops[i]	= le32_to_cpu(je->m_iops[i]);
   1045 		je->t_iops[i]	= le32_to_cpu(je->t_iops[i]);
   1046 		je->rate[i]	= le32_to_cpu(je->rate[i]);
   1047 		je->iops[i]	= le32_to_cpu(je->iops[i]);
   1048 	}
   1049 
   1050 	je->elapsed_sec		= le64_to_cpu(je->elapsed_sec);
   1051 	je->eta_sec		= le64_to_cpu(je->eta_sec);
   1052 	je->nr_threads		= le32_to_cpu(je->nr_threads);
   1053 	je->is_pow2		= le32_to_cpu(je->is_pow2);
   1054 	je->unit_base		= le32_to_cpu(je->unit_base);
   1055 }
   1056 
   1057 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
   1058 {
   1059 	int i;
   1060 
   1061 	dst->nr_running		+= je->nr_running;
   1062 	dst->nr_ramp		+= je->nr_ramp;
   1063 	dst->nr_pending		+= je->nr_pending;
   1064 	dst->nr_setting_up	+= je->nr_setting_up;
   1065 	dst->files_open		+= je->files_open;
   1066 
   1067 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1068 		dst->m_rate[i]	+= je->m_rate[i];
   1069 		dst->t_rate[i]	+= je->t_rate[i];
   1070 		dst->m_iops[i]	+= je->m_iops[i];
   1071 		dst->t_iops[i]	+= je->t_iops[i];
   1072 		dst->rate[i]	+= je->rate[i];
   1073 		dst->iops[i]	+= je->iops[i];
   1074 	}
   1075 
   1076 	dst->elapsed_sec	+= je->elapsed_sec;
   1077 
   1078 	if (je->eta_sec > dst->eta_sec)
   1079 		dst->eta_sec = je->eta_sec;
   1080 
   1081 	dst->nr_threads		+= je->nr_threads;
   1082 
   1083 	/*
   1084 	 * This wont be correct for multiple strings, but at least it
   1085 	 * works for the basic cases.
   1086 	 */
   1087 	strcpy((char *) dst->run_str, (char *) je->run_str);
   1088 }
   1089 
   1090 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
   1091 {
   1092 	if (!--eta->pending) {
   1093 		eta_fn(&eta->eta);
   1094 		free(eta);
   1095 	}
   1096 }
   1097 
   1098 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
   1099 {
   1100 	struct fio_net_cmd_reply *reply = NULL;
   1101 	struct flist_head *entry;
   1102 
   1103 	flist_for_each(entry, &client->cmd_list) {
   1104 		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1105 
   1106 		if (cmd->tag == (uintptr_t) reply)
   1107 			break;
   1108 
   1109 		reply = NULL;
   1110 	}
   1111 
   1112 	if (!reply) {
   1113 		log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag);
   1114 		return;
   1115 	}
   1116 
   1117 	flist_del(&reply->list);
   1118 	cmd->tag = reply->saved_tag;
   1119 	free(reply);
   1120 }
   1121 
   1122 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
   1123 {
   1124 	do {
   1125 		struct fio_net_cmd_reply *reply = NULL;
   1126 		struct flist_head *entry;
   1127 
   1128 		flist_for_each(entry, &client->cmd_list) {
   1129 			reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1130 
   1131 			if (tag == (uintptr_t) reply)
   1132 				break;
   1133 
   1134 			reply = NULL;
   1135 		}
   1136 
   1137 		if (!reply)
   1138 			break;
   1139 
   1140 		usleep(1000);
   1141 	} while (1);
   1142 
   1143 	return 0;
   1144 }
   1145 
   1146 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
   1147 {
   1148 	struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
   1149 	struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
   1150 
   1151 	dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
   1152 
   1153 	assert(client->eta_in_flight == eta);
   1154 
   1155 	client->eta_in_flight = NULL;
   1156 	flist_del_init(&client->eta_list);
   1157 
   1158 	if (client->ops->jobs_eta)
   1159 		client->ops->jobs_eta(client, je);
   1160 
   1161 	fio_client_sum_jobs_eta(&eta->eta, je);
   1162 	fio_client_dec_jobs_eta(eta, client->ops->eta);
   1163 }
   1164 
   1165 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
   1166 {
   1167 	struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
   1168 	const char *os, *arch;
   1169 	char bit[16];
   1170 
   1171 	os = fio_get_os_string(probe->os);
   1172 	if (!os)
   1173 		os = "unknown";
   1174 
   1175 	arch = fio_get_arch_string(probe->arch);
   1176 	if (!arch)
   1177 		os = "unknown";
   1178 
   1179 	sprintf(bit, "%d-bit", probe->bpp * 8);
   1180 	probe->flags = le64_to_cpu(probe->flags);
   1181 
   1182 	log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
   1183 		probe->hostname, probe->bigendian, bit, os, arch,
   1184 		probe->fio_version, (unsigned long) probe->flags);
   1185 
   1186 	if (!client->name)
   1187 		client->name = strdup((char *) probe->hostname);
   1188 }
   1189 
   1190 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
   1191 {
   1192 	struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
   1193 
   1194 	client->state = Client_started;
   1195 	client->jobs = le32_to_cpu(pdu->jobs);
   1196 	client->nr_stat = le32_to_cpu(pdu->stat_outputs);
   1197 
   1198 	sum_stat_clients += client->nr_stat;
   1199 }
   1200 
   1201 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
   1202 {
   1203 	if (client->error)
   1204 		log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
   1205 }
   1206 
   1207 static void convert_stop(struct fio_net_cmd *cmd)
   1208 {
   1209 	struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
   1210 
   1211 	pdu->error = le32_to_cpu(pdu->error);
   1212 }
   1213 
   1214 static void convert_text(struct fio_net_cmd *cmd)
   1215 {
   1216 	struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
   1217 
   1218 	pdu->level	= le32_to_cpu(pdu->level);
   1219 	pdu->buf_len	= le32_to_cpu(pdu->buf_len);
   1220 	pdu->log_sec	= le64_to_cpu(pdu->log_sec);
   1221 	pdu->log_usec	= le64_to_cpu(pdu->log_usec);
   1222 }
   1223 
   1224 static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
   1225 					      struct cmd_iolog_pdu *pdu)
   1226 {
   1227 #ifdef CONFIG_ZLIB
   1228 	struct cmd_iolog_pdu *ret;
   1229 	z_stream stream;
   1230 	uint32_t nr_samples;
   1231 	size_t total;
   1232 	void *p;
   1233 
   1234 	stream.zalloc = Z_NULL;
   1235 	stream.zfree = Z_NULL;
   1236 	stream.opaque = Z_NULL;
   1237 	stream.avail_in = 0;
   1238 	stream.next_in = Z_NULL;
   1239 
   1240 	if (inflateInit(&stream) != Z_OK)
   1241 		return NULL;
   1242 
   1243 	/*
   1244 	 * Get header first, it's not compressed
   1245 	 */
   1246 	nr_samples = le64_to_cpu(pdu->nr_samples);
   1247 
   1248 	total = nr_samples * __log_entry_sz(le32_to_cpu(pdu->log_offset));
   1249 	ret = malloc(total + sizeof(*pdu));
   1250 	ret->nr_samples = nr_samples;
   1251 
   1252 	memcpy(ret, pdu, sizeof(*pdu));
   1253 
   1254 	p = (void *) ret + sizeof(*pdu);
   1255 
   1256 	stream.avail_in = cmd->pdu_len - sizeof(*pdu);
   1257 	stream.next_in = (void *) pdu + sizeof(*pdu);
   1258 	while (stream.avail_in) {
   1259 		unsigned int this_chunk = 65536;
   1260 		unsigned int this_len;
   1261 		int err;
   1262 
   1263 		if (this_chunk > total)
   1264 			this_chunk = total;
   1265 
   1266 		stream.avail_out = this_chunk;
   1267 		stream.next_out = p;
   1268 		err = inflate(&stream, Z_NO_FLUSH);
   1269 		/* may be Z_OK, or Z_STREAM_END */
   1270 		if (err < 0) {
   1271 			log_err("fio: inflate error %d\n", err);
   1272 			free(ret);
   1273 			ret = NULL;
   1274 			goto err;
   1275 		}
   1276 
   1277 		this_len = this_chunk - stream.avail_out;
   1278 		p += this_len;
   1279 		total -= this_len;
   1280 	}
   1281 
   1282 err:
   1283 	inflateEnd(&stream);
   1284 	return ret;
   1285 #else
   1286 	return NULL;
   1287 #endif
   1288 }
   1289 
   1290 /*
   1291  * This has been compressed on the server side, since it can be big.
   1292  * Uncompress here.
   1293  */
   1294 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
   1295 {
   1296 	struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
   1297 	struct cmd_iolog_pdu *ret;
   1298 	uint64_t i;
   1299 	void *samples;
   1300 
   1301 	/*
   1302 	 * Convert if compressed and we support it. If it's not
   1303 	 * compressed, we need not do anything.
   1304 	 */
   1305 	if (le32_to_cpu(pdu->compressed)) {
   1306 #ifndef CONFIG_ZLIB
   1307 		log_err("fio: server sent compressed data by mistake\n");
   1308 		return NULL;
   1309 #endif
   1310 		ret = convert_iolog_gz(cmd, pdu);
   1311 		if (!ret) {
   1312 			log_err("fio: failed decompressing log\n");
   1313 			return NULL;
   1314 		}
   1315 	} else
   1316 		ret = pdu;
   1317 
   1318 	ret->nr_samples		= le64_to_cpu(ret->nr_samples);
   1319 	ret->thread_number	= le32_to_cpu(ret->thread_number);
   1320 	ret->log_type		= le32_to_cpu(ret->log_type);
   1321 	ret->compressed		= le32_to_cpu(ret->compressed);
   1322 	ret->log_offset		= le32_to_cpu(ret->log_offset);
   1323 
   1324 	samples = &ret->samples[0];
   1325 	for (i = 0; i < ret->nr_samples; i++) {
   1326 		struct io_sample *s;
   1327 
   1328 		s = __get_sample(samples, ret->log_offset, i);
   1329 		s->time		= le64_to_cpu(s->time);
   1330 		s->val		= le64_to_cpu(s->val);
   1331 		s->__ddir	= le32_to_cpu(s->__ddir);
   1332 		s->bs		= le32_to_cpu(s->bs);
   1333 
   1334 		if (ret->log_offset) {
   1335 			struct io_sample_offset *so = (void *) s;
   1336 
   1337 			so->offset = le64_to_cpu(so->offset);
   1338 		}
   1339 	}
   1340 
   1341 	return ret;
   1342 }
   1343 
   1344 static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep,
   1345 			   size_t size, uint64_t tag)
   1346 {
   1347 	rep->error = cpu_to_le32(rep->error);
   1348 	fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL);
   1349 }
   1350 
   1351 static int send_file(struct fio_client *client, struct cmd_sendfile *pdu,
   1352 		     uint64_t tag)
   1353 {
   1354 	struct cmd_sendfile_reply *rep;
   1355 	struct stat sb;
   1356 	size_t size;
   1357 	int fd;
   1358 
   1359 	size = sizeof(*rep);
   1360 	rep = malloc(size);
   1361 
   1362 	if (stat((char *)pdu->path, &sb) < 0) {
   1363 fail:
   1364 		rep->error = errno;
   1365 		sendfile_reply(client->fd, rep, size, tag);
   1366 		free(rep);
   1367 		return 1;
   1368 	}
   1369 
   1370 	size += sb.st_size;
   1371 	rep = realloc(rep, size);
   1372 	rep->size = cpu_to_le32((uint32_t) sb.st_size);
   1373 
   1374 	fd = open((char *)pdu->path, O_RDONLY);
   1375 	if (fd == -1 )
   1376 		goto fail;
   1377 
   1378 	rep->error = read_data(fd, &rep->data, sb.st_size);
   1379 	sendfile_reply(client->fd, rep, size, tag);
   1380 	free(rep);
   1381 	close(fd);
   1382 	return 0;
   1383 }
   1384 
   1385 int fio_handle_client(struct fio_client *client)
   1386 {
   1387 	struct client_ops *ops = client->ops;
   1388 	struct fio_net_cmd *cmd;
   1389 
   1390 	dprint(FD_NET, "client: handle %s\n", client->hostname);
   1391 
   1392 	cmd = fio_net_recv_cmd(client->fd);
   1393 	if (!cmd)
   1394 		return 0;
   1395 
   1396 	dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
   1397 		fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
   1398 
   1399 	switch (cmd->opcode) {
   1400 	case FIO_NET_CMD_QUIT:
   1401 		if (ops->quit)
   1402 			ops->quit(client, cmd);
   1403 		remove_client(client);
   1404 		break;
   1405 	case FIO_NET_CMD_TEXT:
   1406 		convert_text(cmd);
   1407 		ops->text(client, cmd);
   1408 		break;
   1409 	case FIO_NET_CMD_DU: {
   1410 		struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
   1411 
   1412 		convert_dus(&du->dus);
   1413 		convert_agg(&du->agg);
   1414 
   1415 		ops->disk_util(client, cmd);
   1416 		break;
   1417 		}
   1418 	case FIO_NET_CMD_TS: {
   1419 		struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
   1420 
   1421 		convert_ts(&p->ts, &p->ts);
   1422 		convert_gs(&p->rs, &p->rs);
   1423 
   1424 		ops->thread_status(client, cmd);
   1425 		break;
   1426 		}
   1427 	case FIO_NET_CMD_GS: {
   1428 		struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
   1429 
   1430 		convert_gs(gs, gs);
   1431 
   1432 		ops->group_stats(client, cmd);
   1433 		break;
   1434 		}
   1435 	case FIO_NET_CMD_ETA: {
   1436 		struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
   1437 
   1438 		remove_reply_cmd(client, cmd);
   1439 		convert_jobs_eta(je);
   1440 		handle_eta(client, cmd);
   1441 		break;
   1442 		}
   1443 	case FIO_NET_CMD_PROBE:
   1444 		remove_reply_cmd(client, cmd);
   1445 		ops->probe(client, cmd);
   1446 		break;
   1447 	case FIO_NET_CMD_SERVER_START:
   1448 		client->state = Client_running;
   1449 		if (ops->job_start)
   1450 			ops->job_start(client, cmd);
   1451 		break;
   1452 	case FIO_NET_CMD_START: {
   1453 		struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
   1454 
   1455 		pdu->jobs = le32_to_cpu(pdu->jobs);
   1456 		ops->start(client, cmd);
   1457 		break;
   1458 		}
   1459 	case FIO_NET_CMD_STOP: {
   1460 		struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
   1461 
   1462 		convert_stop(cmd);
   1463 		client->state = Client_stopped;
   1464 		client->error = le32_to_cpu(pdu->error);
   1465 		client->signal = le32_to_cpu(pdu->signal);
   1466 		ops->stop(client, cmd);
   1467 		break;
   1468 		}
   1469 	case FIO_NET_CMD_ADD_JOB: {
   1470 		struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
   1471 
   1472 		client->thread_number = le32_to_cpu(pdu->thread_number);
   1473 		client->groupid = le32_to_cpu(pdu->groupid);
   1474 
   1475 		if (ops->add_job)
   1476 			ops->add_job(client, cmd);
   1477 		break;
   1478 		}
   1479 	case FIO_NET_CMD_IOLOG:
   1480 		if (ops->iolog) {
   1481 			struct cmd_iolog_pdu *pdu;
   1482 
   1483 			pdu = convert_iolog(cmd);
   1484 			ops->iolog(client, pdu);
   1485 		}
   1486 		break;
   1487 	case FIO_NET_CMD_UPDATE_JOB:
   1488 		ops->update_job(client, cmd);
   1489 		remove_reply_cmd(client, cmd);
   1490 		break;
   1491 	case FIO_NET_CMD_VTRIGGER: {
   1492 		struct all_io_list *pdu = (struct all_io_list *) cmd->payload;
   1493 		char buf[64];
   1494 
   1495 		__verify_save_state(pdu, server_name(client, buf, sizeof(buf)));
   1496 		exec_trigger(trigger_cmd);
   1497 		break;
   1498 		}
   1499 	case FIO_NET_CMD_SENDFILE: {
   1500 		struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload;
   1501 		send_file(client, pdu, cmd->tag);
   1502 		break;
   1503 		}
   1504 	default:
   1505 		log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
   1506 		break;
   1507 	}
   1508 
   1509 	free(cmd);
   1510 	return 1;
   1511 }
   1512 
   1513 int fio_clients_send_trigger(const char *cmd)
   1514 {
   1515 	struct flist_head *entry;
   1516 	struct fio_client *client;
   1517 	size_t slen;
   1518 
   1519 	dprint(FD_NET, "client: send vtrigger: %s\n", cmd);
   1520 
   1521 	if (!cmd)
   1522 		slen = 0;
   1523 	else
   1524 		slen = strlen(cmd);
   1525 
   1526 	flist_for_each(entry, &client_list) {
   1527 		struct cmd_vtrigger_pdu *pdu;
   1528 
   1529 		client = flist_entry(entry, struct fio_client, list);
   1530 
   1531 		pdu = malloc(sizeof(*pdu) + slen);
   1532 		pdu->len = cpu_to_le16((uint16_t) slen);
   1533 		if (slen)
   1534 			memcpy(pdu->cmd, cmd, slen);
   1535 		fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu,
   1536 					sizeof(*pdu) + slen, NULL, NULL);
   1537 		free(pdu);
   1538 	}
   1539 
   1540 	return 0;
   1541 }
   1542 
   1543 static void request_client_etas(struct client_ops *ops)
   1544 {
   1545 	struct fio_client *client;
   1546 	struct flist_head *entry;
   1547 	struct client_eta *eta;
   1548 	int skipped = 0;
   1549 
   1550 	dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
   1551 
   1552 	eta = calloc(1, sizeof(*eta) + __THREAD_RUNSTR_SZ(REAL_MAX_JOBS));
   1553 	eta->pending = nr_clients;
   1554 
   1555 	flist_for_each(entry, &client_list) {
   1556 		client = flist_entry(entry, struct fio_client, list);
   1557 
   1558 		if (!flist_empty(&client->eta_list)) {
   1559 			skipped++;
   1560 			continue;
   1561 		}
   1562 		if (client->state != Client_running)
   1563 			continue;
   1564 
   1565 		assert(!client->eta_in_flight);
   1566 		flist_add_tail(&client->eta_list, &eta_list);
   1567 		client->eta_in_flight = eta;
   1568 		fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
   1569 					(uintptr_t) eta, &client->cmd_list);
   1570 	}
   1571 
   1572 	while (skipped--)
   1573 		fio_client_dec_jobs_eta(eta, ops->eta);
   1574 
   1575 	dprint(FD_NET, "client: requested eta tag %p\n", eta);
   1576 }
   1577 
   1578 static int client_check_cmd_timeout(struct fio_client *client,
   1579 				    struct timeval *now)
   1580 {
   1581 	struct fio_net_cmd_reply *reply;
   1582 	struct flist_head *entry, *tmp;
   1583 	int ret = 0;
   1584 
   1585 	flist_for_each_safe(entry, tmp, &client->cmd_list) {
   1586 		reply = flist_entry(entry, struct fio_net_cmd_reply, list);
   1587 
   1588 		if (mtime_since(&reply->tv, now) < FIO_NET_CLIENT_TIMEOUT)
   1589 			continue;
   1590 
   1591 		log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
   1592 						fio_server_op(reply->opcode));
   1593 		flist_del(&reply->list);
   1594 		free(reply);
   1595 		ret = 1;
   1596 	}
   1597 
   1598 	return flist_empty(&client->cmd_list) && ret;
   1599 }
   1600 
   1601 static int fio_check_clients_timed_out(void)
   1602 {
   1603 	struct fio_client *client;
   1604 	struct flist_head *entry, *tmp;
   1605 	struct timeval tv;
   1606 	int ret = 0;
   1607 
   1608 	fio_gettime(&tv, NULL);
   1609 
   1610 	flist_for_each_safe(entry, tmp, &client_list) {
   1611 		client = flist_entry(entry, struct fio_client, list);
   1612 
   1613 		if (flist_empty(&client->cmd_list))
   1614 			continue;
   1615 
   1616 		if (!client_check_cmd_timeout(client, &tv))
   1617 			continue;
   1618 
   1619 		if (client->ops->timed_out)
   1620 			client->ops->timed_out(client);
   1621 		else
   1622 			log_err("fio: client %s timed out\n", client->hostname);
   1623 
   1624 		client->error = ETIMEDOUT;
   1625 		remove_client(client);
   1626 		ret = 1;
   1627 	}
   1628 
   1629 	return ret;
   1630 }
   1631 
   1632 int fio_handle_clients(struct client_ops *ops)
   1633 {
   1634 	struct pollfd *pfds;
   1635 	int i, ret = 0, retval = 0;
   1636 
   1637 	fio_gettime(&eta_tv, NULL);
   1638 
   1639 	pfds = malloc(nr_clients * sizeof(struct pollfd));
   1640 
   1641 	init_thread_stat(&client_ts);
   1642 	init_group_run_stat(&client_gs);
   1643 
   1644 	while (!exit_backend && nr_clients) {
   1645 		struct flist_head *entry, *tmp;
   1646 		struct fio_client *client;
   1647 
   1648 		i = 0;
   1649 		flist_for_each_safe(entry, tmp, &client_list) {
   1650 			client = flist_entry(entry, struct fio_client, list);
   1651 
   1652 			if (!client->sent_job && !client->ops->stay_connected &&
   1653 			    flist_empty(&client->cmd_list)) {
   1654 				remove_client(client);
   1655 				continue;
   1656 			}
   1657 
   1658 			pfds[i].fd = client->fd;
   1659 			pfds[i].events = POLLIN;
   1660 			i++;
   1661 		}
   1662 
   1663 		if (!nr_clients)
   1664 			break;
   1665 
   1666 		assert(i == nr_clients);
   1667 
   1668 		do {
   1669 			struct timeval tv;
   1670 			int timeout;
   1671 
   1672 			fio_gettime(&tv, NULL);
   1673 			if (mtime_since(&eta_tv, &tv) >= 900) {
   1674 				request_client_etas(ops);
   1675 				memcpy(&eta_tv, &tv, sizeof(tv));
   1676 
   1677 				if (fio_check_clients_timed_out())
   1678 					break;
   1679 			}
   1680 
   1681 			check_trigger_file();
   1682 
   1683 			timeout = min(100u, ops->eta_msec);
   1684 
   1685 			ret = poll(pfds, nr_clients, timeout);
   1686 			if (ret < 0) {
   1687 				if (errno == EINTR)
   1688 					continue;
   1689 				log_err("fio: poll clients: %s\n", strerror(errno));
   1690 				break;
   1691 			} else if (!ret)
   1692 				continue;
   1693 		} while (ret <= 0);
   1694 
   1695 		for (i = 0; i < nr_clients; i++) {
   1696 			if (!(pfds[i].revents & POLLIN))
   1697 				continue;
   1698 
   1699 			client = find_client_by_fd(pfds[i].fd);
   1700 			if (!client) {
   1701 				log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd);
   1702 				continue;
   1703 			}
   1704 			if (!fio_handle_client(client)) {
   1705 				log_info("client: host=%s disconnected\n",
   1706 						client->hostname);
   1707 				remove_client(client);
   1708 				retval = 1;
   1709 			} else if (client->error)
   1710 				retval = 1;
   1711 			fio_put_client(client);
   1712 		}
   1713 	}
   1714 
   1715 	fio_client_json_fini();
   1716 
   1717 	free(pfds);
   1718 	return retval || error_clients;
   1719 }
   1720