Home | History | Annotate | Download | only in fio
      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <stdarg.h>
      4 #include <unistd.h>
      5 #include <limits.h>
      6 #include <errno.h>
      7 #include <sys/poll.h>
      8 #include <sys/types.h>
      9 #include <sys/wait.h>
     10 #include <sys/socket.h>
     11 #include <sys/stat.h>
     12 #include <sys/un.h>
     13 #include <sys/uio.h>
     14 #include <netinet/in.h>
     15 #include <arpa/inet.h>
     16 #include <netdb.h>
     17 #include <syslog.h>
     18 #include <signal.h>
     19 #ifdef CONFIG_ZLIB
     20 #include <zlib.h>
     21 #endif
     22 
     23 #include "fio.h"
     24 #include "server.h"
     25 #include "crc/crc16.h"
     26 #include "lib/ieee754.h"
     27 #include "verify.h"
     28 #include "smalloc.h"
     29 
/* Network port for client/server traffic; starts out as FIO_NET_PORT */
int fio_net_port = FIO_NET_PORT;

/* When non-zero, the send/receive/poll loops in this file stop iterating */
int exit_backend = 0;

/*
 * Socket of the client connection being served. While it is -1,
 * fio_server_text_output() logs locally instead of sending to a client.
 */
static int server_fd = -1;
static char *fio_server_arg;
static char *bind_sock;
static struct sockaddr_in saddr_in;
static struct sockaddr_in6 saddr_in6;
/* Non-zero selects the IPv6 address structures in accept_loop() */
static int use_ipv6;
/* has_zlib records at compile time whether zlib support was built in */
#ifdef CONFIG_ZLIB
static unsigned int has_zlib = 1;
#else
static unsigned int has_zlib = 0;
#endif
/* Set by handle_probe_cmd() when both client and server support zlib */
static unsigned int use_zlib;
/* Server name as reported by the client in its probe PDU */
static char me[128];
     47 
/*
 * Bookkeeping for one forked child (connection handler or job runner).
 * Items sit on a caller-owned list until the child has been reaped.
 */
struct fio_fork_item {
	struct flist_head list;
	int exitval;	/* WEXITSTATUS() of the child, recorded when non-zero */
	int signal;	/* WTERMSIG() if the child was killed by a signal */
	int exited;	/* set once waitpid() reported the child as gone */
	pid_t pid;
};
     55 
/*
 * Completion record for a SENDFILE request. handle_command() locates it
 * through the tag of the incoming reply, fills in data/size or error and
 * then ups 'lock'.
 */
struct cmd_reply {
	struct fio_mutex lock;
	void *data;	/* smalloc()ed copy of the received data */
	size_t size;	/* number of bytes in 'data' */
	int error;	/* non-zero if the transfer failed */
};
     62 
/*
 * Printable opcode names, indexed by opcode value (see fio_server_op()).
 * NOTE(review): presumably this must stay in the same order as the
 * FIO_NET_CMD_* opcode definitions - confirm against server.h.
 */
static const char *fio_server_ops[FIO_NET_CMD_NR] = {
	"",
	"QUIT",
	"EXIT",
	"JOB",
	"JOBLINE",
	"TEXT",
	"TS",
	"GS",
	"SEND_ETA",
	"ETA",
	"PROBE",
	"START",
	"STOP",
	"DISK_UTIL",
	"SERVER_START",
	"ADD_JOB",
	"RUN",
	"IOLOG",
	"UPDATE_JOB",
	"LOAD_FILE",
	"VTRIGGER",
	"SENDFILE",
};
     87 
     88 const char *fio_server_op(unsigned int op)
     89 {
     90 	static char buf[32];
     91 
     92 	if (op < FIO_NET_CMD_NR)
     93 		return fio_server_ops[op];
     94 
     95 	sprintf(buf, "UNKNOWN/%d", op);
     96 	return buf;
     97 }
     98 
     99 static ssize_t iov_total_len(const struct iovec *iov, int count)
    100 {
    101 	ssize_t ret = 0;
    102 
    103 	while (count--) {
    104 		ret += iov->iov_len;
    105 		iov++;
    106 	}
    107 
    108 	return ret;
    109 }
    110 
    111 static int fio_sendv_data(int sk, struct iovec *iov, int count)
    112 {
    113 	ssize_t total_len = iov_total_len(iov, count);
    114 	ssize_t ret;
    115 
    116 	do {
    117 		ret = writev(sk, iov, count);
    118 		if (ret > 0) {
    119 			total_len -= ret;
    120 			if (!total_len)
    121 				break;
    122 
    123 			while (ret) {
    124 				if (ret >= iov->iov_len) {
    125 					ret -= iov->iov_len;
    126 					iov++;
    127 					continue;
    128 				}
    129 				iov->iov_base += ret;
    130 				iov->iov_len -= ret;
    131 				ret = 0;
    132 			}
    133 		} else if (!ret)
    134 			break;
    135 		else if (errno == EAGAIN || errno == EINTR)
    136 			continue;
    137 		else
    138 			break;
    139 	} while (!exit_backend);
    140 
    141 	if (!total_len)
    142 		return 0;
    143 
    144 	if (errno)
    145 		return -errno;
    146 
    147 	return 1;
    148 }
    149 
    150 int fio_send_data(int sk, const void *p, unsigned int len)
    151 {
    152 	struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
    153 
    154 	assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
    155 
    156 	return fio_sendv_data(sk, &iov, 1);
    157 }
    158 
    159 int fio_recv_data(int sk, void *p, unsigned int len)
    160 {
    161 	do {
    162 		int ret = recv(sk, p, len, MSG_WAITALL);
    163 
    164 		if (ret > 0) {
    165 			len -= ret;
    166 			if (!len)
    167 				break;
    168 			p += ret;
    169 			continue;
    170 		} else if (!ret)
    171 			break;
    172 		else if (errno == EAGAIN || errno == EINTR)
    173 			continue;
    174 		else
    175 			break;
    176 	} while (!exit_backend);
    177 
    178 	if (!len)
    179 		return 0;
    180 
    181 	return -1;
    182 }
    183 
    184 static int verify_convert_cmd(struct fio_net_cmd *cmd)
    185 {
    186 	uint16_t crc;
    187 
    188 	cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16);
    189 	cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16);
    190 
    191 	crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ);
    192 	if (crc != cmd->cmd_crc16) {
    193 		log_err("fio: server bad crc on command (got %x, wanted %x)\n",
    194 				cmd->cmd_crc16, crc);
    195 		return 1;
    196 	}
    197 
    198 	cmd->version	= le16_to_cpu(cmd->version);
    199 	cmd->opcode	= le16_to_cpu(cmd->opcode);
    200 	cmd->flags	= le32_to_cpu(cmd->flags);
    201 	cmd->tag	= le64_to_cpu(cmd->tag);
    202 	cmd->pdu_len	= le32_to_cpu(cmd->pdu_len);
    203 
    204 	switch (cmd->version) {
    205 	case FIO_SERVER_VER:
    206 		break;
    207 	default:
    208 		log_err("fio: bad server cmd version %d\n", cmd->version);
    209 		return 1;
    210 	}
    211 
    212 	if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) {
    213 		log_err("fio: command payload too large: %u\n", cmd->pdu_len);
    214 		return 1;
    215 	}
    216 
    217 	return 0;
    218 }
    219 
    220 /*
    221  * Read (and defragment, if necessary) incoming commands
    222  */
    223 struct fio_net_cmd *fio_net_recv_cmd(int sk)
    224 {
    225 	struct fio_net_cmd cmd, *tmp, *cmdret = NULL;
    226 	size_t cmd_size = 0, pdu_offset = 0;
    227 	uint16_t crc;
    228 	int ret, first = 1;
    229 	void *pdu = NULL;
    230 
    231 	do {
    232 		ret = fio_recv_data(sk, &cmd, sizeof(cmd));
    233 		if (ret)
    234 			break;
    235 
    236 		/* We have a command, verify it and swap if need be */
    237 		ret = verify_convert_cmd(&cmd);
    238 		if (ret)
    239 			break;
    240 
    241 		if (first) {
    242 			/* if this is text, add room for \0 at the end */
    243 			cmd_size = sizeof(cmd) + cmd.pdu_len + 1;
    244 			assert(!cmdret);
    245 		} else
    246 			cmd_size += cmd.pdu_len;
    247 
    248 		if (cmd_size / 1024 > FIO_SERVER_MAX_CMD_MB * 1024) {
    249 			log_err("fio: cmd+pdu too large (%llu)\n", (unsigned long long) cmd_size);
    250 			ret = 1;
    251 			break;
    252 		}
    253 
    254 		tmp = realloc(cmdret, cmd_size);
    255 		if (!tmp) {
    256 			log_err("fio: server failed allocating cmd\n");
    257 			ret = 1;
    258 			break;
    259 		}
    260 		cmdret = tmp;
    261 
    262 		if (first)
    263 			memcpy(cmdret, &cmd, sizeof(cmd));
    264 		else if (cmdret->opcode != cmd.opcode) {
    265 			log_err("fio: fragment opcode mismatch (%d != %d)\n",
    266 					cmdret->opcode, cmd.opcode);
    267 			ret = 1;
    268 			break;
    269 		}
    270 
    271 		if (!cmd.pdu_len)
    272 			break;
    273 
    274 		/* There's payload, get it */
    275 		pdu = (void *) cmdret->payload + pdu_offset;
    276 		ret = fio_recv_data(sk, pdu, cmd.pdu_len);
    277 		if (ret)
    278 			break;
    279 
    280 		/* Verify payload crc */
    281 		crc = fio_crc16(pdu, cmd.pdu_len);
    282 		if (crc != cmd.pdu_crc16) {
    283 			log_err("fio: server bad crc on payload ");
    284 			log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc);
    285 			ret = 1;
    286 			break;
    287 		}
    288 
    289 		pdu_offset += cmd.pdu_len;
    290 		if (!first)
    291 			cmdret->pdu_len += cmd.pdu_len;
    292 		first = 0;
    293 	} while (cmd.flags & FIO_NET_CMD_F_MORE);
    294 
    295 	if (ret) {
    296 		free(cmdret);
    297 		cmdret = NULL;
    298 	} else if (cmdret) {
    299 		/* zero-terminate text input */
    300 		if (cmdret->pdu_len) {
    301 			if (cmdret->opcode == FIO_NET_CMD_TEXT) {
    302 				struct cmd_text_pdu *__pdu = (struct cmd_text_pdu *) cmdret->payload;
    303 				char *buf = (char *) __pdu->buf;
    304 
    305 				buf[__pdu->buf_len] = '\0';
    306 			} else if (cmdret->opcode == FIO_NET_CMD_JOB) {
    307 				struct cmd_job_pdu *__pdu = (struct cmd_job_pdu *) cmdret->payload;
    308 				char *buf = (char *) __pdu->buf;
    309 				int len = le32_to_cpu(__pdu->buf_len);
    310 
    311 				buf[len] = '\0';
    312 			}
    313 		}
    314 
    315 		/* frag flag is internal */
    316 		cmdret->flags &= ~FIO_NET_CMD_F_MORE;
    317 	}
    318 
    319 	return cmdret;
    320 }
    321 
    322 static void add_reply(uint64_t tag, struct flist_head *list)
    323 {
    324 	struct fio_net_cmd_reply *reply;
    325 
    326 	reply = (struct fio_net_cmd_reply *) (uintptr_t) tag;
    327 	flist_add_tail(&reply->list, list);
    328 }
    329 
    330 static uint64_t alloc_reply(uint64_t tag, uint16_t opcode)
    331 {
    332 	struct fio_net_cmd_reply *reply;
    333 
    334 	reply = calloc(1, sizeof(*reply));
    335 	INIT_FLIST_HEAD(&reply->list);
    336 	fio_gettime(&reply->tv, NULL);
    337 	reply->saved_tag = tag;
    338 	reply->opcode = opcode;
    339 
    340 	return (uintptr_t) reply;
    341 }
    342 
    343 static void free_reply(uint64_t tag)
    344 {
    345 	struct fio_net_cmd_reply *reply;
    346 
    347 	reply = (struct fio_net_cmd_reply *) (uintptr_t) tag;
    348 	free(reply);
    349 }
    350 
    351 void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
    352 {
    353 	uint32_t pdu_len;
    354 
    355 	cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ));
    356 
    357 	pdu_len = le32_to_cpu(cmd->pdu_len);
    358 	cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len));
    359 }
    360 
    361 void fio_net_cmd_crc(struct fio_net_cmd *cmd)
    362 {
    363 	fio_net_cmd_crc_pdu(cmd, cmd->payload);
    364 }
    365 
    366 int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
    367 		     uint64_t *tagptr, struct flist_head *list)
    368 {
    369 	struct fio_net_cmd *cmd = NULL;
    370 	size_t this_len, cur_len = 0;
    371 	uint64_t tag;
    372 	int ret;
    373 
    374 	if (list) {
    375 		assert(tagptr);
    376 		tag = *tagptr = alloc_reply(*tagptr, opcode);
    377 	} else
    378 		tag = tagptr ? *tagptr : 0;
    379 
    380 	do {
    381 		this_len = size;
    382 		if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
    383 			this_len = FIO_SERVER_MAX_FRAGMENT_PDU;
    384 
    385 		if (!cmd || cur_len < sizeof(*cmd) + this_len) {
    386 			if (cmd)
    387 				free(cmd);
    388 
    389 			cur_len = sizeof(*cmd) + this_len;
    390 			cmd = malloc(cur_len);
    391 		}
    392 
    393 		fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
    394 
    395 		if (this_len < size)
    396 			cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
    397 
    398 		fio_net_cmd_crc(cmd);
    399 
    400 		ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len);
    401 		size -= this_len;
    402 		buf += this_len;
    403 	} while (!ret && size);
    404 
    405 	if (list) {
    406 		if (ret)
    407 			free_reply(tag);
    408 		else
    409 			add_reply(tag, list);
    410 	}
    411 
    412 	if (cmd)
    413 		free(cmd);
    414 
    415 	return ret;
    416 }
    417 
    418 static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
    419 {
    420 	struct fio_net_cmd cmd;
    421 
    422 	fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
    423 	fio_net_cmd_crc(&cmd);
    424 
    425 	return fio_send_data(sk, &cmd, sizeof(cmd));
    426 }
    427 
    428 /*
    429  * If 'list' is non-NULL, then allocate and store the sent command for
    430  * later verification.
    431  */
    432 int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
    433 			    struct flist_head *list)
    434 {
    435 	int ret;
    436 
    437 	if (list)
    438 		tag = alloc_reply(tag, opcode);
    439 
    440 	ret = fio_net_send_simple_stack_cmd(sk, opcode, tag);
    441 	if (ret) {
    442 		if (list)
    443 			free_reply(tag);
    444 
    445 		return ret;
    446 	}
    447 
    448 	if (list)
    449 		add_reply(tag, list);
    450 
    451 	return 0;
    452 }
    453 
    454 int fio_net_send_quit(int sk)
    455 {
    456 	dprint(FD_NET, "server: sending quit\n");
    457 
    458 	return fio_net_send_simple_cmd(sk, FIO_NET_CMD_QUIT, 0, NULL);
    459 }
    460 
    461 static int fio_net_send_ack(int sk, struct fio_net_cmd *cmd, int error,
    462 			    int signal)
    463 {
    464 	struct cmd_end_pdu epdu;
    465 	uint64_t tag = 0;
    466 
    467 	if (cmd)
    468 		tag = cmd->tag;
    469 
    470 	epdu.error = __cpu_to_le32(error);
    471 	epdu.signal = __cpu_to_le32(signal);
    472 	return fio_net_send_cmd(sk, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, NULL);
    473 }
    474 
    475 int fio_net_send_stop(int sk, int error, int signal)
    476 {
    477 	dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
    478 	return fio_net_send_ack(sk, NULL, error, signal);
    479 }
    480 
    481 static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
    482 {
    483 	struct fio_fork_item *ffi;
    484 
    485 	ffi = malloc(sizeof(*ffi));
    486 	ffi->exitval = 0;
    487 	ffi->signal = 0;
    488 	ffi->exited = 0;
    489 	ffi->pid = pid;
    490 	flist_add_tail(&ffi->list, list);
    491 }
    492 
    493 static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid)
    494 {
    495 	dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid);
    496 	fio_server_add_fork_item(pid, conn_list);
    497 }
    498 
    499 static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid)
    500 {
    501 	dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid);
    502 	fio_server_add_fork_item(pid, job_list);
    503 }
    504 
    505 static void fio_server_check_fork_item(struct fio_fork_item *ffi)
    506 {
    507 	int ret, status;
    508 
    509 	ret = waitpid(ffi->pid, &status, WNOHANG);
    510 	if (ret < 0) {
    511 		if (errno == ECHILD) {
    512 			log_err("fio: connection pid %u disappeared\n", (int) ffi->pid);
    513 			ffi->exited = 1;
    514 		} else
    515 			log_err("fio: waitpid: %s\n", strerror(errno));
    516 	} else if (ret == ffi->pid) {
    517 		if (WIFSIGNALED(status)) {
    518 			ffi->signal = WTERMSIG(status);
    519 			ffi->exited = 1;
    520 		}
    521 		if (WIFEXITED(status)) {
    522 			if (WEXITSTATUS(status))
    523 				ffi->exitval = WEXITSTATUS(status);
    524 			ffi->exited = 1;
    525 		}
    526 	}
    527 }
    528 
    529 static void fio_server_fork_item_done(struct fio_fork_item *ffi)
    530 {
    531 	dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
    532 
    533 	/*
    534 	 * Fold STOP and QUIT...
    535 	 */
    536 	fio_net_send_stop(server_fd, ffi->exitval, ffi->signal);
    537 	fio_net_send_quit(server_fd);
    538 	flist_del(&ffi->list);
    539 	free(ffi);
    540 }
    541 
    542 static void fio_server_check_fork_items(struct flist_head *list)
    543 {
    544 	struct flist_head *entry, *tmp;
    545 	struct fio_fork_item *ffi;
    546 
    547 	flist_for_each_safe(entry, tmp, list) {
    548 		ffi = flist_entry(entry, struct fio_fork_item, list);
    549 
    550 		fio_server_check_fork_item(ffi);
    551 
    552 		if (ffi->exited)
    553 			fio_server_fork_item_done(ffi);
    554 	}
    555 }
    556 
    557 static void fio_server_check_jobs(struct flist_head *job_list)
    558 {
    559 	fio_server_check_fork_items(job_list);
    560 }
    561 
    562 static void fio_server_check_conns(struct flist_head *conn_list)
    563 {
    564 	fio_server_check_fork_items(conn_list);
    565 }
    566 
    567 static int handle_load_file_cmd(struct fio_net_cmd *cmd)
    568 {
    569 	struct cmd_load_file_pdu *pdu = (struct cmd_load_file_pdu *) cmd->payload;
    570 	void *file_name = pdu->file;
    571 	struct cmd_start_pdu spdu;
    572 
    573 	dprint(FD_NET, "server: loading local file %s\n", (char *) file_name);
    574 
    575 	pdu->name_len = le16_to_cpu(pdu->name_len);
    576 	pdu->client_type = le16_to_cpu(pdu->client_type);
    577 
    578 	if (parse_jobs_ini(file_name, 0, 0, pdu->client_type)) {
    579 		fio_net_send_quit(server_fd);
    580 		return -1;
    581 	}
    582 
    583 	spdu.jobs = cpu_to_le32(thread_number);
    584 	spdu.stat_outputs = cpu_to_le32(stat_number);
    585 	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
    586 	return 0;
    587 }
    588 
    589 static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd)
    590 {
    591 	pid_t pid;
    592 	int ret;
    593 
    594 	fio_time_init();
    595 	set_genesis_time();
    596 
    597 	pid = fork();
    598 	if (pid) {
    599 		fio_server_add_job_pid(job_list, pid);
    600 		return 0;
    601 	}
    602 
    603 	ret = fio_backend();
    604 	free_threads_shm();
    605 	_exit(ret);
    606 }
    607 
    608 static int handle_job_cmd(struct fio_net_cmd *cmd)
    609 {
    610 	struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload;
    611 	void *buf = pdu->buf;
    612 	struct cmd_start_pdu spdu;
    613 
    614 	pdu->buf_len = le32_to_cpu(pdu->buf_len);
    615 	pdu->client_type = le32_to_cpu(pdu->client_type);
    616 
    617 	if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
    618 		fio_net_send_quit(server_fd);
    619 		return -1;
    620 	}
    621 
    622 	spdu.jobs = cpu_to_le32(thread_number);
    623 	spdu.stat_outputs = cpu_to_le32(stat_number);
    624 	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
    625 	return 0;
    626 }
    627 
    628 static int handle_jobline_cmd(struct fio_net_cmd *cmd)
    629 {
    630 	void *pdu = cmd->payload;
    631 	struct cmd_single_line_pdu *cslp;
    632 	struct cmd_line_pdu *clp;
    633 	unsigned long offset;
    634 	struct cmd_start_pdu spdu;
    635 	char **argv;
    636 	int i;
    637 
    638 	clp = pdu;
    639 	clp->lines = le16_to_cpu(clp->lines);
    640 	clp->client_type = le16_to_cpu(clp->client_type);
    641 	argv = malloc(clp->lines * sizeof(char *));
    642 	offset = sizeof(*clp);
    643 
    644 	dprint(FD_NET, "server: %d command line args\n", clp->lines);
    645 
    646 	for (i = 0; i < clp->lines; i++) {
    647 		cslp = pdu + offset;
    648 		argv[i] = (char *) cslp->text;
    649 
    650 		offset += sizeof(*cslp) + le16_to_cpu(cslp->len);
    651 		dprint(FD_NET, "server: %d: %s\n", i, argv[i]);
    652 	}
    653 
    654 	if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
    655 		fio_net_send_quit(server_fd);
    656 		free(argv);
    657 		return -1;
    658 	}
    659 
    660 	free(argv);
    661 
    662 	spdu.jobs = cpu_to_le32(thread_number);
    663 	spdu.stat_outputs = cpu_to_le32(stat_number);
    664 	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
    665 	return 0;
    666 }
    667 
    668 static int handle_probe_cmd(struct fio_net_cmd *cmd)
    669 {
    670 	struct cmd_client_probe_pdu *pdu = (struct cmd_client_probe_pdu *) cmd->payload;
    671 	struct cmd_probe_reply_pdu probe;
    672 	uint64_t tag = cmd->tag;
    673 
    674 	dprint(FD_NET, "server: sending probe reply\n");
    675 
    676 	strcpy(me, (char *) pdu->server);
    677 
    678 	memset(&probe, 0, sizeof(probe));
    679 	gethostname((char *) probe.hostname, sizeof(probe.hostname));
    680 #ifdef CONFIG_BIG_ENDIAN
    681 	probe.bigendian = 1;
    682 #endif
    683 	strncpy((char *) probe.fio_version, fio_version_string, sizeof(probe.fio_version));
    684 
    685 	probe.os	= FIO_OS;
    686 	probe.arch	= FIO_ARCH;
    687 	probe.bpp	= sizeof(void *);
    688 	probe.cpus	= __cpu_to_le32(cpus_online());
    689 
    690 	/*
    691 	 * If the client supports compression and we do too, then enable it
    692 	 */
    693 	if (has_zlib && le64_to_cpu(pdu->flags) & FIO_PROBE_FLAG_ZLIB) {
    694 		probe.flags = __cpu_to_le64(FIO_PROBE_FLAG_ZLIB);
    695 		use_zlib = 1;
    696 	} else {
    697 		probe.flags = 0;
    698 		use_zlib = 0;
    699 	}
    700 
    701 	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
    702 }
    703 
    704 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
    705 {
    706 	struct jobs_eta *je;
    707 	uint64_t tag = cmd->tag;
    708 	size_t size;
    709 	int i;
    710 
    711 	je = get_jobs_eta(1, &size);
    712 	if (!je)
    713 		return 0;
    714 
    715 	dprint(FD_NET, "server sending status\n");
    716 
    717 	je->nr_running		= cpu_to_le32(je->nr_running);
    718 	je->nr_ramp		= cpu_to_le32(je->nr_ramp);
    719 	je->nr_pending		= cpu_to_le32(je->nr_pending);
    720 	je->nr_setting_up	= cpu_to_le32(je->nr_setting_up);
    721 	je->files_open		= cpu_to_le32(je->files_open);
    722 
    723 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
    724 		je->m_rate[i]	= cpu_to_le32(je->m_rate[i]);
    725 		je->t_rate[i]	= cpu_to_le32(je->t_rate[i]);
    726 		je->m_iops[i]	= cpu_to_le32(je->m_iops[i]);
    727 		je->t_iops[i]	= cpu_to_le32(je->t_iops[i]);
    728 		je->rate[i]	= cpu_to_le32(je->rate[i]);
    729 		je->iops[i]	= cpu_to_le32(je->iops[i]);
    730 	}
    731 
    732 	je->elapsed_sec		= cpu_to_le64(je->elapsed_sec);
    733 	je->eta_sec		= cpu_to_le64(je->eta_sec);
    734 	je->nr_threads		= cpu_to_le32(je->nr_threads);
    735 	je->is_pow2		= cpu_to_le32(je->is_pow2);
    736 	je->unit_base		= cpu_to_le32(je->unit_base);
    737 
    738 	fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL);
    739 	free(je);
    740 	return 0;
    741 }
    742 
    743 static int send_update_job_reply(int fd, uint64_t __tag, int error)
    744 {
    745 	uint64_t tag = __tag;
    746 	uint32_t pdu_error;
    747 
    748 	pdu_error = __cpu_to_le32(error);
    749 	return fio_net_send_cmd(fd, FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, NULL);
    750 }
    751 
    752 static int handle_update_job_cmd(struct fio_net_cmd *cmd)
    753 {
    754 	struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
    755 	struct thread_data *td;
    756 	uint32_t tnumber;
    757 
    758 	tnumber = le32_to_cpu(pdu->thread_number);
    759 
    760 	dprint(FD_NET, "server: updating options for job %u\n", tnumber);
    761 
    762 	if (!tnumber || tnumber > thread_number) {
    763 		send_update_job_reply(server_fd, cmd->tag, ENODEV);
    764 		return 0;
    765 	}
    766 
    767 	td = &threads[tnumber - 1];
    768 	convert_thread_options_to_cpu(&td->o, &pdu->top);
    769 	send_update_job_reply(server_fd, cmd->tag, 0);
    770 	return 0;
    771 }
    772 
    773 static int handle_trigger_cmd(struct fio_net_cmd *cmd)
    774 {
    775 	struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload;
    776 	char *buf = (char *) pdu->cmd;
    777 	struct all_io_list *rep;
    778 	size_t sz;
    779 
    780 	pdu->len = le16_to_cpu(pdu->len);
    781 	buf[pdu->len] = '\0';
    782 
    783 	rep = get_all_io_list(IO_LIST_ALL, &sz);
    784 	if (!rep) {
    785 		struct all_io_list state;
    786 
    787 		state.threads = cpu_to_le64((uint64_t) 0);
    788 		fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, NULL);
    789 	} else {
    790 		fio_net_send_cmd(server_fd, FIO_NET_CMD_VTRIGGER, rep, sz, NULL, NULL);
    791 		free(rep);
    792 	}
    793 
    794 	exec_trigger(buf);
    795 	return 0;
    796 }
    797 
    798 static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
    799 {
    800 	int ret;
    801 
    802 	dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n",
    803 			fio_server_op(cmd->opcode), cmd->pdu_len,
    804 			(unsigned long long) cmd->tag);
    805 
    806 	switch (cmd->opcode) {
    807 	case FIO_NET_CMD_QUIT:
    808 		fio_terminate_threads(TERMINATE_ALL);
    809 		return -1;
    810 	case FIO_NET_CMD_EXIT:
    811 		exit_backend = 1;
    812 		return -1;
    813 	case FIO_NET_CMD_LOAD_FILE:
    814 		ret = handle_load_file_cmd(cmd);
    815 		break;
    816 	case FIO_NET_CMD_JOB:
    817 		ret = handle_job_cmd(cmd);
    818 		break;
    819 	case FIO_NET_CMD_JOBLINE:
    820 		ret = handle_jobline_cmd(cmd);
    821 		break;
    822 	case FIO_NET_CMD_PROBE:
    823 		ret = handle_probe_cmd(cmd);
    824 		break;
    825 	case FIO_NET_CMD_SEND_ETA:
    826 		ret = handle_send_eta_cmd(cmd);
    827 		break;
    828 	case FIO_NET_CMD_RUN:
    829 		ret = handle_run_cmd(job_list, cmd);
    830 		break;
    831 	case FIO_NET_CMD_UPDATE_JOB:
    832 		ret = handle_update_job_cmd(cmd);
    833 		break;
    834 	case FIO_NET_CMD_VTRIGGER:
    835 		ret = handle_trigger_cmd(cmd);
    836 		break;
    837 	case FIO_NET_CMD_SENDFILE: {
    838 		struct cmd_sendfile_reply *in;
    839 		struct cmd_reply *rep;
    840 
    841 		rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
    842 
    843 		in = (struct cmd_sendfile_reply *) cmd->payload;
    844 		in->size = le32_to_cpu(in->size);
    845 		in->error = le32_to_cpu(in->error);
    846 		if (in->error) {
    847 			ret = 1;
    848 			rep->error = in->error;
    849 		} else {
    850 			ret = 0;
    851 			rep->data = smalloc(in->size);
    852 			if (!rep->data) {
    853 				ret = 1;
    854 				rep->error = ENOMEM;
    855 			} else {
    856 				rep->size = in->size;
    857 				memcpy(rep->data, in->data, in->size);
    858 			}
    859 		}
    860 		fio_mutex_up(&rep->lock);
    861 		break;
    862 		}
    863 	default:
    864 		log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
    865 		ret = 1;
    866 	}
    867 
    868 	return ret;
    869 }
    870 
    871 static int handle_connection(int sk)
    872 {
    873 	struct fio_net_cmd *cmd = NULL;
    874 	FLIST_HEAD(job_list);
    875 	int ret = 0;
    876 
    877 	reset_fio_state();
    878 	server_fd = sk;
    879 
    880 	/* read forever */
    881 	while (!exit_backend) {
    882 		struct pollfd pfd = {
    883 			.fd	= sk,
    884 			.events	= POLLIN,
    885 		};
    886 
    887 		ret = 0;
    888 		do {
    889 			int timeout = 1000;
    890 
    891 			if (!flist_empty(&job_list))
    892 				timeout = 100;
    893 
    894 			ret = poll(&pfd, 1, timeout);
    895 			if (ret < 0) {
    896 				if (errno == EINTR)
    897 					break;
    898 				log_err("fio: poll: %s\n", strerror(errno));
    899 				break;
    900 			} else if (!ret) {
    901 				fio_server_check_jobs(&job_list);
    902 				continue;
    903 			}
    904 
    905 			if (pfd.revents & POLLIN)
    906 				break;
    907 			if (pfd.revents & (POLLERR|POLLHUP)) {
    908 				ret = 1;
    909 				break;
    910 			}
    911 		} while (!exit_backend);
    912 
    913 		fio_server_check_jobs(&job_list);
    914 
    915 		if (ret < 0)
    916 			break;
    917 
    918 		cmd = fio_net_recv_cmd(sk);
    919 		if (!cmd) {
    920 			ret = -1;
    921 			break;
    922 		}
    923 
    924 		ret = handle_command(&job_list, cmd);
    925 		if (ret)
    926 			break;
    927 
    928 		free(cmd);
    929 		cmd = NULL;
    930 	}
    931 
    932 	if (cmd)
    933 		free(cmd);
    934 
    935 	close(sk);
    936 	_exit(ret);
    937 }
    938 
    939 static int accept_loop(int listen_sk)
    940 {
    941 	struct sockaddr_in addr;
    942 	struct sockaddr_in6 addr6;
    943 	socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
    944 	struct pollfd pfd;
    945 	int ret = 0, sk, exitval = 0;
    946 	FLIST_HEAD(conn_list);
    947 
    948 	dprint(FD_NET, "server enter accept loop\n");
    949 
    950 	fio_set_fd_nonblocking(listen_sk, "server");
    951 
    952 	while (!exit_backend) {
    953 		const char *from;
    954 		char buf[64];
    955 		pid_t pid;
    956 
    957 		pfd.fd = listen_sk;
    958 		pfd.events = POLLIN;
    959 		do {
    960 			int timeout = 1000;
    961 
    962 			if (!flist_empty(&conn_list))
    963 				timeout = 100;
    964 
    965 			ret = poll(&pfd, 1, timeout);
    966 			if (ret < 0) {
    967 				if (errno == EINTR)
    968 					break;
    969 				log_err("fio: poll: %s\n", strerror(errno));
    970 				break;
    971 			} else if (!ret) {
    972 				fio_server_check_conns(&conn_list);
    973 				continue;
    974 			}
    975 
    976 			if (pfd.revents & POLLIN)
    977 				break;
    978 		} while (!exit_backend);
    979 
    980 		fio_server_check_conns(&conn_list);
    981 
    982 		if (exit_backend || ret < 0)
    983 			break;
    984 
    985 		if (use_ipv6)
    986 			sk = accept(listen_sk, (struct sockaddr *) &addr6, &len);
    987 		else
    988 			sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
    989 
    990 		if (sk < 0) {
    991 			log_err("fio: accept: %s\n", strerror(errno));
    992 			return -1;
    993 		}
    994 
    995 		if (use_ipv6)
    996 			from = inet_ntop(AF_INET6, (struct sockaddr *) &addr6.sin6_addr, buf, sizeof(buf));
    997 		else
    998 			from = inet_ntop(AF_INET, (struct sockaddr *) &addr.sin_addr, buf, sizeof(buf));
    999 
   1000 		dprint(FD_NET, "server: connect from %s\n", from);
   1001 
   1002 		pid = fork();
   1003 		if (pid) {
   1004 			close(sk);
   1005 			fio_server_add_conn_pid(&conn_list, pid);
   1006 			continue;
   1007 		}
   1008 
   1009 		/* exits */
   1010 		handle_connection(sk);
   1011 	}
   1012 
   1013 	return exitval;
   1014 }
   1015 
   1016 int fio_server_text_output(int level, const char *buf, size_t len)
   1017 {
   1018 	struct cmd_text_pdu *pdu;
   1019 	unsigned int tlen;
   1020 	struct timeval tv;
   1021 
   1022 	if (server_fd == -1)
   1023 		return log_local_buf(buf, len);
   1024 
   1025 	tlen = sizeof(*pdu) + len;
   1026 	pdu = malloc(tlen);
   1027 
   1028 	pdu->level	= __cpu_to_le32(level);
   1029 	pdu->buf_len	= __cpu_to_le32(len);
   1030 
   1031 	gettimeofday(&tv, NULL);
   1032 	pdu->log_sec	= __cpu_to_le64(tv.tv_sec);
   1033 	pdu->log_usec	= __cpu_to_le64(tv.tv_usec);
   1034 
   1035 	memcpy(pdu->buf, buf, len);
   1036 
   1037 	fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
   1038 	free(pdu);
   1039 	return len;
   1040 }
   1041 
   1042 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
   1043 {
   1044 	dst->max_val	= cpu_to_le64(src->max_val);
   1045 	dst->min_val	= cpu_to_le64(src->min_val);
   1046 	dst->samples	= cpu_to_le64(src->samples);
   1047 
   1048 	/*
   1049 	 * Encode to IEEE 754 for network transfer
   1050 	 */
   1051 	dst->mean.u.i	= cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
   1052 	dst->S.u.i	= cpu_to_le64(fio_double_to_uint64(src->S.u.f));
   1053 }
   1054 
   1055 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
   1056 {
   1057 	int i;
   1058 
   1059 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1060 		dst->max_run[i]		= cpu_to_le64(src->max_run[i]);
   1061 		dst->min_run[i]		= cpu_to_le64(src->min_run[i]);
   1062 		dst->max_bw[i]		= cpu_to_le64(src->max_bw[i]);
   1063 		dst->min_bw[i]		= cpu_to_le64(src->min_bw[i]);
   1064 		dst->io_kb[i]		= cpu_to_le64(src->io_kb[i]);
   1065 		dst->agg[i]		= cpu_to_le64(src->agg[i]);
   1066 	}
   1067 
   1068 	dst->kb_base	= cpu_to_le32(src->kb_base);
   1069 	dst->unit_base	= cpu_to_le32(src->unit_base);
   1070 	dst->groupid	= cpu_to_le32(src->groupid);
   1071 	dst->unified_rw_rep	= cpu_to_le32(src->unified_rw_rep);
   1072 }
   1073 
   1074 /*
   1075  * Send a CMD_TS, which packs struct thread_stat and group_run_stats
   1076  * into a single payload.
   1077  */
   1078 void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
   1079 {
   1080 	struct cmd_ts_pdu p;
   1081 	int i, j;
   1082 
   1083 	dprint(FD_NET, "server sending end stats\n");
   1084 
   1085 	memset(&p, 0, sizeof(p));
   1086 
   1087 	strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1);
   1088 	strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1);
   1089 	strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1);
   1090 
   1091 	p.ts.error		= cpu_to_le32(ts->error);
   1092 	p.ts.thread_number	= cpu_to_le32(ts->thread_number);
   1093 	p.ts.groupid		= cpu_to_le32(ts->groupid);
   1094 	p.ts.pid		= cpu_to_le32(ts->pid);
   1095 	p.ts.members		= cpu_to_le32(ts->members);
   1096 	p.ts.unified_rw_rep	= cpu_to_le32(ts->unified_rw_rep);
   1097 
   1098 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1099 		convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
   1100 		convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]);
   1101 		convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]);
   1102 		convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
   1103 	}
   1104 
   1105 	p.ts.usr_time		= cpu_to_le64(ts->usr_time);
   1106 	p.ts.sys_time		= cpu_to_le64(ts->sys_time);
   1107 	p.ts.ctx		= cpu_to_le64(ts->ctx);
   1108 	p.ts.minf		= cpu_to_le64(ts->minf);
   1109 	p.ts.majf		= cpu_to_le64(ts->majf);
   1110 	p.ts.clat_percentiles	= cpu_to_le64(ts->clat_percentiles);
   1111 	p.ts.percentile_precision = cpu_to_le64(ts->percentile_precision);
   1112 
   1113 	for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
   1114 		fio_fp64_t *src = &ts->percentile_list[i];
   1115 		fio_fp64_t *dst = &p.ts.percentile_list[i];
   1116 
   1117 		dst->u.i = cpu_to_le64(fio_double_to_uint64(src->u.f));
   1118 	}
   1119 
   1120 	for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
   1121 		p.ts.io_u_map[i]	= cpu_to_le32(ts->io_u_map[i]);
   1122 		p.ts.io_u_submit[i]	= cpu_to_le32(ts->io_u_submit[i]);
   1123 		p.ts.io_u_complete[i]	= cpu_to_le32(ts->io_u_complete[i]);
   1124 	}
   1125 
   1126 	for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
   1127 		p.ts.io_u_lat_u[i]	= cpu_to_le32(ts->io_u_lat_u[i]);
   1128 		p.ts.io_u_lat_m[i]	= cpu_to_le32(ts->io_u_lat_m[i]);
   1129 	}
   1130 
   1131 	for (i = 0; i < DDIR_RWDIR_CNT; i++)
   1132 		for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
   1133 			p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]);
   1134 
   1135 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1136 		p.ts.total_io_u[i]	= cpu_to_le64(ts->total_io_u[i]);
   1137 		p.ts.short_io_u[i]	= cpu_to_le64(ts->short_io_u[i]);
   1138 		p.ts.drop_io_u[i]	= cpu_to_le64(ts->drop_io_u[i]);
   1139 	}
   1140 
   1141 	p.ts.total_submit	= cpu_to_le64(ts->total_submit);
   1142 	p.ts.total_complete	= cpu_to_le64(ts->total_complete);
   1143 
   1144 	for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   1145 		p.ts.io_bytes[i]	= cpu_to_le64(ts->io_bytes[i]);
   1146 		p.ts.runtime[i]		= cpu_to_le64(ts->runtime[i]);
   1147 	}
   1148 
   1149 	p.ts.total_run_time	= cpu_to_le64(ts->total_run_time);
   1150 	p.ts.continue_on_error	= cpu_to_le16(ts->continue_on_error);
   1151 	p.ts.total_err_count	= cpu_to_le64(ts->total_err_count);
   1152 	p.ts.first_error	= cpu_to_le32(ts->first_error);
   1153 	p.ts.kb_base		= cpu_to_le32(ts->kb_base);
   1154 	p.ts.unit_base		= cpu_to_le32(ts->unit_base);
   1155 
   1156 	p.ts.latency_depth	= cpu_to_le32(ts->latency_depth);
   1157 	p.ts.latency_target	= cpu_to_le64(ts->latency_target);
   1158 	p.ts.latency_window	= cpu_to_le64(ts->latency_window);
   1159 	p.ts.latency_percentile.u.i = cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f));
   1160 
   1161 	convert_gs(&p.rs, rs);
   1162 
   1163 	fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
   1164 }
   1165 
   1166 void fio_server_send_gs(struct group_run_stats *rs)
   1167 {
   1168 	struct group_run_stats gs;
   1169 
   1170 	dprint(FD_NET, "server sending group run stats\n");
   1171 
   1172 	convert_gs(&gs, rs);
   1173 	fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
   1174 }
   1175 
   1176 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
   1177 {
   1178 	int i;
   1179 
   1180 	for (i = 0; i < 2; i++) {
   1181 		dst->ios[i]	= cpu_to_le64(src->ios[i]);
   1182 		dst->merges[i]	= cpu_to_le64(src->merges[i]);
   1183 		dst->sectors[i]	= cpu_to_le64(src->sectors[i]);
   1184 		dst->ticks[i]	= cpu_to_le64(src->ticks[i]);
   1185 	}
   1186 
   1187 	dst->io_ticks		= cpu_to_le64(src->io_ticks);
   1188 	dst->time_in_queue	= cpu_to_le64(src->time_in_queue);
   1189 	dst->slavecount		= cpu_to_le32(src->slavecount);
   1190 	dst->max_util.u.i	= cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
   1191 }
   1192 
   1193 static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
   1194 {
   1195 	int i;
   1196 
   1197 	dst->name[FIO_DU_NAME_SZ - 1] = '\0';
   1198 	strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
   1199 
   1200 	for (i = 0; i < 2; i++) {
   1201 		dst->s.ios[i]		= cpu_to_le64(src->s.ios[i]);
   1202 		dst->s.merges[i]	= cpu_to_le64(src->s.merges[i]);
   1203 		dst->s.sectors[i]	= cpu_to_le64(src->s.sectors[i]);
   1204 		dst->s.ticks[i]		= cpu_to_le64(src->s.ticks[i]);
   1205 	}
   1206 
   1207 	dst->s.io_ticks		= cpu_to_le64(src->s.io_ticks);
   1208 	dst->s.time_in_queue	= cpu_to_le64(src->s.time_in_queue);
   1209 	dst->s.msec		= cpu_to_le64(src->s.msec);
   1210 }
   1211 
   1212 void fio_server_send_du(void)
   1213 {
   1214 	struct disk_util *du;
   1215 	struct flist_head *entry;
   1216 	struct cmd_du_pdu pdu;
   1217 
   1218 	dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
   1219 
   1220 	memset(&pdu, 0, sizeof(pdu));
   1221 
   1222 	flist_for_each(entry, &disk_list) {
   1223 		du = flist_entry(entry, struct disk_util, list);
   1224 
   1225 		convert_dus(&pdu.dus, &du->dus);
   1226 		convert_agg(&pdu.agg, &du->agg);
   1227 
   1228 		fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
   1229 	}
   1230 }
   1231 
   1232 /*
   1233  * Send a command with a separate PDU, not inlined in the command
   1234  */
   1235 static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
   1236 				off_t size, uint64_t tag, uint32_t flags)
   1237 {
   1238 	struct fio_net_cmd cmd;
   1239 	struct iovec iov[2];
   1240 
   1241 	iov[0].iov_base = (void *) &cmd;
   1242 	iov[0].iov_len = sizeof(cmd);
   1243 	iov[1].iov_base = (void *) buf;
   1244 	iov[1].iov_len = size;
   1245 
   1246 	__fio_init_net_cmd(&cmd, opcode, size, tag);
   1247 	cmd.flags = __cpu_to_le32(flags);
   1248 	fio_net_cmd_crc_pdu(&cmd, buf);
   1249 
   1250 	return fio_sendv_data(sk, iov, 2);
   1251 }
   1252 
   1253 static int fio_send_iolog_gz(struct cmd_iolog_pdu *pdu, struct io_log *log)
   1254 {
   1255 	int ret = 0;
   1256 #ifdef CONFIG_ZLIB
   1257 	z_stream stream;
   1258 	void *out_pdu;
   1259 
   1260 	/*
   1261 	 * Dirty - since the log is potentially huge, compress it into
   1262 	 * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
   1263 	 * side defragment it.
   1264 	 */
   1265 	out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
   1266 
   1267 	stream.zalloc = Z_NULL;
   1268 	stream.zfree = Z_NULL;
   1269 	stream.opaque = Z_NULL;
   1270 
   1271 	if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
   1272 		ret = 1;
   1273 		goto err;
   1274 	}
   1275 
   1276 	stream.next_in = (void *) log->log;
   1277 	stream.avail_in = log->nr_samples * log_entry_sz(log);
   1278 
   1279 	do {
   1280 		unsigned int this_len, flags = 0;
   1281 
   1282 		stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
   1283 		stream.next_out = out_pdu;
   1284 		ret = deflate(&stream, Z_FINISH);
   1285 		/* may be Z_OK, or Z_STREAM_END */
   1286 		if (ret < 0)
   1287 			goto err_zlib;
   1288 
   1289 		this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
   1290 
   1291 		if (stream.avail_in)
   1292 			flags = FIO_NET_CMD_F_MORE;
   1293 
   1294 		ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
   1295 					   out_pdu, this_len, 0, flags);
   1296 		if (ret)
   1297 			goto err_zlib;
   1298 	} while (stream.avail_in);
   1299 
   1300 err_zlib:
   1301 	deflateEnd(&stream);
   1302 err:
   1303 	free(out_pdu);
   1304 #endif
   1305 	return ret;
   1306 }
   1307 
   1308 int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
   1309 {
   1310 	struct cmd_iolog_pdu pdu;
   1311 	int i, ret = 0;
   1312 
   1313 	pdu.nr_samples = cpu_to_le64(log->nr_samples);
   1314 	pdu.thread_number = cpu_to_le32(td->thread_number);
   1315 	pdu.log_type = cpu_to_le32(log->log_type);
   1316 	pdu.compressed = cpu_to_le32(use_zlib);
   1317 
   1318 	strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
   1319 	pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
   1320 
   1321 	for (i = 0; i < log->nr_samples; i++) {
   1322 		struct io_sample *s = get_sample(log, i);
   1323 
   1324 		s->time		= cpu_to_le64(s->time);
   1325 		s->val		= cpu_to_le64(s->val);
   1326 		s->__ddir	= cpu_to_le32(s->__ddir);
   1327 		s->bs		= cpu_to_le32(s->bs);
   1328 
   1329 		if (log->log_offset) {
   1330 			struct io_sample_offset *so = (void *) s;
   1331 
   1332 			so->offset = cpu_to_le64(so->offset);
   1333 		}
   1334 	}
   1335 
   1336 	/*
   1337 	 * Send header first, it's not compressed.
   1338 	 */
   1339 	ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
   1340 					sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
   1341 	if (ret)
   1342 		return ret;
   1343 
   1344 	/*
   1345 	 * Now send actual log, compress if we can, otherwise just plain
   1346 	 */
   1347 	if (use_zlib)
   1348 		return fio_send_iolog_gz(&pdu, log);
   1349 
   1350 	return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log,
   1351 			log->nr_samples * log_entry_sz(log), 0, 0);
   1352 }
   1353 
   1354 void fio_server_send_add_job(struct thread_data *td)
   1355 {
   1356 	struct cmd_add_job_pdu pdu;
   1357 
   1358 	memset(&pdu, 0, sizeof(pdu));
   1359 	pdu.thread_number = cpu_to_le32(td->thread_number);
   1360 	pdu.groupid = cpu_to_le32(td->groupid);
   1361 	convert_thread_options_to_net(&pdu.top, &td->o);
   1362 
   1363 	fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
   1364 }
   1365 
   1366 void fio_server_send_start(struct thread_data *td)
   1367 {
   1368 	assert(server_fd != -1);
   1369 
   1370 	fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
   1371 }
   1372 
   1373 int fio_server_get_verify_state(const char *name, int threadnumber,
   1374 				void **datap)
   1375 {
   1376 	struct thread_io_list *s;
   1377 	struct cmd_sendfile out;
   1378 	struct cmd_reply *rep;
   1379 	uint64_t tag;
   1380 	void *data;
   1381 
   1382 	dprint(FD_NET, "server: request verify state\n");
   1383 
   1384 	rep = smalloc(sizeof(*rep));
   1385 	if (!rep) {
   1386 		log_err("fio: smalloc pool too small\n");
   1387 		return 1;
   1388 	}
   1389 
   1390 	__fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
   1391 	rep->data = NULL;
   1392 	rep->error = 0;
   1393 
   1394 	verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
   1395 				threadnumber);
   1396 	tag = (uint64_t) (uintptr_t) rep;
   1397 	fio_net_send_cmd(server_fd, FIO_NET_CMD_SENDFILE, &out, sizeof(out),
   1398 				&tag, NULL);
   1399 
   1400 	/*
   1401 	 * Wait for the backend to receive the reply
   1402 	 */
   1403 	if (fio_mutex_down_timeout(&rep->lock, 10)) {
   1404 		log_err("fio: timed out waiting for reply\n");
   1405 		goto fail;
   1406 	}
   1407 
   1408 	if (rep->error) {
   1409 		log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
   1410 fail:
   1411 		*datap = NULL;
   1412 		sfree(rep);
   1413 		fio_net_send_quit(server_fd);
   1414 		return 1;
   1415 	}
   1416 
   1417 	/*
   1418 	 * The format is verify_state_hdr, then thread_io_list. Verify
   1419 	 * the header, and the thread_io_list checksum
   1420 	 */
   1421 	s = rep->data + sizeof(struct verify_state_hdr);
   1422 	if (verify_state_hdr(rep->data, s))
   1423 		goto fail;
   1424 
   1425 	/*
   1426 	 * Don't need the header from now, copy just the thread_io_list
   1427 	 */
   1428 	rep->size -= sizeof(struct verify_state_hdr);
   1429 	data = malloc(rep->size);
   1430 	memcpy(data, s, rep->size);
   1431 	*datap = data;
   1432 
   1433 	sfree(rep->data);
   1434 	__fio_mutex_remove(&rep->lock);
   1435 	sfree(rep);
   1436 	return 0;
   1437 }
   1438 
   1439 static int fio_init_server_ip(void)
   1440 {
   1441 	struct sockaddr *addr;
   1442 	socklen_t socklen;
   1443 	char buf[80];
   1444 	const char *str;
   1445 	int sk, opt;
   1446 
   1447 	if (use_ipv6)
   1448 		sk = socket(AF_INET6, SOCK_STREAM, 0);
   1449 	else
   1450 		sk = socket(AF_INET, SOCK_STREAM, 0);
   1451 
   1452 	if (sk < 0) {
   1453 		log_err("fio: socket: %s\n", strerror(errno));
   1454 		return -1;
   1455 	}
   1456 
   1457 	opt = 1;
   1458 	if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) {
   1459 		log_err("fio: setsockopt: %s\n", strerror(errno));
   1460 		close(sk);
   1461 		return -1;
   1462 	}
   1463 #ifdef SO_REUSEPORT
   1464 	if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
   1465 		log_err("fio: setsockopt: %s\n", strerror(errno));
   1466 		close(sk);
   1467 		return -1;
   1468 	}
   1469 #endif
   1470 
   1471 	if (use_ipv6) {
   1472 		const void *src = &saddr_in6.sin6_addr;
   1473 
   1474 		addr = (struct sockaddr *) &saddr_in6;
   1475 		socklen = sizeof(saddr_in6);
   1476 		saddr_in6.sin6_family = AF_INET6;
   1477 		str = inet_ntop(AF_INET6, src, buf, sizeof(buf));
   1478 	} else {
   1479 		const void *src = &saddr_in.sin_addr;
   1480 
   1481 		addr = (struct sockaddr *) &saddr_in;
   1482 		socklen = sizeof(saddr_in);
   1483 		saddr_in.sin_family = AF_INET;
   1484 		str = inet_ntop(AF_INET, src, buf, sizeof(buf));
   1485 	}
   1486 
   1487 	if (bind(sk, addr, socklen) < 0) {
   1488 		log_err("fio: bind: %s\n", strerror(errno));
   1489 		log_info("fio: failed with IPv%c %s\n", use_ipv6 ? '6' : '4', str);
   1490 		close(sk);
   1491 		return -1;
   1492 	}
   1493 
   1494 	return sk;
   1495 }
   1496 
   1497 static int fio_init_server_sock(void)
   1498 {
   1499 	struct sockaddr_un addr;
   1500 	socklen_t len;
   1501 	mode_t mode;
   1502 	int sk;
   1503 
   1504 	sk = socket(AF_UNIX, SOCK_STREAM, 0);
   1505 	if (sk < 0) {
   1506 		log_err("fio: socket: %s\n", strerror(errno));
   1507 		return -1;
   1508 	}
   1509 
   1510 	mode = umask(000);
   1511 
   1512 	memset(&addr, 0, sizeof(addr));
   1513 	addr.sun_family = AF_UNIX;
   1514 	strncpy(addr.sun_path, bind_sock, sizeof(addr.sun_path) - 1);
   1515 
   1516 	len = sizeof(addr.sun_family) + strlen(bind_sock) + 1;
   1517 
   1518 	if (bind(sk, (struct sockaddr *) &addr, len) < 0) {
   1519 		log_err("fio: bind: %s\n", strerror(errno));
   1520 		close(sk);
   1521 		return -1;
   1522 	}
   1523 
   1524 	umask(mode);
   1525 	return sk;
   1526 }
   1527 
   1528 static int fio_init_server_connection(void)
   1529 {
   1530 	char bind_str[128];
   1531 	int sk;
   1532 
   1533 	dprint(FD_NET, "starting server\n");
   1534 
   1535 	if (!bind_sock)
   1536 		sk = fio_init_server_ip();
   1537 	else
   1538 		sk = fio_init_server_sock();
   1539 
   1540 	if (sk < 0)
   1541 		return sk;
   1542 
   1543 	memset(bind_str, 0, sizeof(bind_str));
   1544 
   1545 	if (!bind_sock) {
   1546 		char *p, port[16];
   1547 		const void *src;
   1548 		int af;
   1549 
   1550 		if (use_ipv6) {
   1551 			af = AF_INET6;
   1552 			src = &saddr_in6.sin6_addr;
   1553 		} else {
   1554 			af = AF_INET;
   1555 			src = &saddr_in.sin_addr;
   1556 		}
   1557 
   1558 		p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str));
   1559 
   1560 		sprintf(port, ",%u", fio_net_port);
   1561 		if (p)
   1562 			strcat(p, port);
   1563 		else
   1564 			strncpy(bind_str, port, sizeof(bind_str) - 1);
   1565 	} else
   1566 		strncpy(bind_str, bind_sock, sizeof(bind_str) - 1);
   1567 
   1568 	log_info("fio: server listening on %s\n", bind_str);
   1569 
   1570 	if (listen(sk, 0) < 0) {
   1571 		log_err("fio: listen: %s\n", strerror(errno));
   1572 		close(sk);
   1573 		return -1;
   1574 	}
   1575 
   1576 	return sk;
   1577 }
   1578 
   1579 int fio_server_parse_host(const char *host, int ipv6, struct in_addr *inp,
   1580 			  struct in6_addr *inp6)
   1581 
   1582 {
   1583 	int ret = 0;
   1584 
   1585 	if (ipv6)
   1586 		ret = inet_pton(AF_INET6, host, inp6);
   1587 	else
   1588 		ret = inet_pton(AF_INET, host, inp);
   1589 
   1590 	if (ret != 1) {
   1591 		struct addrinfo hints, *res;
   1592 
   1593 		memset(&hints, 0, sizeof(hints));
   1594 		hints.ai_family = ipv6 ? AF_INET6 : AF_INET;
   1595 		hints.ai_socktype = SOCK_STREAM;
   1596 
   1597 		ret = getaddrinfo(host, NULL, &hints, &res);
   1598 		if (ret) {
   1599 			log_err("fio: failed to resolve <%s> (%s)\n", host,
   1600 					gai_strerror(ret));
   1601 			return 1;
   1602 		}
   1603 
   1604 		if (ipv6)
   1605 			memcpy(inp6, &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr, sizeof(*inp6));
   1606 		else
   1607 			memcpy(inp, &((struct sockaddr_in *) res->ai_addr)->sin_addr, sizeof(*inp));
   1608 
   1609 		ret = 1;
   1610 		freeaddrinfo(res);
   1611 	}
   1612 
   1613 	return !(ret == 1);
   1614 }
   1615 
   1616 /*
   1617  * Parse a host/ip/port string. Reads from 'str'.
   1618  *
   1619  * Outputs:
   1620  *
   1621  * For IPv4:
   1622  *	*ptr is the host, *port is the port, inp is the destination.
   1623  * For IPv6:
   1624  *	*ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1.
   1625  * For local domain sockets:
   1626  *	*ptr is the filename, *is_sock is 1.
   1627  */
   1628 int fio_server_parse_string(const char *str, char **ptr, int *is_sock,
   1629 			    int *port, struct in_addr *inp,
   1630 			    struct in6_addr *inp6, int *ipv6)
   1631 {
   1632 	const char *host = str;
   1633 	char *portp;
   1634 	int lport = 0;
   1635 
   1636 	*ptr = NULL;
   1637 	*is_sock = 0;
   1638 	*port = fio_net_port;
   1639 	*ipv6 = 0;
   1640 
   1641 	if (!strncmp(str, "sock:", 5)) {
   1642 		*ptr = strdup(str + 5);
   1643 		*is_sock = 1;
   1644 
   1645 		return 0;
   1646 	}
   1647 
   1648 	/*
   1649 	 * Is it ip:<ip or host>:port
   1650 	 */
   1651 	if (!strncmp(host, "ip:", 3))
   1652 		host += 3;
   1653 	else if (!strncmp(host, "ip4:", 4))
   1654 		host += 4;
   1655 	else if (!strncmp(host, "ip6:", 4)) {
   1656 		host += 4;
   1657 		*ipv6 = 1;
   1658 	} else if (host[0] == ':') {
   1659 		/* String is :port */
   1660 		host++;
   1661 		lport = atoi(host);
   1662 		if (!lport || lport > 65535) {
   1663 			log_err("fio: bad server port %u\n", lport);
   1664 			return 1;
   1665 		}
   1666 		/* no hostname given, we are done */
   1667 		*port = lport;
   1668 		return 0;
   1669 	}
   1670 
   1671 	/*
   1672 	 * If no port seen yet, check if there's a last ',' at the end
   1673 	 */
   1674 	if (!lport) {
   1675 		portp = strchr(host, ',');
   1676 		if (portp) {
   1677 			*portp = '\0';
   1678 			portp++;
   1679 			lport = atoi(portp);
   1680 			if (!lport || lport > 65535) {
   1681 				log_err("fio: bad server port %u\n", lport);
   1682 				return 1;
   1683 			}
   1684 		}
   1685 	}
   1686 
   1687 	if (lport)
   1688 		*port = lport;
   1689 
   1690 	if (!strlen(host))
   1691 		return 0;
   1692 
   1693 	*ptr = strdup(host);
   1694 
   1695 	if (fio_server_parse_host(*ptr, *ipv6, inp, inp6)) {
   1696 		free(*ptr);
   1697 		*ptr = NULL;
   1698 		return 1;
   1699 	}
   1700 
   1701 	if (*port == 0)
   1702 		*port = fio_net_port;
   1703 
   1704 	return 0;
   1705 }
   1706 
   1707 /*
   1708  * Server arg should be one of:
   1709  *
   1710  * sock:/path/to/socket
   1711  *   ip:1.2.3.4
   1712  *      1.2.3.4
   1713  *
   1714  * Where sock uses unix domain sockets, and ip binds the server to
   1715  * a specific interface. If no arguments are given to the server, it
   1716  * uses IP and binds to 0.0.0.0.
   1717  *
   1718  */
   1719 static int fio_handle_server_arg(void)
   1720 {
   1721 	int port = fio_net_port;
   1722 	int is_sock, ret = 0;
   1723 
   1724 	saddr_in.sin_addr.s_addr = htonl(INADDR_ANY);
   1725 
   1726 	if (!fio_server_arg)
   1727 		goto out;
   1728 
   1729 	ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
   1730 					&port, &saddr_in.sin_addr,
   1731 					&saddr_in6.sin6_addr, &use_ipv6);
   1732 
   1733 	if (!is_sock && bind_sock) {
   1734 		free(bind_sock);
   1735 		bind_sock = NULL;
   1736 	}
   1737 
   1738 out:
   1739 	fio_net_port = port;
   1740 	saddr_in.sin_port = htons(port);
   1741 	saddr_in6.sin6_port = htons(port);
   1742 	return ret;
   1743 }
   1744 
   1745 static void sig_int(int sig)
   1746 {
   1747 	if (bind_sock)
   1748 		unlink(bind_sock);
   1749 }
   1750 
   1751 static void set_sig_handlers(void)
   1752 {
   1753 	struct sigaction act;
   1754 
   1755 	memset(&act, 0, sizeof(act));
   1756 	act.sa_handler = sig_int;
   1757 	act.sa_flags = SA_RESTART;
   1758 	sigaction(SIGINT, &act, NULL);
   1759 }
   1760 
   1761 static int fio_server(void)
   1762 {
   1763 	int sk, ret;
   1764 
   1765 	dprint(FD_NET, "starting server\n");
   1766 
   1767 	if (fio_handle_server_arg())
   1768 		return -1;
   1769 
   1770 	sk = fio_init_server_connection();
   1771 	if (sk < 0)
   1772 		return -1;
   1773 
   1774 	set_sig_handlers();
   1775 
   1776 	ret = accept_loop(sk);
   1777 
   1778 	close(sk);
   1779 
   1780 	if (fio_server_arg) {
   1781 		free(fio_server_arg);
   1782 		fio_server_arg = NULL;
   1783 	}
   1784 	if (bind_sock)
   1785 		free(bind_sock);
   1786 
   1787 	return ret;
   1788 }
   1789 
   1790 void fio_server_got_signal(int signal)
   1791 {
   1792 	if (signal == SIGPIPE)
   1793 		server_fd = -1;
   1794 	else {
   1795 		log_info("\nfio: terminating on signal %d\n", signal);
   1796 		exit_backend = 1;
   1797 	}
   1798 }
   1799 
   1800 static int check_existing_pidfile(const char *pidfile)
   1801 {
   1802 	struct stat sb;
   1803 	char buf[16];
   1804 	pid_t pid;
   1805 	FILE *f;
   1806 
   1807 	if (stat(pidfile, &sb))
   1808 		return 0;
   1809 
   1810 	f = fopen(pidfile, "r");
   1811 	if (!f)
   1812 		return 0;
   1813 
   1814 	if (fread(buf, sb.st_size, 1, f) <= 0) {
   1815 		fclose(f);
   1816 		return 1;
   1817 	}
   1818 	fclose(f);
   1819 
   1820 	pid = atoi(buf);
   1821 	if (kill(pid, SIGCONT) < 0)
   1822 		return errno != ESRCH;
   1823 
   1824 	return 1;
   1825 }
   1826 
   1827 static int write_pid(pid_t pid, const char *pidfile)
   1828 {
   1829 	FILE *fpid;
   1830 
   1831 	fpid = fopen(pidfile, "w");
   1832 	if (!fpid) {
   1833 		log_err("fio: failed opening pid file %s\n", pidfile);
   1834 		return 1;
   1835 	}
   1836 
   1837 	fprintf(fpid, "%u\n", (unsigned int) pid);
   1838 	fclose(fpid);
   1839 	return 0;
   1840 }
   1841 
   1842 /*
   1843  * If pidfile is specified, background us.
   1844  */
   1845 int fio_start_server(char *pidfile)
   1846 {
   1847 	pid_t pid;
   1848 	int ret;
   1849 
   1850 #if defined(WIN32)
   1851 	WSADATA wsd;
   1852 	WSAStartup(MAKEWORD(2, 2), &wsd);
   1853 #endif
   1854 
   1855 	if (!pidfile)
   1856 		return fio_server();
   1857 
   1858 	if (check_existing_pidfile(pidfile)) {
   1859 		log_err("fio: pidfile %s exists and server appears alive\n",
   1860 								pidfile);
   1861 		free(pidfile);
   1862 		return -1;
   1863 	}
   1864 
   1865 	pid = fork();
   1866 	if (pid < 0) {
   1867 		log_err("fio: failed server fork: %s", strerror(errno));
   1868 		free(pidfile);
   1869 		return -1;
   1870 	} else if (pid) {
   1871 		ret = write_pid(pid, pidfile);
   1872 		free(pidfile);
   1873 		_exit(ret);
   1874 	}
   1875 
   1876 	setsid();
   1877 	openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER);
   1878 	log_syslog = 1;
   1879 	close(STDIN_FILENO);
   1880 	close(STDOUT_FILENO);
   1881 	close(STDERR_FILENO);
   1882 	f_out = NULL;
   1883 	f_err = NULL;
   1884 
   1885 	ret = fio_server();
   1886 
   1887 	closelog();
   1888 	unlink(pidfile);
   1889 	free(pidfile);
   1890 	return ret;
   1891 }
   1892 
   1893 void fio_server_set_arg(const char *arg)
   1894 {
   1895 	fio_server_arg = strdup(arg);
   1896 }
   1897