Home | History | Annotate | Download | only in fio
      1 /*
      2  * Code related to writing an iolog of what a thread is doing, and to
      3  * later read that back and replay
      4  */
      5 #include <stdio.h>
      6 #include <stdlib.h>
      7 #include <libgen.h>
      8 #include <assert.h>
      9 #include <sys/types.h>
     10 #include <sys/stat.h>
     11 #include <unistd.h>
     12 #ifdef CONFIG_ZLIB
     13 #include <zlib.h>
     14 #endif
     15 
     16 #include "flist.h"
     17 #include "fio.h"
     18 #include "verify.h"
     19 #include "trim.h"
     20 #include "filelock.h"
     21 #include "lib/tp.h"
     22 
/*
 * Marker string written as the first line of an iolog by init_iolog_write()
 * and checked (as a prefix of the first line) by init_iolog_read().
 */
static const char iolog_ver2[] = "fio version 2 iolog";
     24 
     25 void queue_io_piece(struct thread_data *td, struct io_piece *ipo)
     26 {
     27 	flist_add_tail(&ipo->list, &td->io_log_list);
     28 	td->total_io_size += ipo->len;
     29 }
     30 
     31 void log_io_u(const struct thread_data *td, const struct io_u *io_u)
     32 {
     33 	if (!td->o.write_iolog_file)
     34 		return;
     35 
     36 	fprintf(td->iolog_f, "%s %s %llu %lu\n", io_u->file->file_name,
     37 						io_ddir_name(io_u->ddir),
     38 						io_u->offset, io_u->buflen);
     39 }
     40 
     41 void log_file(struct thread_data *td, struct fio_file *f,
     42 	      enum file_log_act what)
     43 {
     44 	const char *act[] = { "add", "open", "close" };
     45 
     46 	assert(what < 3);
     47 
     48 	if (!td->o.write_iolog_file)
     49 		return;
     50 
     51 
     52 	/*
     53 	 * this happens on the pre-open/close done before the job starts
     54 	 */
     55 	if (!td->iolog_f)
     56 		return;
     57 
     58 	fprintf(td->iolog_f, "%s %s\n", f->file_name, act[what]);
     59 }
     60 
     61 static void iolog_delay(struct thread_data *td, unsigned long delay)
     62 {
     63 	uint64_t usec = utime_since_now(&td->last_issue);
     64 	uint64_t this_delay;
     65 	struct timeval tv;
     66 
     67 	if (delay < td->time_offset) {
     68 		td->time_offset = 0;
     69 		return;
     70 	}
     71 
     72 	delay -= td->time_offset;
     73 	if (delay < usec)
     74 		return;
     75 
     76 	delay -= usec;
     77 
     78 	fio_gettime(&tv, NULL);
     79 	while (delay && !td->terminate) {
     80 		this_delay = delay;
     81 		if (this_delay > 500000)
     82 			this_delay = 500000;
     83 
     84 		usec_sleep(td, this_delay);
     85 		delay -= this_delay;
     86 	}
     87 
     88 	usec = utime_since_now(&tv);
     89 	if (usec > delay)
     90 		td->time_offset = usec - delay;
     91 	else
     92 		td->time_offset = 0;
     93 }
     94 
     95 static int ipo_special(struct thread_data *td, struct io_piece *ipo)
     96 {
     97 	struct fio_file *f;
     98 	int ret;
     99 
    100 	/*
    101 	 * Not a special ipo
    102 	 */
    103 	if (ipo->ddir != DDIR_INVAL)
    104 		return 0;
    105 
    106 	f = td->files[ipo->fileno];
    107 
    108 	switch (ipo->file_action) {
    109 	case FIO_LOG_OPEN_FILE:
    110 		ret = td_io_open_file(td, f);
    111 		if (!ret)
    112 			break;
    113 		td_verror(td, ret, "iolog open file");
    114 		return -1;
    115 	case FIO_LOG_CLOSE_FILE:
    116 		td_io_close_file(td, f);
    117 		break;
    118 	case FIO_LOG_UNLINK_FILE:
    119 		td_io_unlink_file(td, f);
    120 		break;
    121 	default:
    122 		log_err("fio: bad file action %d\n", ipo->file_action);
    123 		break;
    124 	}
    125 
    126 	return 1;
    127 }
    128 
    129 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
    130 {
    131 	struct io_piece *ipo;
    132 	unsigned long elapsed;
    133 
    134 	while (!flist_empty(&td->io_log_list)) {
    135 		int ret;
    136 
    137 		ipo = flist_first_entry(&td->io_log_list, struct io_piece, list);
    138 		flist_del(&ipo->list);
    139 		remove_trim_entry(td, ipo);
    140 
    141 		ret = ipo_special(td, ipo);
    142 		if (ret < 0) {
    143 			free(ipo);
    144 			break;
    145 		} else if (ret > 0) {
    146 			free(ipo);
    147 			continue;
    148 		}
    149 
    150 		io_u->ddir = ipo->ddir;
    151 		if (ipo->ddir != DDIR_WAIT) {
    152 			io_u->offset = ipo->offset;
    153 			io_u->buflen = ipo->len;
    154 			io_u->file = td->files[ipo->fileno];
    155 			get_file(io_u->file);
    156 			dprint(FD_IO, "iolog: get %llu/%lu/%s\n", io_u->offset,
    157 						io_u->buflen, io_u->file->file_name);
    158 			if (ipo->delay)
    159 				iolog_delay(td, ipo->delay);
    160 		} else {
    161 			elapsed = mtime_since_genesis();
    162 			if (ipo->delay > elapsed)
    163 				usec_sleep(td, (ipo->delay - elapsed) * 1000);
    164 		}
    165 
    166 		free(ipo);
    167 
    168 		if (io_u->ddir != DDIR_WAIT)
    169 			return 0;
    170 	}
    171 
    172 	td->done = 1;
    173 	return 1;
    174 }
    175 
    176 void prune_io_piece_log(struct thread_data *td)
    177 {
    178 	struct io_piece *ipo;
    179 	struct rb_node *n;
    180 
    181 	while ((n = rb_first(&td->io_hist_tree)) != NULL) {
    182 		ipo = rb_entry(n, struct io_piece, rb_node);
    183 		rb_erase(n, &td->io_hist_tree);
    184 		remove_trim_entry(td, ipo);
    185 		td->io_hist_len--;
    186 		free(ipo);
    187 	}
    188 
    189 	while (!flist_empty(&td->io_hist_list)) {
    190 		ipo = flist_first_entry(&td->io_hist_list, struct io_piece, list);
    191 		flist_del(&ipo->list);
    192 		remove_trim_entry(td, ipo);
    193 		td->io_hist_len--;
    194 		free(ipo);
    195 	}
    196 }
    197 
/*
 * log a successful write, so we can unwind the log for verify
 *
 * Allocates an io_piece describing io_u (file, offset, length, numberio),
 * marks it in flight, and attaches it to io_u->ipo. The piece is queued on
 * the trim list if io_u_should_trim() says so, and is then stored either on
 * the unsorted history list or in the history rb-tree. When inserting into
 * the tree, any existing entry on the same file that overlaps the new
 * range is erased and freed first.
 */
void log_io_piece(struct thread_data *td, struct io_u *io_u)
{
	struct rb_node **p, *parent;
	struct io_piece *ipo, *__ipo;

	/* NOTE(review): allocation result is used without a NULL check */
	ipo = malloc(sizeof(struct io_piece));
	init_ipo(ipo);
	ipo->file = io_u->file;
	ipo->offset = io_u->offset;
	ipo->len = io_u->buflen;
	ipo->numberio = io_u->numberio;
	ipo->flags = IP_F_IN_FLIGHT;

	io_u->ipo = ipo;

	if (io_u_should_trim(td, io_u)) {
		flist_add_tail(&ipo->trim_list, &td->trim_list);
		td->trim_entries++;
	}

	/*
	 * We don't need to sort the entries, if:
	 *
	 *	Sequential writes, or
	 *	Random writes that lay out the file as it goes along
	 *
	 * For both these cases, just reading back data in the order we
	 * wrote it out is the fastest.
	 *
	 * One exception is if we don't have a random map AND we are doing
	 * verifies, in that case we need to check for duplicate blocks and
	 * drop the old one, which we rely on the rb insert/lookup for
	 * handling.
	 */
	if (((!td->o.verifysort) || !td_random(td) || !td->o.overwrite) &&
	      (file_randommap(td, ipo->file) || td->o.verify == VERIFY_NONE)) {
		INIT_FLIST_HEAD(&ipo->list);
		flist_add_tail(&ipo->list, &td->io_hist_list);
		ipo->flags |= IP_F_ONLIST;
		td->io_hist_len++;
		return;
	}

	RB_CLEAR_NODE(&ipo->rb_node);

	/*
	 * Sort the entry into the verification list
	 *
	 * Entries are ordered by file pointer first, then by offset. The
	 * descent restarts from the root every time an overlapping entry
	 * has been erased, since the erase changes the tree.
	 */
restart:
	p = &td->io_hist_tree.rb_node;
	parent = NULL;
	while (*p) {
		int overlap = 0;
		parent = *p;

		__ipo = rb_entry(parent, struct io_piece, rb_node);
		if (ipo->file < __ipo->file)
			p = &(*p)->rb_left;
		else if (ipo->file > __ipo->file)
			p = &(*p)->rb_right;
		else if (ipo->offset < __ipo->offset) {
			p = &(*p)->rb_left;
			/* new range starts lower; overlaps if it reaches into the old one */
			overlap = ipo->offset + ipo->len > __ipo->offset;
		}
		else if (ipo->offset > __ipo->offset) {
			p = &(*p)->rb_right;
			/* new range starts higher; overlaps if the old one reaches into it */
			overlap = __ipo->offset + __ipo->len > ipo->offset;
		}
		else
			overlap = 1;

		if (overlap) {
			/* NOTE(review): this debug message has no trailing newline */
			dprint(FD_IO, "iolog: overlap %llu/%lu, %llu/%lu",
				__ipo->offset, __ipo->len,
				ipo->offset, ipo->len);
			td->io_hist_len--;
			rb_erase(parent, &td->io_hist_tree);
			remove_trim_entry(td, __ipo);
			free(__ipo);
			goto restart;
		}
	}

	rb_link_node(&ipo->rb_node, parent, p);
	rb_insert_color(&ipo->rb_node, &td->io_hist_tree);
	ipo->flags |= IP_F_ONRB;
	td->io_hist_len++;
}
    289 
    290 void unlog_io_piece(struct thread_data *td, struct io_u *io_u)
    291 {
    292 	struct io_piece *ipo = io_u->ipo;
    293 
    294 	if (!ipo)
    295 		return;
    296 
    297 	if (ipo->flags & IP_F_ONRB)
    298 		rb_erase(&ipo->rb_node, &td->io_hist_tree);
    299 	else if (ipo->flags & IP_F_ONLIST)
    300 		flist_del(&ipo->list);
    301 
    302 	free(ipo);
    303 	io_u->ipo = NULL;
    304 	td->io_hist_len--;
    305 }
    306 
    307 void trim_io_piece(struct thread_data *td, const struct io_u *io_u)
    308 {
    309 	struct io_piece *ipo = io_u->ipo;
    310 
    311 	if (!ipo)
    312 		return;
    313 
    314 	ipo->len = io_u->xfer_buflen - io_u->resid;
    315 }
    316 
    317 void write_iolog_close(struct thread_data *td)
    318 {
    319 	fflush(td->iolog_f);
    320 	fclose(td->iolog_f);
    321 	free(td->iolog_buf);
    322 	td->iolog_f = NULL;
    323 	td->iolog_buf = NULL;
    324 }
    325 
    326 /*
    327  * Read version 2 iolog data. It is enhanced to include per-file logging,
    328  * syncs, etc.
    329  */
    330 static int read_iolog2(struct thread_data *td, FILE *f)
    331 {
    332 	unsigned long long offset;
    333 	unsigned int bytes;
    334 	int reads, writes, waits, fileno = 0, file_action = 0; /* stupid gcc */
    335 	char *fname, *act;
    336 	char *str, *p;
    337 	enum fio_ddir rw;
    338 
    339 	free_release_files(td);
    340 
    341 	/*
    342 	 * Read in the read iolog and store it, reuse the infrastructure
    343 	 * for doing verifications.
    344 	 */
    345 	str = malloc(4096);
    346 	fname = malloc(256+16);
    347 	act = malloc(256+16);
    348 
    349 	reads = writes = waits = 0;
    350 	while ((p = fgets(str, 4096, f)) != NULL) {
    351 		struct io_piece *ipo;
    352 		int r;
    353 
    354 		r = sscanf(p, "%256s %256s %llu %u", fname, act, &offset,
    355 									&bytes);
    356 		if (r == 4) {
    357 			/*
    358 			 * Check action first
    359 			 */
    360 			if (!strcmp(act, "wait"))
    361 				rw = DDIR_WAIT;
    362 			else if (!strcmp(act, "read"))
    363 				rw = DDIR_READ;
    364 			else if (!strcmp(act, "write"))
    365 				rw = DDIR_WRITE;
    366 			else if (!strcmp(act, "sync"))
    367 				rw = DDIR_SYNC;
    368 			else if (!strcmp(act, "datasync"))
    369 				rw = DDIR_DATASYNC;
    370 			else if (!strcmp(act, "trim"))
    371 				rw = DDIR_TRIM;
    372 			else {
    373 				log_err("fio: bad iolog file action: %s\n",
    374 									act);
    375 				continue;
    376 			}
    377 			fileno = get_fileno(td, fname);
    378 		} else if (r == 2) {
    379 			rw = DDIR_INVAL;
    380 			if (!strcmp(act, "add")) {
    381 				fileno = add_file(td, fname, 0, 1);
    382 				file_action = FIO_LOG_ADD_FILE;
    383 				continue;
    384 			} else if (!strcmp(act, "open")) {
    385 				fileno = get_fileno(td, fname);
    386 				file_action = FIO_LOG_OPEN_FILE;
    387 			} else if (!strcmp(act, "close")) {
    388 				fileno = get_fileno(td, fname);
    389 				file_action = FIO_LOG_CLOSE_FILE;
    390 			} else {
    391 				log_err("fio: bad iolog file action: %s\n",
    392 									act);
    393 				continue;
    394 			}
    395 		} else {
    396 			log_err("bad iolog2: %s", p);
    397 			continue;
    398 		}
    399 
    400 		if (rw == DDIR_READ)
    401 			reads++;
    402 		else if (rw == DDIR_WRITE) {
    403 			/*
    404 			 * Don't add a write for ro mode
    405 			 */
    406 			if (read_only)
    407 				continue;
    408 			writes++;
    409 		} else if (rw == DDIR_WAIT) {
    410 			waits++;
    411 		} else if (rw == DDIR_INVAL) {
    412 		} else if (!ddir_sync(rw)) {
    413 			log_err("bad ddir: %d\n", rw);
    414 			continue;
    415 		}
    416 
    417 		/*
    418 		 * Make note of file
    419 		 */
    420 		ipo = malloc(sizeof(*ipo));
    421 		init_ipo(ipo);
    422 		ipo->ddir = rw;
    423 		if (rw == DDIR_WAIT) {
    424 			ipo->delay = offset;
    425 		} else {
    426 			ipo->offset = offset;
    427 			ipo->len = bytes;
    428 			if (rw != DDIR_INVAL && bytes > td->o.max_bs[rw])
    429 				td->o.max_bs[rw] = bytes;
    430 			ipo->fileno = fileno;
    431 			ipo->file_action = file_action;
    432 			td->o.size += bytes;
    433 		}
    434 
    435 		queue_io_piece(td, ipo);
    436 	}
    437 
    438 	free(str);
    439 	free(act);
    440 	free(fname);
    441 
    442 	if (writes && read_only) {
    443 		log_err("fio: <%s> skips replay of %d writes due to"
    444 			" read-only\n", td->o.name, writes);
    445 		writes = 0;
    446 	}
    447 
    448 	if (!reads && !writes && !waits)
    449 		return 1;
    450 	else if (reads && !writes)
    451 		td->o.td_ddir = TD_DDIR_READ;
    452 	else if (!reads && writes)
    453 		td->o.td_ddir = TD_DDIR_WRITE;
    454 	else
    455 		td->o.td_ddir = TD_DDIR_RW;
    456 
    457 	return 0;
    458 }
    459 
    460 /*
    461  * open iolog, check version, and call appropriate parser
    462  */
    463 static int init_iolog_read(struct thread_data *td)
    464 {
    465 	char buffer[256], *p;
    466 	FILE *f;
    467 	int ret;
    468 
    469 	f = fopen(td->o.read_iolog_file, "r");
    470 	if (!f) {
    471 		perror("fopen read iolog");
    472 		return 1;
    473 	}
    474 
    475 	p = fgets(buffer, sizeof(buffer), f);
    476 	if (!p) {
    477 		td_verror(td, errno, "iolog read");
    478 		log_err("fio: unable to read iolog\n");
    479 		fclose(f);
    480 		return 1;
    481 	}
    482 
    483 	/*
    484 	 * version 2 of the iolog stores a specific string as the
    485 	 * first line, check for that
    486 	 */
    487 	if (!strncmp(iolog_ver2, buffer, strlen(iolog_ver2)))
    488 		ret = read_iolog2(td, f);
    489 	else {
    490 		log_err("fio: iolog version 1 is no longer supported\n");
    491 		ret = 1;
    492 	}
    493 
    494 	fclose(f);
    495 	return ret;
    496 }
    497 
    498 /*
    499  * Set up a log for storing io patterns.
    500  */
    501 static int init_iolog_write(struct thread_data *td)
    502 {
    503 	struct fio_file *ff;
    504 	FILE *f;
    505 	unsigned int i;
    506 
    507 	f = fopen(td->o.write_iolog_file, "a");
    508 	if (!f) {
    509 		perror("fopen write iolog");
    510 		return 1;
    511 	}
    512 
    513 	/*
    514 	 * That's it for writing, setup a log buffer and we're done.
    515 	  */
    516 	td->iolog_f = f;
    517 	td->iolog_buf = malloc(8192);
    518 	setvbuf(f, td->iolog_buf, _IOFBF, 8192);
    519 
    520 	/*
    521 	 * write our version line
    522 	 */
    523 	if (fprintf(f, "%s\n", iolog_ver2) < 0) {
    524 		perror("iolog init\n");
    525 		return 1;
    526 	}
    527 
    528 	/*
    529 	 * add all known files
    530 	 */
    531 	for_each_file(td, ff, i)
    532 		log_file(td, ff, FIO_LOG_ADD_FILE);
    533 
    534 	return 0;
    535 }
    536 
    537 int init_iolog(struct thread_data *td)
    538 {
    539 	int ret = 0;
    540 
    541 	if (td->o.read_iolog_file) {
    542 		int need_swap;
    543 
    544 		/*
    545 		 * Check if it's a blktrace file and load that if possible.
    546 		 * Otherwise assume it's a normal log file and load that.
    547 		 */
    548 		if (is_blktrace(td->o.read_iolog_file, &need_swap))
    549 			ret = load_blktrace(td, td->o.read_iolog_file, need_swap);
    550 		else
    551 			ret = init_iolog_read(td);
    552 	} else if (td->o.write_iolog_file)
    553 		ret = init_iolog_write(td);
    554 
    555 	if (ret)
    556 		td_verror(td, EINVAL, "failed initializing iolog");
    557 
    558 	return ret;
    559 }
    560 
    561 void setup_log(struct io_log **log, struct log_params *p,
    562 	       const char *filename)
    563 {
    564 	struct io_log *l;
    565 
    566 	l = calloc(1, sizeof(*l));
    567 	l->nr_samples = 0;
    568 	l->max_samples = 1024;
    569 	l->log_type = p->log_type;
    570 	l->log_offset = p->log_offset;
    571 	l->log_gz = p->log_gz;
    572 	l->log_gz_store = p->log_gz_store;
    573 	l->log = malloc(l->max_samples * log_entry_sz(l));
    574 	l->avg_msec = p->avg_msec;
    575 	l->filename = strdup(filename);
    576 	l->td = p->td;
    577 
    578 	if (l->log_offset)
    579 		l->log_ddir_mask = LOG_OFFSET_SAMPLE_BIT;
    580 
    581 	INIT_FLIST_HEAD(&l->chunk_list);
    582 
    583 	if (l->log_gz && !p->td)
    584 		l->log_gz = 0;
    585 	else if (l->log_gz) {
    586 		pthread_mutex_init(&l->chunk_lock, NULL);
    587 		p->td->flags |= TD_F_COMPRESS_LOG;
    588 	}
    589 
    590 	*log = l;
    591 }
    592 
    593 #ifdef CONFIG_SETVBUF
    594 static void *set_file_buffer(FILE *f)
    595 {
    596 	size_t size = 1048576;
    597 	void *buf;
    598 
    599 	buf = malloc(size);
    600 	setvbuf(f, buf, _IOFBF, size);
    601 	return buf;
    602 }
    603 
    604 static void clear_file_buffer(void *buf)
    605 {
    606 	free(buf);
    607 }
    608 #else
    609 static void *set_file_buffer(FILE *f)
    610 {
    611 	return NULL;
    612 }
    613 
    614 static void clear_file_buffer(void *buf)
    615 {
    616 }
    617 #endif
    618 
    619 void free_log(struct io_log *log)
    620 {
    621 	free(log->log);
    622 	free(log->filename);
    623 	free(log);
    624 }
    625 
    626 static void flush_samples(FILE *f, void *samples, uint64_t sample_size)
    627 {
    628 	struct io_sample *s;
    629 	int log_offset;
    630 	uint64_t i, nr_samples;
    631 
    632 	if (!sample_size)
    633 		return;
    634 
    635 	s = __get_sample(samples, 0, 0);
    636 	log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
    637 
    638 	nr_samples = sample_size / __log_entry_sz(log_offset);
    639 
    640 	for (i = 0; i < nr_samples; i++) {
    641 		s = __get_sample(samples, log_offset, i);
    642 
    643 		if (!log_offset) {
    644 			fprintf(f, "%lu, %lu, %u, %u\n",
    645 					(unsigned long) s->time,
    646 					(unsigned long) s->val,
    647 					io_sample_ddir(s), s->bs);
    648 		} else {
    649 			struct io_sample_offset *so = (void *) s;
    650 
    651 			fprintf(f, "%lu, %lu, %u, %u, %llu\n",
    652 					(unsigned long) s->time,
    653 					(unsigned long) s->val,
    654 					io_sample_ddir(s), s->bs,
    655 					(unsigned long long) so->offset);
    656 		}
    657 	}
    658 }
    659 
    660 #ifdef CONFIG_ZLIB
    661 
/*
 * Work item handed to the compression helper by iolog_flush() and
 * processed by gz_work().
 */
struct iolog_flush_data {
	struct tp_work work;	/* embedded work item; gz_work() recovers us from it */
	struct io_log *log;	/* log the samples belong to */
	void *samples;		/* private copy of the samples to compress */
	uint64_t nr_samples;	/* number of samples in 'samples' */
};
    668 
/*
 * One buffer of compressed log data, linked on a log's chunk_list.
 */
struct iolog_compress {
	struct flist_head list;	/* entry in a chunk list */
	void *buf;		/* compressed data, GZ_CHUNK bytes allocated */
	size_t len;		/* number of bytes used in buf */
	unsigned int seq;	/* sequence number; chunks from one gz_work() call share it */
};

/* size in bytes of the buffer allocated for each compressed chunk */
#define GZ_CHUNK	131072
    677 
    678 static struct iolog_compress *get_new_chunk(unsigned int seq)
    679 {
    680 	struct iolog_compress *c;
    681 
    682 	c = malloc(sizeof(*c));
    683 	INIT_FLIST_HEAD(&c->list);
    684 	c->buf = malloc(GZ_CHUNK);
    685 	c->len = 0;
    686 	c->seq = seq;
    687 	return c;
    688 }
    689 
    690 static void free_chunk(struct iolog_compress *ic)
    691 {
    692 	free(ic->buf);
    693 	free(ic);
    694 }
    695 
    696 static int z_stream_init(z_stream *stream, int gz_hdr)
    697 {
    698 	int wbits = 15;
    699 
    700 	stream->zalloc = Z_NULL;
    701 	stream->zfree = Z_NULL;
    702 	stream->opaque = Z_NULL;
    703 	stream->next_in = Z_NULL;
    704 
    705 	/*
    706 	 * zlib magic - add 32 for auto-detection of gz header or not,
    707 	 * if we decide to store files in a gzip friendly format.
    708 	 */
    709 	if (gz_hdr)
    710 		wbits += 32;
    711 
    712 	if (inflateInit2(stream, wbits) != Z_OK)
    713 		return 1;
    714 
    715 	return 0;
    716 }
    717 
/*
 * State carried across inflate_chunk() calls while decompressing a series
 * of chunks.
 */
struct inflate_chunk_iter {
	unsigned int seq;	/* sequence number of the stream being inflated, 0 if none yet */
	int err;		/* last error recorded, 0 if none */
	void *buf;		/* output buffer holding inflated samples */
	size_t buf_size;	/* allocated size of buf */
	size_t buf_used;	/* bytes of buf filled so far */
	size_t chunk_sz;	/* initial size of buf, and the amount it grows by */
};
    726 
    727 static void finish_chunk(z_stream *stream, FILE *f,
    728 			 struct inflate_chunk_iter *iter)
    729 {
    730 	int ret;
    731 
    732 	ret = inflateEnd(stream);
    733 	if (ret != Z_OK)
    734 		log_err("fio: failed to end log inflation (%d)\n", ret);
    735 
    736 	flush_samples(f, iter->buf, iter->buf_used);
    737 	free(iter->buf);
    738 	iter->buf = NULL;
    739 	iter->buf_size = iter->buf_used = 0;
    740 }
    741 
/*
 * Iterative chunk inflation. Handles cases where we cross into a new
 * sequence, doing flush finish of previous chunk if needed.
 *
 * Inflates the compressed data in ic into iter->buf, growing that buffer
 * by iter->chunk_sz whenever it fills up. An inflate error is stored in
 * iter->err. Returns the number of input bytes consumed from ic->buf.
 */
static size_t inflate_chunk(struct iolog_compress *ic, int gz_hdr, FILE *f,
			    z_stream *stream, struct inflate_chunk_iter *iter)
{
	size_t ret;

	dprint(FD_COMPRESS, "inflate chunk size=%lu, seq=%u",
				(unsigned long) ic->len, ic->seq);

	/* a different sequence number means a new compressed stream begins */
	if (ic->seq != iter->seq) {
		if (iter->seq)
			finish_chunk(stream, f, iter);

		/* NOTE(review): the result of z_stream_init() is not checked */
		z_stream_init(stream, gz_hdr);
		iter->seq = ic->seq;
	}

	stream->avail_in = ic->len;
	stream->next_in = ic->buf;

	/* first use, or the buffer was released by finish_chunk() */
	if (!iter->buf_size) {
		iter->buf_size = iter->chunk_sz;
		iter->buf = malloc(iter->buf_size);
	}

	while (stream->avail_in) {
		size_t this_out = iter->buf_size - iter->buf_used;
		int err;

		stream->avail_out = this_out;
		stream->next_out = iter->buf + iter->buf_used;

		err = inflate(stream, Z_NO_FLUSH);
		if (err < 0) {
			log_err("fio: failed inflating log: %d\n", err);
			iter->err = err;
			break;
		}

		iter->buf_used += this_out - stream->avail_out;

		/* output space exhausted: enlarge the buffer and go again */
		if (!stream->avail_out) {
			iter->buf_size += iter->chunk_sz;
			/* NOTE(review): realloc() failure would lose the old buffer */
			iter->buf = realloc(iter->buf, iter->buf_size);
			continue;
		}

		if (err == Z_STREAM_END)
			break;
	}

	/* how far zlib advanced through the input */
	ret = (void *) stream->next_in - ic->buf;

	dprint(FD_COMPRESS, "inflated to size=%lu\n", (unsigned long) ret);

	return ret;
}
    802 
    803 /*
    804  * Inflate stored compressed chunks, or write them directly to the log
    805  * file if so instructed.
    806  */
    807 static int inflate_gz_chunks(struct io_log *log, FILE *f)
    808 {
    809 	struct inflate_chunk_iter iter = { .chunk_sz = log->log_gz, };
    810 	z_stream stream;
    811 
    812 	while (!flist_empty(&log->chunk_list)) {
    813 		struct iolog_compress *ic;
    814 
    815 		ic = flist_first_entry(&log->chunk_list, struct iolog_compress, list);
    816 		flist_del(&ic->list);
    817 
    818 		if (log->log_gz_store) {
    819 			size_t ret;
    820 
    821 			dprint(FD_COMPRESS, "log write chunk size=%lu, "
    822 				"seq=%u\n", (unsigned long) ic->len, ic->seq);
    823 
    824 			ret = fwrite(ic->buf, ic->len, 1, f);
    825 			if (ret != 1 || ferror(f)) {
    826 				iter.err = errno;
    827 				log_err("fio: error writing compressed log\n");
    828 			}
    829 		} else
    830 			inflate_chunk(ic, log->log_gz_store, f, &stream, &iter);
    831 
    832 		free_chunk(ic);
    833 	}
    834 
    835 	if (iter.seq) {
    836 		finish_chunk(&stream, f, &iter);
    837 		free(iter.buf);
    838 	}
    839 
    840 	return iter.err;
    841 }
    842 
    843 /*
    844  * Open compressed log file and decompress the stored chunks and
    845  * write them to stdout. The chunks are stored sequentially in the
    846  * file, so we iterate over them and do them one-by-one.
    847  */
    848 int iolog_file_inflate(const char *file)
    849 {
    850 	struct inflate_chunk_iter iter = { .chunk_sz = 64 * 1024 * 1024, };
    851 	struct iolog_compress ic;
    852 	z_stream stream;
    853 	struct stat sb;
    854 	ssize_t ret;
    855 	size_t total;
    856 	void *buf;
    857 	FILE *f;
    858 
    859 	f = fopen(file, "r");
    860 	if (!f) {
    861 		perror("fopen");
    862 		return 1;
    863 	}
    864 
    865 	if (stat(file, &sb) < 0) {
    866 		fclose(f);
    867 		perror("stat");
    868 		return 1;
    869 	}
    870 
    871 	ic.buf = buf = malloc(sb.st_size);
    872 	ic.len = sb.st_size;
    873 	ic.seq = 1;
    874 
    875 	ret = fread(ic.buf, ic.len, 1, f);
    876 	if (ret < 0) {
    877 		perror("fread");
    878 		fclose(f);
    879 		free(buf);
    880 		return 1;
    881 	} else if (ret != 1) {
    882 		log_err("fio: short read on reading log\n");
    883 		fclose(f);
    884 		free(buf);
    885 		return 1;
    886 	}
    887 
    888 	fclose(f);
    889 
    890 	/*
    891 	 * Each chunk will return Z_STREAM_END. We don't know how many
    892 	 * chunks are in the file, so we just keep looping and incrementing
    893 	 * the sequence number until we have consumed the whole compressed
    894 	 * file.
    895 	 */
    896 	total = ic.len;
    897 	do {
    898 		size_t iret;
    899 
    900 		iret = inflate_chunk(&ic,  1, stdout, &stream, &iter);
    901 		total -= iret;
    902 		if (!total)
    903 			break;
    904 		if (iter.err)
    905 			break;
    906 
    907 		ic.seq++;
    908 		ic.len -= iret;
    909 		ic.buf += iret;
    910 	} while (1);
    911 
    912 	if (iter.seq) {
    913 		finish_chunk(&stream, stdout, &iter);
    914 		free(iter.buf);
    915 	}
    916 
    917 	free(buf);
    918 	return iter.err;
    919 }
    920 
    921 #else
    922 
/*
 * Variant used when CONFIG_ZLIB is not defined: does nothing and reports
 * success.
 */
static int inflate_gz_chunks(struct io_log *log, FILE *f)
{
	return 0;
}
    927 
/*
 * Variant used when CONFIG_ZLIB is not defined: logs an error and returns 1.
 */
int iolog_file_inflate(const char *file)
{
	log_err("fio: log inflation not possible without zlib\n");
	return 1;
}
    933 
    934 #endif
    935 
    936 void flush_log(struct io_log *log)
    937 {
    938 	void *buf;
    939 	FILE *f;
    940 
    941 	f = fopen(log->filename, "w");
    942 	if (!f) {
    943 		perror("fopen log");
    944 		return;
    945 	}
    946 
    947 	buf = set_file_buffer(f);
    948 
    949 	inflate_gz_chunks(log, f);
    950 
    951 	flush_samples(f, log->log, log->nr_samples * log_entry_sz(log));
    952 
    953 	fclose(f);
    954 	clear_file_buffer(buf);
    955 }
    956 
    957 static int finish_log(struct thread_data *td, struct io_log *log, int trylock)
    958 {
    959 	if (td->tp_data)
    960 		iolog_flush(log, 1);
    961 
    962 	if (trylock) {
    963 		if (fio_trylock_file(log->filename))
    964 			return 1;
    965 	} else
    966 		fio_lock_file(log->filename);
    967 
    968 	if (td->client_type == FIO_CLIENT_TYPE_GUI)
    969 		fio_send_iolog(td, log, log->filename);
    970 	else
    971 		flush_log(log);
    972 
    973 	fio_unlock_file(log->filename);
    974 	free_log(log);
    975 	return 0;
    976 }
    977 
    978 #ifdef CONFIG_ZLIB
    979 
    980 /*
    981  * Invoked from our compress helper thread, when logging would have exceeded
    982  * the specified memory limitation. Compresses the previously stored
    983  * entries.
    984  */
    985 static int gz_work(struct tp_work *work)
    986 {
    987 	struct iolog_flush_data *data;
    988 	struct iolog_compress *c;
    989 	struct flist_head list;
    990 	unsigned int seq;
    991 	z_stream stream;
    992 	size_t total = 0;
    993 	int ret;
    994 
    995 	INIT_FLIST_HEAD(&list);
    996 
    997 	data = container_of(work, struct iolog_flush_data, work);
    998 
    999 	stream.zalloc = Z_NULL;
   1000 	stream.zfree = Z_NULL;
   1001 	stream.opaque = Z_NULL;
   1002 
   1003 	ret = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
   1004 	if (ret != Z_OK) {
   1005 		log_err("fio: failed to init gz stream\n");
   1006 		return 0;
   1007 	}
   1008 
   1009 	seq = ++data->log->chunk_seq;
   1010 
   1011 	stream.next_in = (void *) data->samples;
   1012 	stream.avail_in = data->nr_samples * log_entry_sz(data->log);
   1013 
   1014 	dprint(FD_COMPRESS, "deflate input size=%lu, seq=%u\n",
   1015 				(unsigned long) stream.avail_in, seq);
   1016 	do {
   1017 		c = get_new_chunk(seq);
   1018 		stream.avail_out = GZ_CHUNK;
   1019 		stream.next_out = c->buf;
   1020 		ret = deflate(&stream, Z_NO_FLUSH);
   1021 		if (ret < 0) {
   1022 			log_err("fio: deflate log (%d)\n", ret);
   1023 			free_chunk(c);
   1024 			goto err;
   1025 		}
   1026 
   1027 		c->len = GZ_CHUNK - stream.avail_out;
   1028 		flist_add_tail(&c->list, &list);
   1029 		total += c->len;
   1030 	} while (stream.avail_in);
   1031 
   1032 	stream.next_out = c->buf + c->len;
   1033 	stream.avail_out = GZ_CHUNK - c->len;
   1034 
   1035 	ret = deflate(&stream, Z_FINISH);
   1036 	if (ret == Z_STREAM_END)
   1037 		c->len = GZ_CHUNK - stream.avail_out;
   1038 	else {
   1039 		do {
   1040 			c = get_new_chunk(seq);
   1041 			stream.avail_out = GZ_CHUNK;
   1042 			stream.next_out = c->buf;
   1043 			ret = deflate(&stream, Z_FINISH);
   1044 			c->len = GZ_CHUNK - stream.avail_out;
   1045 			total += c->len;
   1046 			flist_add_tail(&c->list, &list);
   1047 		} while (ret != Z_STREAM_END);
   1048 	}
   1049 
   1050 	dprint(FD_COMPRESS, "deflated to size=%lu\n", (unsigned long) total);
   1051 
   1052 	ret = deflateEnd(&stream);
   1053 	if (ret != Z_OK)
   1054 		log_err("fio: deflateEnd %d\n", ret);
   1055 
   1056 	free(data->samples);
   1057 
   1058 	if (!flist_empty(&list)) {
   1059 		pthread_mutex_lock(&data->log->chunk_lock);
   1060 		flist_splice_tail(&list, &data->log->chunk_list);
   1061 		pthread_mutex_unlock(&data->log->chunk_lock);
   1062 	}
   1063 
   1064 	ret = 0;
   1065 done:
   1066 	if (work->wait) {
   1067 		work->done = 1;
   1068 		pthread_cond_signal(&work->cv);
   1069 	} else
   1070 		free(data);
   1071 
   1072 	return ret;
   1073 err:
   1074 	while (!flist_empty(&list)) {
   1075 		c = flist_first_entry(list.next, struct iolog_compress, list);
   1076 		flist_del(&c->list);
   1077 		free_chunk(c);
   1078 	}
   1079 	ret = 1;
   1080 	goto done;
   1081 }
   1082 
   1083 /*
   1084  * Queue work item to compress the existing log entries. We copy the
   1085  * samples, and reset the log sample count to 0 (so the logging will
   1086  * continue to use the memory associated with the log). If called with
   1087  * wait == 1, will not return until the log compression has completed.
   1088  */
   1089 int iolog_flush(struct io_log *log, int wait)
   1090 {
   1091 	struct tp_data *tdat = log->td->tp_data;
   1092 	struct iolog_flush_data *data;
   1093 	size_t sample_size;
   1094 
   1095 	data = malloc(sizeof(*data));
   1096 	if (!data)
   1097 		return 1;
   1098 
   1099 	data->log = log;
   1100 
   1101 	sample_size = log->nr_samples * log_entry_sz(log);
   1102 	data->samples = malloc(sample_size);
   1103 	if (!data->samples) {
   1104 		free(data);
   1105 		return 1;
   1106 	}
   1107 
   1108 	memcpy(data->samples, log->log, sample_size);
   1109 	data->nr_samples = log->nr_samples;
   1110 	data->work.fn = gz_work;
   1111 	log->nr_samples = 0;
   1112 
   1113 	if (wait) {
   1114 		pthread_mutex_init(&data->work.lock, NULL);
   1115 		pthread_cond_init(&data->work.cv, NULL);
   1116 		data->work.wait = 1;
   1117 	} else
   1118 		data->work.wait = 0;
   1119 
   1120 	data->work.prio = 1;
   1121 	tp_queue_work(tdat, &data->work);
   1122 
   1123 	if (wait) {
   1124 		pthread_mutex_lock(&data->work.lock);
   1125 		while (!data->work.done)
   1126 			pthread_cond_wait(&data->work.cv, &data->work.lock);
   1127 		pthread_mutex_unlock(&data->work.lock);
   1128 		free(data);
   1129 	}
   1130 
   1131 	return 0;
   1132 }
   1133 
   1134 #else
   1135 
/*
 * Variant used when CONFIG_ZLIB is not defined: nothing is compressed and
 * 1 is returned.
 */
int iolog_flush(struct io_log *log, int wait)
{
	return 1;
}
   1140 
   1141 #endif
   1142 
   1143 static int write_iops_log(struct thread_data *td, int try)
   1144 {
   1145 	struct io_log *log = td->iops_log;
   1146 
   1147 	if (!log)
   1148 		return 0;
   1149 
   1150 	return finish_log(td, log, try);
   1151 }
   1152 
   1153 static int write_slat_log(struct thread_data *td, int try)
   1154 {
   1155 	struct io_log *log = td->slat_log;
   1156 
   1157 	if (!log)
   1158 		return 0;
   1159 
   1160 	return finish_log(td, log, try);
   1161 }
   1162 
   1163 static int write_clat_log(struct thread_data *td, int try)
   1164 {
   1165 	struct io_log *log = td->clat_log;
   1166 
   1167 	if (!log)
   1168 		return 0;
   1169 
   1170 	return finish_log(td, log, try);
   1171 }
   1172 
   1173 static int write_lat_log(struct thread_data *td, int try)
   1174 {
   1175 	struct io_log *log = td->lat_log;
   1176 
   1177 	if (!log)
   1178 		return 0;
   1179 
   1180 	return finish_log(td, log, try);
   1181 }
   1182 
   1183 static int write_bandw_log(struct thread_data *td, int try)
   1184 {
   1185 	struct io_log *log = td->bw_log;
   1186 
   1187 	if (!log)
   1188 		return 0;
   1189 
   1190 	return finish_log(td, log, try);
   1191 }
   1192 
/*
 * One bit per log written by fio_writeout_logs(), used there to remember
 * which logs have already been handled.
 */
enum {
	BW_LOG_MASK	= 1,
	LAT_LOG_MASK	= 2,
	SLAT_LOG_MASK	= 4,
	CLAT_LOG_MASK	= 8,
	IOPS_LOG_MASK	= 16,

	/* number of log types, i.e. entries in log_types[] */
	ALL_LOG_NR	= 5,
};
   1202 
/*
 * Pairs a log's mask bit with the function that writes that log out.
 */
struct log_type {
	unsigned int mask;			/* one of the *_LOG_MASK bits */
	int (*fn)(struct thread_data *, int);	/* writer; second argument is the 'try' flag */
};
   1207 
   1208 static struct log_type log_types[] = {
   1209 	{
   1210 		.mask	= BW_LOG_MASK,
   1211 		.fn	= write_bandw_log,
   1212 	},
   1213 	{
   1214 		.mask	= LAT_LOG_MASK,
   1215 		.fn	= write_lat_log,
   1216 	},
   1217 	{
   1218 		.mask	= SLAT_LOG_MASK,
   1219 		.fn	= write_slat_log,
   1220 	},
   1221 	{
   1222 		.mask	= CLAT_LOG_MASK,
   1223 		.fn	= write_clat_log,
   1224 	},
   1225 	{
   1226 		.mask	= IOPS_LOG_MASK,
   1227 		.fn	= write_iops_log,
   1228 	},
   1229 };
   1230 
   1231 void fio_writeout_logs(struct thread_data *td)
   1232 {
   1233 	unsigned int log_mask = 0;
   1234 	unsigned int log_left = ALL_LOG_NR;
   1235 	int old_state, i;
   1236 
   1237 	old_state = td_bump_runstate(td, TD_FINISHING);
   1238 
   1239 	finalize_logs(td);
   1240 
   1241 	while (log_left) {
   1242 		int prev_log_left = log_left;
   1243 
   1244 		for (i = 0; i < ALL_LOG_NR && log_left; i++) {
   1245 			struct log_type *lt = &log_types[i];
   1246 			int ret;
   1247 
   1248 			if (!(log_mask & lt->mask)) {
   1249 				ret = lt->fn(td, log_left != 1);
   1250 				if (!ret) {
   1251 					log_left--;
   1252 					log_mask |= lt->mask;
   1253 				}
   1254 			}
   1255 		}
   1256 
   1257 		if (prev_log_left == log_left)
   1258 			usleep(5000);
   1259 	}
   1260 
   1261 	td_restore_runstate(td, old_state);
   1262 }
   1263