      1 /*
      2  * fio - the flexible io tester
      3  *
      4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
      5  * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
      6  *
      7  * The license below covers all files distributed with fio unless otherwise
      8  * noted in the file itself.
      9  *
     10  *  This program is free software; you can redistribute it and/or modify
     11  *  it under the terms of the GNU General Public License version 2 as
     12  *  published by the Free Software Foundation.
     13  *
     14  *  This program is distributed in the hope that it will be useful,
     15  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     16  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     17  *  GNU General Public License for more details.
     18  *
     19  *  You should have received a copy of the GNU General Public License
     20  *  along with this program; if not, write to the Free Software
     21  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
     22  *
     23  */
     24 #include <unistd.h>
     25 #include <fcntl.h>
     26 #include <string.h>
     27 #include <limits.h>
     28 #include <signal.h>
     29 #include <time.h>
     30 #include <locale.h>
     31 #include <assert.h>
     32 #include <time.h>
     33 #include <inttypes.h>
     34 #include <sys/stat.h>
     35 #include <sys/wait.h>
     36 #include <sys/ipc.h>
     37 #include <sys/mman.h>
     38 
     39 #include "fio.h"
     40 #ifndef FIO_NO_HAVE_SHM_H
     41 #include <sys/shm.h>
     42 #endif
     43 #include "hash.h"
     44 #include "smalloc.h"
     45 #include "verify.h"
     46 #include "trim.h"
     47 #include "diskutil.h"
     48 #include "cgroup.h"
     49 #include "profile.h"
     50 #include "lib/rand.h"
     51 #include "memalign.h"
     52 #include "server.h"
     53 #include "lib/getrusage.h"
     54 #include "idletime.h"
     55 #include "err.h"
     56 #include "lib/tp.h"
     57 
     58 static pthread_t helper_thread;
     59 static pthread_mutex_t helper_lock;
     60 pthread_cond_t helper_cond;
     61 int helper_do_stat = 0;
     62 
     63 static struct fio_mutex *startup_mutex;
     64 static struct flist_head *cgroup_list;
     65 static char *cgroup_mnt;
     66 static int exit_value;
     67 static volatile int fio_abort;
     68 static unsigned int nr_process = 0;
     69 static unsigned int nr_thread = 0;
     70 
     71 struct io_log *agg_io_log[DDIR_RWDIR_CNT];
     72 
     73 int groupid = 0;
     74 unsigned int thread_number = 0;
     75 unsigned int stat_number = 0;
     76 int shm_id = 0;
     77 int temp_stall_ts;
     78 unsigned long done_secs = 0;
     79 volatile int helper_exit = 0;
     80 
     81 #define PAGE_ALIGN(buf)	\
     82 	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)
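         /*
          * For example, assuming 4 KiB pages (page_mask == 0xfff), a buffer
          * returned at 0x1008 is rounded up to the next page boundary:
          * (0x1008 + 0xfff) & ~0xfff == 0x2000.
          */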
     83 
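         /*
          * JOB_START_TIMEOUT is in milliseconds (5 seconds here); it bounds
          * how long the main thread waits for a newly created job to reach
          * TD_INITIALIZED before declaring the startup hung.
          */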
     84 #define JOB_START_TIMEOUT	(5 * 1000)
     85 
     86 static void sig_int(int sig)
     87 {
     88 	if (threads) {
     89 		if (is_backend)
     90 			fio_server_got_signal(sig);
     91 		else {
     92 			log_info("\nfio: terminating on signal %d\n", sig);
     93 			log_info_flush();
     94 			exit_value = 128;
     95 		}
     96 
     97 		fio_terminate_threads(TERMINATE_ALL);
     98 	}
     99 }
    100 
    101 static void sig_show_status(int sig)
    102 {
    103 	show_running_run_stats();
    104 }
    105 
    106 static void set_sig_handlers(void)
    107 {
    108 	struct sigaction act;
    109 
    110 	memset(&act, 0, sizeof(act));
    111 	act.sa_handler = sig_int;
    112 	act.sa_flags = SA_RESTART;
    113 	sigaction(SIGINT, &act, NULL);
    114 
    115 	memset(&act, 0, sizeof(act));
    116 	act.sa_handler = sig_int;
    117 	act.sa_flags = SA_RESTART;
    118 	sigaction(SIGTERM, &act, NULL);
    119 
    120 /* Windows uses SIGBREAK as a quit signal from other applications */
    121 #ifdef WIN32
    122 	memset(&act, 0, sizeof(act));
    123 	act.sa_handler = sig_int;
    124 	act.sa_flags = SA_RESTART;
    125 	sigaction(SIGBREAK, &act, NULL);
    126 #endif
    127 
    128 	memset(&act, 0, sizeof(act));
    129 	act.sa_handler = sig_show_status;
    130 	act.sa_flags = SA_RESTART;
    131 	sigaction(SIGUSR1, &act, NULL);
    132 
    133 	if (is_backend) {
    134 		memset(&act, 0, sizeof(act));
    135 		act.sa_handler = sig_int;
    136 		act.sa_flags = SA_RESTART;
    137 		sigaction(SIGPIPE, &act, NULL);
    138 	}
    139 }
    140 
     141 /*
     142  * Check that we are meeting the minimum rate given; returns non-zero if not.
     143  */
    144 static int __check_min_rate(struct thread_data *td, struct timeval *now,
    145 			    enum fio_ddir ddir)
    146 {
    147 	unsigned long long bytes = 0;
    148 	unsigned long iops = 0;
    149 	unsigned long spent;
    150 	unsigned long rate;
    151 	unsigned int ratemin = 0;
    152 	unsigned int rate_iops = 0;
    153 	unsigned int rate_iops_min = 0;
    154 
    155 	assert(ddir_rw(ddir));
    156 
    157 	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
    158 		return 0;
    159 
    160 	/*
    161 	 * allow a 2 second settle period in the beginning
    162 	 */
    163 	if (mtime_since(&td->start, now) < 2000)
    164 		return 0;
    165 
    166 	iops += td->this_io_blocks[ddir];
    167 	bytes += td->this_io_bytes[ddir];
    168 	ratemin += td->o.ratemin[ddir];
    169 	rate_iops += td->o.rate_iops[ddir];
    170 	rate_iops_min += td->o.rate_iops_min[ddir];
    171 
     172 	/*
     173 	 * if rate bytes/blocks are already set, we have a previous sample to compare against
     174 	 */
    175 	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
    176 		spent = mtime_since(&td->lastrate[ddir], now);
    177 		if (spent < td->o.ratecycle)
    178 			return 0;
    179 
    180 		if (td->o.rate[ddir]) {
    181 			/*
    182 			 * check bandwidth specified rate
    183 			 */
    184 			if (bytes < td->rate_bytes[ddir]) {
    185 				log_err("%s: min rate %u not met\n", td->o.name,
    186 								ratemin);
    187 				return 1;
    188 			} else {
    189 				if (spent)
    190 					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
    191 				else
    192 					rate = 0;
    193 
    194 				if (rate < ratemin ||
    195 				    bytes < td->rate_bytes[ddir]) {
    196 					log_err("%s: min rate %u not met, got"
     197 					" %luB/sec\n", td->o.name,
    198 							ratemin, rate);
    199 					return 1;
    200 				}
    201 			}
    202 		} else {
    203 			/*
     204 			 * check iops specified rate
    205 			 */
    206 			if (iops < rate_iops) {
    207 				log_err("%s: min iops rate %u not met\n",
    208 						td->o.name, rate_iops);
    209 				return 1;
    210 			} else {
    211 				if (spent)
    212 					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
    213 				else
    214 					rate = 0;
    215 
    216 				if (rate < rate_iops_min ||
    217 				    iops < td->rate_blocks[ddir]) {
     218 					log_err("%s: min iops rate %u not met,"
     219 						" got %lu\n", td->o.name,
     220 							rate_iops_min, rate);
         					return 1;
     221 				}
    222 			}
    223 		}
    224 	}
    225 
    226 	td->rate_bytes[ddir] = bytes;
    227 	td->rate_blocks[ddir] = iops;
    228 	memcpy(&td->lastrate[ddir], now, sizeof(*now));
    229 	return 0;
    230 }
    231 
    232 static int check_min_rate(struct thread_data *td, struct timeval *now,
    233 			  uint64_t *bytes_done)
    234 {
    235 	int ret = 0;
    236 
    237 	if (bytes_done[DDIR_READ])
    238 		ret |= __check_min_rate(td, now, DDIR_READ);
    239 	if (bytes_done[DDIR_WRITE])
    240 		ret |= __check_min_rate(td, now, DDIR_WRITE);
    241 	if (bytes_done[DDIR_TRIM])
    242 		ret |= __check_min_rate(td, now, DDIR_TRIM);
    243 
    244 	return ret;
    245 }
    246 
    247 /*
    248  * When job exits, we can cancel the in-flight IO if we are using async
    249  * io. Attempt to do so.
    250  */
    251 static void cleanup_pending_aio(struct thread_data *td)
    252 {
    253 	int r;
    254 
    255 	/*
    256 	 * get immediately available events, if any
    257 	 */
    258 	r = io_u_queued_complete(td, 0, NULL);
    259 	if (r < 0)
    260 		return;
    261 
    262 	/*
    263 	 * now cancel remaining active events
    264 	 */
    265 	if (td->io_ops->cancel) {
    266 		struct io_u *io_u;
    267 		int i;
    268 
    269 		io_u_qiter(&td->io_u_all, io_u, i) {
    270 			if (io_u->flags & IO_U_F_FLIGHT) {
    271 				r = td->io_ops->cancel(td, io_u);
    272 				if (!r)
    273 					put_io_u(td, io_u);
    274 			}
    275 		}
    276 	}
    277 
    278 	if (td->cur_depth)
    279 		r = io_u_queued_complete(td, td->cur_depth, NULL);
    280 }
    281 
    282 /*
    283  * Helper to handle the final sync of a file. Works just like the normal
    284  * io path, just does everything sync.
    285  */
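         /*
          * Note on the td_io_queue() return values handled below:
          * FIO_Q_COMPLETED means the engine finished the IO inline,
          * FIO_Q_QUEUED means it was accepted for async completion and must
          * be reaped later, and FIO_Q_BUSY means the engine could not accept
          * it right now, so we commit outstanding IO and retry.
          */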
    286 static int fio_io_sync(struct thread_data *td, struct fio_file *f)
    287 {
    288 	struct io_u *io_u = __get_io_u(td);
    289 	int ret;
    290 
    291 	if (!io_u)
    292 		return 1;
    293 
    294 	io_u->ddir = DDIR_SYNC;
    295 	io_u->file = f;
    296 
    297 	if (td_io_prep(td, io_u)) {
    298 		put_io_u(td, io_u);
    299 		return 1;
    300 	}
    301 
    302 requeue:
    303 	ret = td_io_queue(td, io_u);
    304 	if (ret < 0) {
    305 		td_verror(td, io_u->error, "td_io_queue");
    306 		put_io_u(td, io_u);
    307 		return 1;
    308 	} else if (ret == FIO_Q_QUEUED) {
    309 		if (io_u_queued_complete(td, 1, NULL) < 0)
    310 			return 1;
    311 	} else if (ret == FIO_Q_COMPLETED) {
    312 		if (io_u->error) {
    313 			td_verror(td, io_u->error, "td_io_queue");
    314 			return 1;
    315 		}
    316 
    317 		if (io_u_sync_complete(td, io_u, NULL) < 0)
    318 			return 1;
    319 	} else if (ret == FIO_Q_BUSY) {
    320 		if (td_io_commit(td))
    321 			return 1;
    322 		goto requeue;
    323 	}
    324 
    325 	return 0;
    326 }
    327 
    328 static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
    329 {
    330 	int ret;
    331 
    332 	if (fio_file_open(f))
    333 		return fio_io_sync(td, f);
    334 
    335 	if (td_io_open_file(td, f))
    336 		return 1;
    337 
    338 	ret = fio_io_sync(td, f);
    339 	td_io_close_file(td, f);
    340 	return ret;
    341 }
    342 
    343 static inline void __update_tv_cache(struct thread_data *td)
    344 {
    345 	fio_gettime(&td->tv_cache, NULL);
    346 }
    347 
    348 static inline void update_tv_cache(struct thread_data *td)
    349 {
    350 	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
    351 		__update_tv_cache(td);
    352 }
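         /*
          * The cached-time trick above: reading the clock can be costly, so
          * the real clock is only consulted when the counter wraps against
          * tv_cache_mask. For a mask of the form 2^n - 1, only one call in
          * 2^n pays for an actual fio_gettime().
          */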
    353 
    354 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
    355 {
    356 	if (in_ramp_time(td))
    357 		return 0;
    358 	if (!td->o.timeout)
    359 		return 0;
    360 	if (utime_since(&td->epoch, t) >= td->o.timeout)
    361 		return 1;
    362 
    363 	return 0;
    364 }
    365 
    366 static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
    367 			       int *retptr)
    368 {
    369 	int ret = *retptr;
    370 
    371 	if (ret < 0 || td->error) {
    372 		int err = td->error;
    373 		enum error_type_bit eb;
    374 
    375 		if (ret < 0)
    376 			err = -ret;
    377 
    378 		eb = td_error_type(ddir, err);
    379 		if (!(td->o.continue_on_error & (1 << eb)))
    380 			return 1;
    381 
    382 		if (td_non_fatal_error(td, eb, err)) {
     383 			/*
     384 			 * Continue with the I/Os in case of
     385 			 * a non-fatal error.
     386 			 */
    387 			update_error_count(td, err);
    388 			td_clear_error(td);
    389 			*retptr = 0;
    390 			return 0;
    391 		} else if (td->o.fill_device && err == ENOSPC) {
    392 			/*
    393 			 * We expect to hit this error if
    394 			 * fill_device option is set.
    395 			 */
    396 			td_clear_error(td);
    397 			fio_mark_td_terminate(td);
    398 			return 1;
    399 		} else {
    400 			/*
    401 			 * Stop the I/O in case of a fatal
    402 			 * error.
    403 			 */
    404 			update_error_count(td, err);
    405 			return 1;
    406 		}
    407 	}
    408 
    409 	return 0;
    410 }
    411 
    412 static void check_update_rusage(struct thread_data *td)
    413 {
    414 	if (td->update_rusage) {
    415 		td->update_rusage = 0;
    416 		update_rusage_stat(td);
    417 		fio_mutex_up(td->rusage_sem);
    418 	}
    419 }
    420 
    421 static int wait_for_completions(struct thread_data *td, struct timeval *time,
    422 				uint64_t *bytes_done)
    423 {
    424 	const int full = queue_full(td);
    425 	int min_evts = 0;
    426 	int ret;
    427 
    428 	/*
    429 	 * if the queue is full, we MUST reap at least 1 event
    430 	 */
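         	/*
         	 * An iodepth_batch_complete of 0 means "reap whatever has
         	 * completed without blocking"; forcing min_evts to 1 when the
         	 * queue is full guarantees forward progress.
         	 */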
    431 	min_evts = min(td->o.iodepth_batch_complete, td->cur_depth);
    432 	if (full && !min_evts)
    433 		min_evts = 1;
    434 
    435 	if (time && (__should_check_rate(td, DDIR_READ) ||
    436 	    __should_check_rate(td, DDIR_WRITE) ||
    437 	    __should_check_rate(td, DDIR_TRIM)))
    438 		fio_gettime(time, NULL);
    439 
    440 	do {
    441 		ret = io_u_queued_complete(td, min_evts, bytes_done);
    442 		if (ret < 0)
    443 			break;
    444 	} while (full && (td->cur_depth > td->o.iodepth_low));
    445 
    446 	return ret;
    447 }
    448 
    449 /*
    450  * The main verify engine. Runs over the writes we previously submitted,
    451  * reads the blocks back in, and checks the crc/md5 of the data.
    452  */
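         /*
          * Two verify modes are handled in the loop below: the default path
          * replays the logged write locations via get_next_verify(), while
          * experimental_verify re-walks the workload with get_io_u() and
          * converts the WRITE/TRIM io_u's it gets back into READs for
          * checking.
          */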
    453 static void do_verify(struct thread_data *td, uint64_t verify_bytes)
    454 {
    455 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
    456 	struct fio_file *f;
    457 	struct io_u *io_u;
    458 	int ret, min_events;
    459 	unsigned int i;
    460 
    461 	dprint(FD_VERIFY, "starting loop\n");
    462 
    463 	/*
    464 	 * sync io first and invalidate cache, to make sure we really
    465 	 * read from disk.
    466 	 */
    467 	for_each_file(td, f, i) {
    468 		if (!fio_file_open(f))
    469 			continue;
    470 		if (fio_io_sync(td, f))
    471 			break;
    472 		if (file_invalidate_cache(td, f))
    473 			break;
    474 	}
    475 
    476 	check_update_rusage(td);
    477 
    478 	if (td->error)
    479 		return;
    480 
    481 	td_set_runstate(td, TD_VERIFYING);
    482 
    483 	io_u = NULL;
    484 	while (!td->terminate) {
    485 		enum fio_ddir ddir;
    486 		int ret2, full;
    487 
    488 		update_tv_cache(td);
    489 		check_update_rusage(td);
    490 
    491 		if (runtime_exceeded(td, &td->tv_cache)) {
    492 			__update_tv_cache(td);
    493 			if (runtime_exceeded(td, &td->tv_cache)) {
    494 				fio_mark_td_terminate(td);
    495 				break;
    496 			}
    497 		}
    498 
    499 		if (flow_threshold_exceeded(td))
    500 			continue;
    501 
    502 		if (!td->o.experimental_verify) {
    503 			io_u = __get_io_u(td);
    504 			if (!io_u)
    505 				break;
    506 
    507 			if (get_next_verify(td, io_u)) {
    508 				put_io_u(td, io_u);
    509 				break;
    510 			}
    511 
    512 			if (td_io_prep(td, io_u)) {
    513 				put_io_u(td, io_u);
    514 				break;
    515 			}
    516 		} else {
    517 			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
    518 				break;
    519 
    520 			while ((io_u = get_io_u(td)) != NULL) {
    521 				if (IS_ERR(io_u)) {
    522 					io_u = NULL;
    523 					ret = FIO_Q_BUSY;
    524 					goto reap;
    525 				}
    526 
    527 				/*
    528 				 * We are only interested in the places where
    529 				 * we wrote or trimmed IOs. Turn those into
    530 				 * reads for verification purposes.
    531 				 */
    532 				if (io_u->ddir == DDIR_READ) {
    533 					/*
    534 					 * Pretend we issued it for rwmix
    535 					 * accounting
    536 					 */
    537 					td->io_issues[DDIR_READ]++;
    538 					put_io_u(td, io_u);
    539 					continue;
    540 				} else if (io_u->ddir == DDIR_TRIM) {
    541 					io_u->ddir = DDIR_READ;
    542 					io_u->flags |= IO_U_F_TRIMMED;
    543 					break;
    544 				} else if (io_u->ddir == DDIR_WRITE) {
    545 					io_u->ddir = DDIR_READ;
    546 					break;
    547 				} else {
    548 					put_io_u(td, io_u);
    549 					continue;
    550 				}
    551 			}
    552 
    553 			if (!io_u)
    554 				break;
    555 		}
    556 
    557 		if (verify_state_should_stop(td, io_u)) {
    558 			put_io_u(td, io_u);
    559 			break;
    560 		}
    561 
    562 		if (td->o.verify_async)
    563 			io_u->end_io = verify_io_u_async;
    564 		else
    565 			io_u->end_io = verify_io_u;
    566 
    567 		ddir = io_u->ddir;
    568 		if (!td->o.disable_slat)
    569 			fio_gettime(&io_u->start_time, NULL);
    570 
    571 		ret = td_io_queue(td, io_u);
    572 		switch (ret) {
    573 		case FIO_Q_COMPLETED:
    574 			if (io_u->error) {
    575 				ret = -io_u->error;
    576 				clear_io_u(td, io_u);
    577 			} else if (io_u->resid) {
    578 				int bytes = io_u->xfer_buflen - io_u->resid;
    579 
    580 				/*
    581 				 * zero read, fail
    582 				 */
    583 				if (!bytes) {
    584 					td_verror(td, EIO, "full resid");
    585 					put_io_u(td, io_u);
    586 					break;
    587 				}
    588 
    589 				io_u->xfer_buflen = io_u->resid;
    590 				io_u->xfer_buf += bytes;
    591 				io_u->offset += bytes;
    592 
    593 				if (ddir_rw(io_u->ddir))
    594 					td->ts.short_io_u[io_u->ddir]++;
    595 
    596 				f = io_u->file;
    597 				if (io_u->offset == f->real_file_size)
    598 					goto sync_done;
    599 
    600 				requeue_io_u(td, &io_u);
    601 			} else {
    602 sync_done:
    603 				ret = io_u_sync_complete(td, io_u, bytes_done);
    604 				if (ret < 0)
    605 					break;
    606 			}
    607 			continue;
    608 		case FIO_Q_QUEUED:
    609 			break;
    610 		case FIO_Q_BUSY:
    611 			requeue_io_u(td, &io_u);
    612 			ret2 = td_io_commit(td);
    613 			if (ret2 < 0)
    614 				ret = ret2;
    615 			break;
    616 		default:
    617 			assert(ret < 0);
    618 			td_verror(td, -ret, "td_io_queue");
    619 			break;
    620 		}
    621 
    622 		if (break_on_this_error(td, ddir, &ret))
    623 			break;
    624 
    625 		/*
    626 		 * if we can queue more, do so. but check if there are
    627 		 * completed io_u's first. Note that we can get BUSY even
    628 		 * without IO queued, if the system is resource starved.
    629 		 */
    630 reap:
    631 		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
    632 		if (full || !td->o.iodepth_batch_complete)
    633 			ret = wait_for_completions(td, NULL, bytes_done);
    634 
    635 		if (ret < 0)
    636 			break;
    637 	}
    638 
    639 	check_update_rusage(td);
    640 
    641 	if (!td->error) {
    642 		min_events = td->cur_depth;
    643 
    644 		if (min_events)
    645 			ret = io_u_queued_complete(td, min_events, NULL);
    646 	} else
    647 		cleanup_pending_aio(td);
    648 
    649 	td_set_runstate(td, TD_RUNNING);
    650 
    651 	dprint(FD_VERIFY, "exiting loop\n");
    652 }
    653 
    654 static unsigned int exceeds_number_ios(struct thread_data *td)
    655 {
    656 	unsigned long long number_ios;
    657 
    658 	if (!td->o.number_ios)
    659 		return 0;
    660 
    661 	number_ios = ddir_rw_sum(td->io_blocks);
    662 	number_ios += td->io_u_queued + td->io_u_in_flight;
    663 
    664 	return number_ios >= (td->o.number_ios * td->loops);
    665 }
    666 
    667 static int io_issue_bytes_exceeded(struct thread_data *td)
    668 {
    669 	unsigned long long bytes, limit;
    670 
    671 	if (td_rw(td))
    672 		bytes = td->io_issue_bytes[DDIR_READ] + td->io_issue_bytes[DDIR_WRITE];
    673 	else if (td_write(td))
    674 		bytes = td->io_issue_bytes[DDIR_WRITE];
    675 	else if (td_read(td))
    676 		bytes = td->io_issue_bytes[DDIR_READ];
    677 	else
    678 		bytes = td->io_issue_bytes[DDIR_TRIM];
    679 
    680 	if (td->o.io_limit)
    681 		limit = td->o.io_limit;
    682 	else
    683 		limit = td->o.size;
    684 
    685 	limit *= td->loops;
    686 	return bytes >= limit || exceeds_number_ios(td);
    687 }
    688 
    689 static int io_complete_bytes_exceeded(struct thread_data *td)
    690 {
    691 	unsigned long long bytes, limit;
    692 
    693 	if (td_rw(td))
    694 		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
    695 	else if (td_write(td))
    696 		bytes = td->this_io_bytes[DDIR_WRITE];
    697 	else if (td_read(td))
    698 		bytes = td->this_io_bytes[DDIR_READ];
    699 	else
    700 		bytes = td->this_io_bytes[DDIR_TRIM];
    701 
    702 	if (td->o.io_limit)
    703 		limit = td->o.io_limit;
    704 	else
    705 		limit = td->o.size;
    706 
    707 	limit *= td->loops;
    708 	return bytes >= limit || exceeds_number_ios(td);
    709 }
    710 
    711 /*
     712  * Main IO worker function. It retrieves io_u's to process, queues
     713  * and reaps them, checking for rate limits and errors along the way.
    714  *
    715  * Returns number of bytes written and trimmed.
    716  */
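         /*
          * The returned byte count is what thread_main() later passes to
          * do_verify() as verify_bytes, so the verify pass knows how much
          * data to expect to read back.
          */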
    717 static uint64_t do_io(struct thread_data *td)
    718 {
    719 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
    720 	unsigned int i;
    721 	int ret = 0;
    722 	uint64_t total_bytes, bytes_issued = 0;
    723 
    724 	if (in_ramp_time(td))
    725 		td_set_runstate(td, TD_RAMP);
    726 	else
    727 		td_set_runstate(td, TD_RUNNING);
    728 
    729 	lat_target_init(td);
    730 
    731 	total_bytes = td->o.size;
    732 	/*
     733 	 * Allow random overwrite workloads to write up to io_limit
     734 	 * before starting the verification phase, as 'size' doesn't apply.
     735 	 */
    736 	if (td_write(td) && td_random(td) && td->o.norandommap)
    737 		total_bytes = max(total_bytes, (uint64_t) td->o.io_limit);
    738 	/*
    739 	 * If verify_backlog is enabled, we'll run the verify in this
    740 	 * handler as well. For that case, we may need up to twice the
    741 	 * amount of bytes.
    742 	 */
    743 	if (td->o.verify != VERIFY_NONE &&
    744 	   (td_write(td) && td->o.verify_backlog))
    745 		total_bytes += td->o.size;
    746 
    747 	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
    748 		(!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) ||
    749 		td->o.time_based) {
    750 		struct timeval comp_time;
    751 		struct io_u *io_u;
    752 		int ret2, full;
    753 		enum fio_ddir ddir;
    754 
    755 		check_update_rusage(td);
    756 
    757 		if (td->terminate || td->done)
    758 			break;
    759 
    760 		update_tv_cache(td);
    761 
    762 		if (runtime_exceeded(td, &td->tv_cache)) {
    763 			__update_tv_cache(td);
    764 			if (runtime_exceeded(td, &td->tv_cache)) {
    765 				fio_mark_td_terminate(td);
    766 				break;
    767 			}
    768 		}
    769 
    770 		if (flow_threshold_exceeded(td))
    771 			continue;
    772 
    773 		if (bytes_issued >= total_bytes)
    774 			break;
    775 
    776 		io_u = get_io_u(td);
    777 		if (IS_ERR_OR_NULL(io_u)) {
    778 			int err = PTR_ERR(io_u);
    779 
    780 			io_u = NULL;
    781 			if (err == -EBUSY) {
    782 				ret = FIO_Q_BUSY;
    783 				goto reap;
    784 			}
    785 			if (td->o.latency_target)
    786 				goto reap;
    787 			break;
    788 		}
    789 
    790 		ddir = io_u->ddir;
    791 
    792 		/*
    793 		 * Add verification end_io handler if:
    794 		 *	- Asked to verify (!td_rw(td))
    795 		 *	- Or the io_u is from our verify list (mixed write/ver)
    796 		 */
    797 		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
    798 		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
    799 
    800 			if (!td->o.verify_pattern_bytes) {
    801 				io_u->rand_seed = __rand(&td->verify_state);
    802 				if (sizeof(int) != sizeof(long *))
    803 					io_u->rand_seed *= __rand(&td->verify_state);
    804 			}
    805 
    806 			if (verify_state_should_stop(td, io_u)) {
    807 				put_io_u(td, io_u);
    808 				break;
    809 			}
    810 
    811 			if (td->o.verify_async)
    812 				io_u->end_io = verify_io_u_async;
    813 			else
    814 				io_u->end_io = verify_io_u;
    815 			td_set_runstate(td, TD_VERIFYING);
    816 		} else if (in_ramp_time(td))
    817 			td_set_runstate(td, TD_RAMP);
    818 		else
    819 			td_set_runstate(td, TD_RUNNING);
    820 
    821 		/*
    822 		 * Always log IO before it's issued, so we know the specific
    823 		 * order of it. The logged unit will track when the IO has
    824 		 * completed.
    825 		 */
    826 		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
    827 		    td->o.do_verify &&
    828 		    td->o.verify != VERIFY_NONE &&
    829 		    !td->o.experimental_verify)
    830 			log_io_piece(td, io_u);
    831 
    832 		ret = td_io_queue(td, io_u);
    833 		switch (ret) {
    834 		case FIO_Q_COMPLETED:
    835 			if (io_u->error) {
    836 				ret = -io_u->error;
    837 				unlog_io_piece(td, io_u);
    838 				clear_io_u(td, io_u);
    839 			} else if (io_u->resid) {
    840 				int bytes = io_u->xfer_buflen - io_u->resid;
    841 				struct fio_file *f = io_u->file;
    842 
    843 				bytes_issued += bytes;
    844 
    845 				trim_io_piece(td, io_u);
    846 
    847 				/*
    848 				 * zero read, fail
    849 				 */
    850 				if (!bytes) {
    851 					unlog_io_piece(td, io_u);
    852 					td_verror(td, EIO, "full resid");
    853 					put_io_u(td, io_u);
    854 					break;
    855 				}
    856 
    857 				io_u->xfer_buflen = io_u->resid;
    858 				io_u->xfer_buf += bytes;
    859 				io_u->offset += bytes;
    860 
    861 				if (ddir_rw(io_u->ddir))
    862 					td->ts.short_io_u[io_u->ddir]++;
    863 
    864 				if (io_u->offset == f->real_file_size)
    865 					goto sync_done;
    866 
    867 				requeue_io_u(td, &io_u);
    868 			} else {
    869 sync_done:
    870 				if (__should_check_rate(td, DDIR_READ) ||
    871 				    __should_check_rate(td, DDIR_WRITE) ||
    872 				    __should_check_rate(td, DDIR_TRIM))
    873 					fio_gettime(&comp_time, NULL);
    874 
    875 				ret = io_u_sync_complete(td, io_u, bytes_done);
    876 				if (ret < 0)
    877 					break;
    878 				bytes_issued += io_u->xfer_buflen;
    879 			}
    880 			break;
    881 		case FIO_Q_QUEUED:
    882 			/*
    883 			 * if the engine doesn't have a commit hook,
    884 			 * the io_u is really queued. if it does have such
    885 			 * a hook, it has to call io_u_queued() itself.
    886 			 */
    887 			if (td->io_ops->commit == NULL)
    888 				io_u_queued(td, io_u);
    889 			bytes_issued += io_u->xfer_buflen;
    890 			break;
    891 		case FIO_Q_BUSY:
    892 			unlog_io_piece(td, io_u);
    893 			requeue_io_u(td, &io_u);
    894 			ret2 = td_io_commit(td);
    895 			if (ret2 < 0)
    896 				ret = ret2;
    897 			break;
    898 		default:
    899 			assert(ret < 0);
    900 			put_io_u(td, io_u);
    901 			break;
    902 		}
    903 
    904 		if (break_on_this_error(td, ddir, &ret))
    905 			break;
    906 
    907 		/*
    908 		 * See if we need to complete some commands. Note that we
    909 		 * can get BUSY even without IO queued, if the system is
    910 		 * resource starved.
    911 		 */
    912 reap:
    913 		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
    914 		if (full || !td->o.iodepth_batch_complete)
    915 			ret = wait_for_completions(td, &comp_time, bytes_done);
    916 		if (ret < 0)
    917 			break;
    918 		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
    919 			continue;
    920 
    921 		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
    922 			if (check_min_rate(td, &comp_time, bytes_done)) {
    923 				if (exitall_on_terminate)
    924 					fio_terminate_threads(td->groupid);
    925 				td_verror(td, EIO, "check_min_rate");
    926 				break;
    927 			}
    928 		}
    929 		if (!in_ramp_time(td) && td->o.latency_target)
    930 			lat_target_check(td);
    931 
    932 		if (td->o.thinktime) {
    933 			unsigned long long b;
    934 
    935 			b = ddir_rw_sum(td->io_blocks);
    936 			if (!(b % td->o.thinktime_blocks)) {
    937 				int left;
    938 
    939 				io_u_quiesce(td);
    940 
    941 				if (td->o.thinktime_spin)
    942 					usec_spin(td->o.thinktime_spin);
    943 
    944 				left = td->o.thinktime - td->o.thinktime_spin;
    945 				if (left)
    946 					usec_sleep(td, left);
    947 			}
    948 		}
    949 	}
    950 
    951 	check_update_rusage(td);
    952 
    953 	if (td->trim_entries)
    954 		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
    955 
    956 	if (td->o.fill_device && td->error == ENOSPC) {
    957 		td->error = 0;
    958 		fio_mark_td_terminate(td);
    959 	}
    960 	if (!td->error) {
    961 		struct fio_file *f;
    962 
    963 		i = td->cur_depth;
    964 		if (i) {
    965 			ret = io_u_queued_complete(td, i, bytes_done);
    966 			if (td->o.fill_device && td->error == ENOSPC)
    967 				td->error = 0;
    968 		}
    969 
    970 		if (should_fsync(td) && td->o.end_fsync) {
    971 			td_set_runstate(td, TD_FSYNCING);
    972 
    973 			for_each_file(td, f, i) {
    974 				if (!fio_file_fsync(td, f))
    975 					continue;
    976 
    977 				log_err("fio: end_fsync failed for file %s\n",
    978 								f->file_name);
    979 			}
    980 		}
    981 	} else
    982 		cleanup_pending_aio(td);
    983 
    984 	/*
    985 	 * stop job if we failed doing any IO
    986 	 */
    987 	if (!ddir_rw_sum(td->this_io_bytes))
    988 		td->done = 1;
    989 
    990 	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
    991 }
    992 
    993 static void cleanup_io_u(struct thread_data *td)
    994 {
    995 	struct io_u *io_u;
    996 
    997 	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
    998 
    999 		if (td->io_ops->io_u_free)
   1000 			td->io_ops->io_u_free(td, io_u);
   1001 
   1002 		fio_memfree(io_u, sizeof(*io_u));
   1003 	}
   1004 
   1005 	free_io_mem(td);
   1006 
   1007 	io_u_rexit(&td->io_u_requeues);
   1008 	io_u_qexit(&td->io_u_freelist);
   1009 	io_u_qexit(&td->io_u_all);
   1010 
   1011 	if (td->last_write_comp)
   1012 		sfree(td->last_write_comp);
   1013 }
   1014 
   1015 static int init_io_u(struct thread_data *td)
   1016 {
   1017 	struct io_u *io_u;
   1018 	unsigned int max_bs, min_write;
   1019 	int cl_align, i, max_units;
   1020 	int data_xfer = 1, err;
   1021 	char *p;
   1022 
   1023 	max_units = td->o.iodepth;
   1024 	max_bs = td_max_bs(td);
   1025 	min_write = td->o.min_bs[DDIR_WRITE];
   1026 	td->orig_buffer_size = (unsigned long long) max_bs
   1027 					* (unsigned long long) max_units;
   1028 
   1029 	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
   1030 		data_xfer = 0;
   1031 
   1032 	err = 0;
   1033 	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
   1034 	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
   1035 	err += io_u_qinit(&td->io_u_all, td->o.iodepth);
   1036 
   1037 	if (err) {
   1038 		log_err("fio: failed setting up IO queues\n");
   1039 		return 1;
   1040 	}
   1041 
   1042 	/*
   1043 	 * if we may later need to do address alignment, then add any
   1044 	 * possible adjustment here so that we don't cause a buffer
   1045 	 * overflow later. this adjustment may be too much if we get
   1046 	 * lucky and the allocator gives us an aligned address.
   1047 	 */
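         	/*
         	 * With 4 KiB pages, for instance, this reserves up to 4095
         	 * extra bytes (plus mem_align) so that aligning the buffer
         	 * start further down cannot step past the end of the
         	 * allocation.
         	 */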
   1048 	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
   1049 	    (td->io_ops->flags & FIO_RAWIO))
   1050 		td->orig_buffer_size += page_mask + td->o.mem_align;
   1051 
   1052 	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
   1053 		unsigned long bs;
   1054 
   1055 		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
   1056 		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
   1057 	}
   1058 
   1059 	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
   1060 		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
   1061 		return 1;
   1062 	}
   1063 
   1064 	if (data_xfer && allocate_io_mem(td))
   1065 		return 1;
   1066 
   1067 	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
   1068 	    (td->io_ops->flags & FIO_RAWIO))
   1069 		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
   1070 	else
   1071 		p = td->orig_buffer;
   1072 
   1073 	cl_align = os_cache_line_size();
   1074 
   1075 	for (i = 0; i < max_units; i++) {
   1076 		void *ptr;
   1077 
   1078 		if (td->terminate)
   1079 			return 1;
   1080 
   1081 		ptr = fio_memalign(cl_align, sizeof(*io_u));
   1082 		if (!ptr) {
   1083 			log_err("fio: unable to allocate aligned memory\n");
   1084 			break;
   1085 		}
   1086 
   1087 		io_u = ptr;
   1088 		memset(io_u, 0, sizeof(*io_u));
   1089 		INIT_FLIST_HEAD(&io_u->verify_list);
   1090 		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);
   1091 
   1092 		if (data_xfer) {
   1093 			io_u->buf = p;
   1094 			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);
   1095 
   1096 			if (td_write(td))
   1097 				io_u_fill_buffer(td, io_u, min_write, max_bs);
   1098 			if (td_write(td) && td->o.verify_pattern_bytes) {
   1099 				/*
   1100 				 * Fill the buffer with the pattern if we are
   1101 				 * going to be doing writes.
   1102 				 */
   1103 				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
   1104 			}
   1105 		}
   1106 
   1107 		io_u->index = i;
   1108 		io_u->flags = IO_U_F_FREE;
   1109 		io_u_qpush(&td->io_u_freelist, io_u);
   1110 
   1111 		/*
   1112 		 * io_u never leaves this stack, used for iteration of all
   1113 		 * io_u buffers.
   1114 		 */
   1115 		io_u_qpush(&td->io_u_all, io_u);
   1116 
   1117 		if (td->io_ops->io_u_init) {
   1118 			int ret = td->io_ops->io_u_init(td, io_u);
   1119 
   1120 			if (ret) {
   1121 				log_err("fio: failed to init engine data: %d\n", ret);
   1122 				return 1;
   1123 			}
   1124 		}
   1125 
   1126 		p += max_bs;
   1127 	}
   1128 
   1129 	if (td->o.verify != VERIFY_NONE) {
   1130 		td->last_write_comp = scalloc(max_units, sizeof(uint64_t));
   1131 		if (!td->last_write_comp) {
   1132 			log_err("fio: failed to alloc write comp data\n");
   1133 			return 1;
   1134 		}
   1135 	}
   1136 
   1137 	return 0;
   1138 }
   1139 
   1140 static int switch_ioscheduler(struct thread_data *td)
   1141 {
   1142 	char tmp[256], tmp2[128];
   1143 	FILE *f;
   1144 	int ret;
   1145 
   1146 	if (td->io_ops->flags & FIO_DISKLESSIO)
   1147 		return 0;
   1148 
   1149 	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
   1150 
   1151 	f = fopen(tmp, "r+");
   1152 	if (!f) {
   1153 		if (errno == ENOENT) {
   1154 			log_err("fio: os or kernel doesn't support IO scheduler"
   1155 				" switching\n");
   1156 			return 0;
   1157 		}
   1158 		td_verror(td, errno, "fopen iosched");
   1159 		return 1;
   1160 	}
   1161 
   1162 	/*
   1163 	 * Set io scheduler.
   1164 	 */
   1165 	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
   1166 	if (ferror(f) || ret != 1) {
   1167 		td_verror(td, errno, "fwrite");
   1168 		fclose(f);
   1169 		return 1;
   1170 	}
   1171 
   1172 	rewind(f);
   1173 
   1174 	/*
   1175 	 * Read back and check that the selected scheduler is now the default.
   1176 	 */
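         	/*
         	 * The active scheduler is listed in brackets in that sysfs
         	 * file, e.g. "noop deadline [cfq]"; the strstr() check below
         	 * relies on this.
         	 */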
    1177 	ret = fread(tmp, 1, sizeof(tmp) - 1, f);
    1178 	if (ferror(f) || ret < 0) {
    1179 		td_verror(td, errno, "fread");
    1180 		fclose(f);
    1181 		return 1;
    1182 	}
    1183 	tmp[ret] = '\0';
   1184 
   1185 
   1186 	sprintf(tmp2, "[%s]", td->o.ioscheduler);
   1187 	if (!strstr(tmp, tmp2)) {
   1188 		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
   1189 		td_verror(td, EINVAL, "iosched_switch");
   1190 		fclose(f);
   1191 		return 1;
   1192 	}
   1193 
   1194 	fclose(f);
   1195 	return 0;
   1196 }
   1197 
   1198 static int keep_running(struct thread_data *td)
   1199 {
   1200 	unsigned long long limit;
   1201 
   1202 	if (td->done)
   1203 		return 0;
   1204 	if (td->o.time_based)
   1205 		return 1;
   1206 	if (td->o.loops) {
   1207 		td->o.loops--;
   1208 		return 1;
   1209 	}
   1210 	if (exceeds_number_ios(td))
   1211 		return 0;
   1212 
   1213 	if (td->o.io_limit)
   1214 		limit = td->o.io_limit;
   1215 	else
   1216 		limit = td->o.size;
   1217 
   1218 	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
   1219 		uint64_t diff;
   1220 
   1221 		/*
    1222 		 * If the remaining amount is less than the maximum IO size,
    1223 		 * we are done.
   1224 		 */
   1225 		diff = limit - ddir_rw_sum(td->io_bytes);
   1226 		if (diff < td_max_bs(td))
   1227 			return 0;
   1228 
   1229 		if (fio_files_done(td))
   1230 			return 0;
   1231 
   1232 		return 1;
   1233 	}
   1234 
   1235 	return 0;
   1236 }
   1237 
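         /*
          * Runs a user-supplied command with its output captured; for
          * example, a job named "job1" with exec_prerun="echo hi" would run
          * "echo hi &> job1.prerun.txt" via system().
          */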
   1238 static int exec_string(struct thread_options *o, const char *string, const char *mode)
   1239 {
   1240 	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
   1241 	char *str;
   1242 
   1243 	str = malloc(newlen);
   1244 	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
   1245 
   1246 	log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode);
   1247 	ret = system(str);
   1248 	if (ret == -1)
   1249 		log_err("fio: exec of cmd <%s> failed\n", str);
   1250 
   1251 	free(str);
   1252 	return ret;
   1253 }
   1254 
   1255 /*
   1256  * Dry run to compute correct state of numberio for verification.
   1257  */
   1258 static uint64_t do_dry_run(struct thread_data *td)
   1259 {
   1260 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
   1261 
   1262 	td_set_runstate(td, TD_RUNNING);
   1263 
   1264 	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
   1265 		(!flist_empty(&td->trim_list)) || !io_complete_bytes_exceeded(td)) {
   1266 		struct io_u *io_u;
   1267 		int ret;
   1268 
   1269 		if (td->terminate || td->done)
   1270 			break;
   1271 
   1272 		io_u = get_io_u(td);
   1273 		if (!io_u)
   1274 			break;
   1275 
   1276 		io_u->flags |= IO_U_F_FLIGHT;
   1277 		io_u->error = 0;
   1278 		io_u->resid = 0;
   1279 		if (ddir_rw(acct_ddir(io_u)))
   1280 			td->io_issues[acct_ddir(io_u)]++;
   1281 		if (ddir_rw(io_u->ddir)) {
   1282 			io_u_mark_depth(td, 1);
   1283 			td->ts.total_io_u[io_u->ddir]++;
   1284 		}
   1285 
   1286 		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
   1287 		    td->o.do_verify &&
   1288 		    td->o.verify != VERIFY_NONE &&
   1289 		    !td->o.experimental_verify)
   1290 			log_io_piece(td, io_u);
   1291 
   1292 		ret = io_u_sync_complete(td, io_u, bytes_done);
   1293 		(void) ret;
   1294 	}
   1295 
   1296 	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
   1297 }
   1298 
   1299 /*
    1300  * Entry point for the thread-based jobs. The process-based jobs end up
    1301  * here as well, after a little setup.
   1302  */
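         /*
          * Startup handshake with run_threads(): once a job reaches
          * TD_INITIALIZED it ups startup_mutex, then blocks on its own
          * td->mutex until the main thread releases it to start issuing IO.
          */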
   1303 static void *thread_main(void *data)
   1304 {
   1305 	unsigned long long elapsed;
   1306 	struct thread_data *td = data;
   1307 	struct thread_options *o = &td->o;
   1308 	pthread_condattr_t attr;
   1309 	int clear_state;
   1310 	int ret;
   1311 
   1312 	if (!o->use_thread) {
   1313 		setsid();
   1314 		td->pid = getpid();
   1315 	} else
   1316 		td->pid = gettid();
   1317 
   1318 	fio_local_clock_init(o->use_thread);
   1319 
   1320 	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
   1321 
   1322 	if (is_backend)
   1323 		fio_server_send_start(td);
   1324 
   1325 	INIT_FLIST_HEAD(&td->io_log_list);
   1326 	INIT_FLIST_HEAD(&td->io_hist_list);
   1327 	INIT_FLIST_HEAD(&td->verify_list);
   1328 	INIT_FLIST_HEAD(&td->trim_list);
   1329 	INIT_FLIST_HEAD(&td->next_rand_list);
   1330 	pthread_mutex_init(&td->io_u_lock, NULL);
   1331 	td->io_hist_tree = RB_ROOT;
   1332 
   1333 	pthread_condattr_init(&attr);
   1334 	pthread_cond_init(&td->verify_cond, &attr);
   1335 	pthread_cond_init(&td->free_cond, &attr);
   1336 
   1337 	td_set_runstate(td, TD_INITIALIZED);
   1338 	dprint(FD_MUTEX, "up startup_mutex\n");
   1339 	fio_mutex_up(startup_mutex);
   1340 	dprint(FD_MUTEX, "wait on td->mutex\n");
   1341 	fio_mutex_down(td->mutex);
   1342 	dprint(FD_MUTEX, "done waiting on td->mutex\n");
   1343 
   1344 	/*
   1345 	 * A new gid requires privilege, so we need to do this before setting
   1346 	 * the uid.
   1347 	 */
   1348 	if (o->gid != -1U && setgid(o->gid)) {
   1349 		td_verror(td, errno, "setgid");
   1350 		goto err;
   1351 	}
   1352 	if (o->uid != -1U && setuid(o->uid)) {
   1353 		td_verror(td, errno, "setuid");
   1354 		goto err;
   1355 	}
   1356 
   1357 	/*
   1358 	 * If we have a gettimeofday() thread, make sure we exclude that
   1359 	 * thread from this job
   1360 	 */
   1361 	if (o->gtod_cpu)
   1362 		fio_cpu_clear(&o->cpumask, o->gtod_cpu);
   1363 
   1364 	/*
   1365 	 * Set affinity first, in case it has an impact on the memory
   1366 	 * allocations.
   1367 	 */
   1368 	if (fio_option_is_set(o, cpumask)) {
   1369 		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
   1370 			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
   1371 			if (!ret) {
   1372 				log_err("fio: no CPUs set\n");
   1373 				log_err("fio: Try increasing number of available CPUs\n");
   1374 				td_verror(td, EINVAL, "cpus_split");
   1375 				goto err;
   1376 			}
   1377 		}
   1378 		ret = fio_setaffinity(td->pid, o->cpumask);
   1379 		if (ret == -1) {
   1380 			td_verror(td, errno, "cpu_set_affinity");
   1381 			goto err;
   1382 		}
   1383 	}
   1384 
   1385 #ifdef CONFIG_LIBNUMA
   1386 	/* numa node setup */
   1387 	if (fio_option_is_set(o, numa_cpunodes) ||
   1388 	    fio_option_is_set(o, numa_memnodes)) {
   1389 		struct bitmask *mask;
   1390 
   1391 		if (numa_available() < 0) {
   1392 			td_verror(td, errno, "Does not support NUMA API\n");
   1393 			goto err;
   1394 		}
   1395 
   1396 		if (fio_option_is_set(o, numa_cpunodes)) {
   1397 			mask = numa_parse_nodestring(o->numa_cpunodes);
   1398 			ret = numa_run_on_node_mask(mask);
   1399 			numa_free_nodemask(mask);
   1400 			if (ret == -1) {
   1401 				td_verror(td, errno, \
   1402 					"numa_run_on_node_mask failed\n");
   1403 				goto err;
   1404 			}
   1405 		}
   1406 
   1407 		if (fio_option_is_set(o, numa_memnodes)) {
   1408 			mask = NULL;
   1409 			if (o->numa_memnodes)
   1410 				mask = numa_parse_nodestring(o->numa_memnodes);
   1411 
   1412 			switch (o->numa_mem_mode) {
   1413 			case MPOL_INTERLEAVE:
   1414 				numa_set_interleave_mask(mask);
   1415 				break;
   1416 			case MPOL_BIND:
   1417 				numa_set_membind(mask);
   1418 				break;
   1419 			case MPOL_LOCAL:
   1420 				numa_set_localalloc();
   1421 				break;
   1422 			case MPOL_PREFERRED:
   1423 				numa_set_preferred(o->numa_mem_prefer_node);
   1424 				break;
   1425 			case MPOL_DEFAULT:
   1426 			default:
   1427 				break;
   1428 			}
   1429 
   1430 			if (mask)
   1431 				numa_free_nodemask(mask);
   1432 
   1433 		}
   1434 	}
   1435 #endif
   1436 
   1437 	if (fio_pin_memory(td))
   1438 		goto err;
   1439 
   1440 	/*
   1441 	 * May alter parameters that init_io_u() will use, so we need to
   1442 	 * do this first.
   1443 	 */
   1444 	if (init_iolog(td))
   1445 		goto err;
   1446 
   1447 	if (init_io_u(td))
   1448 		goto err;
   1449 
   1450 	if (o->verify_async && verify_async_init(td))
   1451 		goto err;
   1452 
   1453 	if (fio_option_is_set(o, ioprio) ||
   1454 	    fio_option_is_set(o, ioprio_class)) {
   1455 		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
   1456 		if (ret == -1) {
   1457 			td_verror(td, errno, "ioprio_set");
   1458 			goto err;
   1459 		}
   1460 	}
   1461 
   1462 	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
   1463 		goto err;
   1464 
   1465 	errno = 0;
   1466 	if (nice(o->nice) == -1 && errno != 0) {
   1467 		td_verror(td, errno, "nice");
   1468 		goto err;
   1469 	}
   1470 
   1471 	if (o->ioscheduler && switch_ioscheduler(td))
   1472 		goto err;
   1473 
   1474 	if (!o->create_serialize && setup_files(td))
   1475 		goto err;
   1476 
   1477 	if (td_io_init(td))
   1478 		goto err;
   1479 
   1480 	if (init_random_map(td))
   1481 		goto err;
   1482 
   1483 	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
   1484 		goto err;
   1485 
   1486 	if (o->pre_read) {
   1487 		if (pre_read_files(td) < 0)
   1488 			goto err;
   1489 	}
   1490 
   1491 	if (td->flags & TD_F_COMPRESS_LOG)
   1492 		tp_init(&td->tp_data);
   1493 
   1494 	fio_verify_init(td);
   1495 
   1496 	fio_gettime(&td->epoch, NULL);
   1497 	fio_getrusage(&td->ru_start);
   1498 	clear_state = 0;
   1499 	while (keep_running(td)) {
   1500 		uint64_t verify_bytes;
   1501 
   1502 		fio_gettime(&td->start, NULL);
   1503 		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
   1504 		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
   1505 		memcpy(&td->tv_cache, &td->start, sizeof(td->start));
   1506 
   1507 		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
   1508 				o->ratemin[DDIR_TRIM]) {
   1509 		        memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
   1510 						sizeof(td->bw_sample_time));
   1511 		        memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
   1512 						sizeof(td->bw_sample_time));
   1513 		        memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
   1514 						sizeof(td->bw_sample_time));
   1515 		}
   1516 
   1517 		if (clear_state)
   1518 			clear_io_state(td);
   1519 
   1520 		prune_io_piece_log(td);
   1521 
   1522 		if (td->o.verify_only && (td_write(td) || td_rw(td)))
   1523 			verify_bytes = do_dry_run(td);
   1524 		else
   1525 			verify_bytes = do_io(td);
   1526 
   1527 		clear_state = 1;
   1528 
   1529 		fio_mutex_down(stat_mutex);
   1530 		if (td_read(td) && td->io_bytes[DDIR_READ]) {
   1531 			elapsed = mtime_since_now(&td->start);
   1532 			td->ts.runtime[DDIR_READ] += elapsed;
   1533 		}
   1534 		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
   1535 			elapsed = mtime_since_now(&td->start);
   1536 			td->ts.runtime[DDIR_WRITE] += elapsed;
   1537 		}
   1538 		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
   1539 			elapsed = mtime_since_now(&td->start);
   1540 			td->ts.runtime[DDIR_TRIM] += elapsed;
   1541 		}
   1542 		fio_gettime(&td->start, NULL);
   1543 		fio_mutex_up(stat_mutex);
   1544 
   1545 		if (td->error || td->terminate)
   1546 			break;
   1547 
   1548 		if (!o->do_verify ||
   1549 		    o->verify == VERIFY_NONE ||
   1550 		    (td->io_ops->flags & FIO_UNIDIR))
   1551 			continue;
   1552 
   1553 		clear_io_state(td);
   1554 
   1555 		fio_gettime(&td->start, NULL);
   1556 
   1557 		do_verify(td, verify_bytes);
   1558 
   1559 		fio_mutex_down(stat_mutex);
   1560 		td->ts.runtime[DDIR_READ] += mtime_since_now(&td->start);
   1561 		fio_gettime(&td->start, NULL);
   1562 		fio_mutex_up(stat_mutex);
   1563 
   1564 		if (td->error || td->terminate)
   1565 			break;
   1566 	}
   1567 
   1568 	update_rusage_stat(td);
   1569 	td->ts.total_run_time = mtime_since_now(&td->epoch);
   1570 	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
   1571 	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
   1572 	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
   1573 
   1574 	if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) &&
   1575 	    (td->o.verify != VERIFY_NONE && td_write(td))) {
   1576 		struct all_io_list *state;
   1577 		size_t sz;
   1578 
   1579 		state = get_all_io_list(td->thread_number, &sz);
   1580 		if (state) {
   1581 			__verify_save_state(state, "local");
   1582 			free(state);
   1583 		}
   1584 	}
   1585 
   1586 	fio_unpin_memory(td);
   1587 
   1588 	fio_writeout_logs(td);
   1589 
   1590 	if (td->flags & TD_F_COMPRESS_LOG)
   1591 		tp_exit(&td->tp_data);
   1592 
   1593 	if (o->exec_postrun)
   1594 		exec_string(o, o->exec_postrun, (const char *)"postrun");
   1595 
   1596 	if (exitall_on_terminate)
   1597 		fio_terminate_threads(td->groupid);
   1598 
   1599 err:
   1600 	if (td->error)
   1601 		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
   1602 							td->verror);
   1603 
   1604 	if (o->verify_async)
   1605 		verify_async_exit(td);
   1606 
   1607 	close_and_free_files(td);
   1608 	cleanup_io_u(td);
   1609 	close_ioengine(td);
   1610 	cgroup_shutdown(td, &cgroup_mnt);
   1611 	verify_free_state(td);
   1612 
   1613 	if (fio_option_is_set(o, cpumask)) {
   1614 		ret = fio_cpuset_exit(&o->cpumask);
   1615 		if (ret)
   1616 			td_verror(td, ret, "fio_cpuset_exit");
   1617 	}
   1618 
   1619 	/*
   1620 	 * do this very late, it will log file closing as well
   1621 	 */
   1622 	if (o->write_iolog_file)
   1623 		write_iolog_close(td);
   1624 
   1625 	fio_mutex_remove(td->mutex);
   1626 	td->mutex = NULL;
   1627 
   1628 	td_set_runstate(td, TD_EXITED);
   1629 
   1630 	/*
   1631 	 * Do this last after setting our runstate to exited, so we
   1632 	 * know that the stat thread is signaled.
   1633 	 */
   1634 	check_update_rusage(td);
   1635 
   1636 	return (void *) (uintptr_t) td->error;
   1637 }
   1638 
   1639 
   1640 /*
   1641  * We cannot pass the td data into a forked process, so attach the td and
   1642  * pass it to the thread worker.
   1643  */
   1644 static int fork_main(int shmid, int offset)
   1645 {
   1646 	struct thread_data *td;
   1647 	void *data, *ret;
   1648 
   1649 #if !defined(__hpux) && !defined(CONFIG_NO_SHM)
   1650 	data = shmat(shmid, NULL, 0);
   1651 	if (data == (void *) -1) {
   1652 		int __err = errno;
   1653 
   1654 		perror("shmat");
   1655 		return __err;
   1656 	}
   1657 #else
   1658 	/*
   1659 	 * HP-UX inherits shm mappings?
   1660 	 */
   1661 	data = threads;
   1662 #endif
   1663 
   1664 	td = data + offset * sizeof(struct thread_data);
   1665 	ret = thread_main(td);
   1666 	shmdt(data);
   1667 	return (int) (uintptr_t) ret;
   1668 }
   1669 
   1670 static void dump_td_info(struct thread_data *td)
   1671 {
   1672 	log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to "
   1673 		"be stuck. Doing forceful exit of this job.\n", td->o.name,
   1674 			(unsigned long) time_since_now(&td->terminate_time));
   1675 }
   1676 
   1677 /*
   1678  * Run over the job map and reap the threads that have exited, if any.
   1679  */
   1680 static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
   1681 			 unsigned int *m_rate)
   1682 {
   1683 	struct thread_data *td;
   1684 	unsigned int cputhreads, realthreads, pending;
   1685 	int i, status, ret;
   1686 
   1687 	/*
   1688 	 * reap exited threads (TD_EXITED -> TD_REAPED)
   1689 	 */
   1690 	realthreads = pending = cputhreads = 0;
   1691 	for_each_td(td, i) {
   1692 		int flags = 0;
   1693 
   1694 		/*
   1695 		 * ->io_ops is NULL for a thread that has closed its
   1696 		 * io engine
   1697 		 */
   1698 		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
   1699 			cputhreads++;
   1700 		else
   1701 			realthreads++;
   1702 
   1703 		if (!td->pid) {
   1704 			pending++;
   1705 			continue;
   1706 		}
   1707 		if (td->runstate == TD_REAPED)
   1708 			continue;
   1709 		if (td->o.use_thread) {
   1710 			if (td->runstate == TD_EXITED) {
   1711 				td_set_runstate(td, TD_REAPED);
   1712 				goto reaped;
   1713 			}
   1714 			continue;
   1715 		}
   1716 
   1717 		flags = WNOHANG;
   1718 		if (td->runstate == TD_EXITED)
   1719 			flags = 0;
   1720 
   1721 		/*
   1722 		 * check if someone quit or got killed in an unusual way
   1723 		 */
   1724 		ret = waitpid(td->pid, &status, flags);
   1725 		if (ret < 0) {
   1726 			if (errno == ECHILD) {
   1727 				log_err("fio: pid=%d disappeared %d\n",
   1728 						(int) td->pid, td->runstate);
   1729 				td->sig = ECHILD;
   1730 				td_set_runstate(td, TD_REAPED);
   1731 				goto reaped;
   1732 			}
   1733 			perror("waitpid");
   1734 		} else if (ret == td->pid) {
   1735 			if (WIFSIGNALED(status)) {
   1736 				int sig = WTERMSIG(status);
   1737 
   1738 				if (sig != SIGTERM && sig != SIGUSR2)
   1739 					log_err("fio: pid=%d, got signal=%d\n",
   1740 							(int) td->pid, sig);
   1741 				td->sig = sig;
   1742 				td_set_runstate(td, TD_REAPED);
   1743 				goto reaped;
   1744 			}
   1745 			if (WIFEXITED(status)) {
   1746 				if (WEXITSTATUS(status) && !td->error)
   1747 					td->error = WEXITSTATUS(status);
   1748 
   1749 				td_set_runstate(td, TD_REAPED);
   1750 				goto reaped;
   1751 			}
   1752 		}
   1753 
   1754 		/*
   1755 		 * If the job is stuck, do a forceful timeout of it and
   1756 		 * move on.
   1757 		 */
   1758 		if (td->terminate &&
   1759 		    time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
   1760 			dump_td_info(td);
   1761 			td_set_runstate(td, TD_REAPED);
   1762 			goto reaped;
   1763 		}
   1764 
   1765 		/*
   1766 		 * thread is not dead, continue
   1767 		 */
   1768 		pending++;
   1769 		continue;
   1770 reaped:
   1771 		(*nr_running)--;
   1772 		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
   1773 		(*t_rate) -= ddir_rw_sum(td->o.rate);
   1774 		if (!td->pid)
   1775 			pending--;
   1776 
   1777 		if (td->error)
   1778 			exit_value++;
   1779 
   1780 		done_secs += mtime_since_now(&td->epoch) / 1000;
   1781 		profile_td_exit(td);
   1782 	}
   1783 
   1784 	if (*nr_running == cputhreads && !pending && realthreads)
   1785 		fio_terminate_threads(TERMINATE_ALL);
   1786 }
   1787 
   1788 static int __check_trigger_file(void)
   1789 {
   1790 	struct stat sb;
   1791 
   1792 	if (!trigger_file)
   1793 		return 0;
   1794 
   1795 	if (stat(trigger_file, &sb))
   1796 		return 0;
   1797 
   1798 	if (unlink(trigger_file) < 0)
   1799 		log_err("fio: failed to unlink %s: %s\n", trigger_file,
   1800 							strerror(errno));
   1801 
   1802 	return 1;
   1803 }
   1804 
   1805 static int trigger_timedout(void)
   1806 {
   1807 	if (trigger_timeout)
   1808 		return time_since_genesis() >= trigger_timeout;
   1809 
   1810 	return 0;
   1811 }
   1812 
   1813 void exec_trigger(const char *cmd)
   1814 {
   1815 	int ret;
   1816 
   1817 	if (!cmd)
   1818 		return;
   1819 
   1820 	ret = system(cmd);
   1821 	if (ret == -1)
   1822 		log_err("fio: failed executing %s trigger\n", cmd);
   1823 }
   1824 
   1825 void check_trigger_file(void)
   1826 {
   1827 	if (__check_trigger_file() || trigger_timedout()) {
   1828 		if (nr_clients)
   1829 			fio_clients_send_trigger(trigger_remote_cmd);
   1830 		else {
   1831 			verify_save_state();
   1832 			fio_terminate_threads(TERMINATE_ALL);
   1833 			exec_trigger(trigger_cmd);
   1834 		}
   1835 	}
   1836 }
   1837 
   1838 static int fio_verify_load_state(struct thread_data *td)
   1839 {
   1840 	int ret;
   1841 
   1842 	if (!td->o.verify_state)
   1843 		return 0;
   1844 
   1845 	if (is_backend) {
   1846 		void *data;
   1847 
   1848 		ret = fio_server_get_verify_state(td->o.name,
   1849 					td->thread_number - 1, &data);
   1850 		if (!ret)
   1851 			verify_convert_assign_state(td, data);
   1852 	} else
   1853 		ret = verify_load_state(td, "local");
   1854 
   1855 	return ret;
   1856 }
   1857 
   1858 static void do_usleep(unsigned int usecs)
   1859 {
   1860 	check_for_running_stats();
   1861 	check_trigger_file();
   1862 	usleep(usecs);
   1863 }
   1864 
   1865 /*
   1866  * Main function for kicking off and reaping jobs, as needed.
   1867  */
   1868 static void run_threads(void)
   1869 {
   1870 	struct thread_data *td;
   1871 	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
   1872 	uint64_t spent;
   1873 
   1874 	if (fio_gtod_offload && fio_start_gtod_thread())
   1875 		return;
   1876 
   1877 	fio_idle_prof_init();
   1878 
   1879 	set_sig_handlers();
   1880 
   1881 	nr_thread = nr_process = 0;
   1882 	for_each_td(td, i) {
   1883 		if (td->o.use_thread)
   1884 			nr_thread++;
   1885 		else
   1886 			nr_process++;
   1887 	}
   1888 
   1889 	if (output_format == FIO_OUTPUT_NORMAL) {
   1890 		log_info("Starting ");
   1891 		if (nr_thread)
   1892 			log_info("%d thread%s", nr_thread,
   1893 						nr_thread > 1 ? "s" : "");
   1894 		if (nr_process) {
   1895 			if (nr_thread)
   1896 				log_info(" and ");
   1897 			log_info("%d process%s", nr_process,
   1898 						nr_process > 1 ? "es" : "");
   1899 		}
   1900 		log_info("\n");
   1901 		log_info_flush();
   1902 	}
   1903 
   1904 	todo = thread_number;
   1905 	nr_running = 0;
   1906 	nr_started = 0;
   1907 	m_rate = t_rate = 0;
   1908 
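        	/*
        	 * Pre-run pass over all jobs: set up status output and, for
        	 * jobs with create_serialize set, create and lay out their
        	 * files up front.
        	 */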
   1909 	for_each_td(td, i) {
   1910 		print_status_init(td->thread_number - 1);
   1911 
   1912 		if (!td->o.create_serialize)
   1913 			continue;
   1914 
   1915 		if (fio_verify_load_state(td))
   1916 			goto reap;
   1917 
   1918 		/*
   1919 		 * Do file setup here so it happens sequentially;
   1920 		 * we don't want X number of threads getting their
   1921 		 * client data interspersed on disk.
   1922 		 */
   1923 		if (setup_files(td)) {
   1924 reap:
   1925 			exit_value++;
   1926 			if (td->error)
   1927 				log_err("fio: pid=%d, err=%d/%s\n",
   1928 					(int) td->pid, td->error, td->verror);
   1929 			td_set_runstate(td, TD_REAPED);
   1930 			todo--;
   1931 		} else {
   1932 			struct fio_file *f;
   1933 			unsigned int j;
   1934 
   1935 			/*
   1936 			 * For sharing to work, each job must always open
   1937 			 * its own files, so close them here if we opened
   1938 			 * them for creation.
   1939 			 */
   1940 			for_each_file(td, f, j) {
   1941 				if (fio_file_open(f))
   1942 					td_io_close_file(td, f);
   1943 			}
   1944 		}
   1945 	}
   1946 
   1947 	/* start idle threads before io threads start to run */
   1948 	fio_idle_prof_start();
   1949 
   1950 	set_genesis_time();
   1951 
   1952 	while (todo) {
   1953 		struct thread_data *map[REAL_MAX_JOBS];
   1954 		struct timeval this_start;
   1955 		int this_jobs = 0, left;
   1956 
   1957 		/*
   1958 		 * create threads (TD_NOT_CREATED -> TD_CREATED)
   1959 		 */
   1960 		for_each_td(td, i) {
   1961 			if (td->runstate != TD_NOT_CREATED)
   1962 				continue;
   1963 
   1964 			/*
   1965 			 * Never got a chance to start; killed by another
   1966 			 * thread for some reason.
   1967 			 */
   1968 			if (td->terminate) {
   1969 				todo--;
   1970 				continue;
   1971 			}
   1972 
   1973 			if (td->o.start_delay) {
   1974 				spent = utime_since_genesis();
   1975 
   1976 				if (td->o.start_delay > spent)
   1977 					continue;
   1978 			}
   1979 
   1980 			if (td->o.stonewall && (nr_started || nr_running)) {
   1981 				dprint(FD_PROCESS, "%s: stonewall wait\n",
   1982 							td->o.name);
   1983 				break;
   1984 			}
   1985 
   1986 			init_disk_util(td);
   1987 
   1988 			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
   1989 			td->update_rusage = 0;
   1990 
   1991 			/*
   1992 			 * Set state to created. Thread will transition
   1993 			 * to TD_INITIALIZED when it's done setting up.
   1994 			 */
   1995 			td_set_runstate(td, TD_CREATED);
   1996 			map[this_jobs++] = td;
   1997 			nr_started++;
   1998 
   1999 			if (td->o.use_thread) {
   2000 				int ret;
   2001 
   2002 				dprint(FD_PROCESS, "will pthread_create\n");
   2003 				ret = pthread_create(&td->thread, NULL,
   2004 							thread_main, td);
   2005 				if (ret) {
   2006 					log_err("pthread_create: %s\n",
   2007 							strerror(ret));
   2008 					nr_started--;
   2009 					break;
   2010 				}
   2011 				ret = pthread_detach(td->thread);
   2012 				if (ret)
   2013 					log_err("pthread_detach: %s\n",
   2014 							strerror(ret));
   2015 			} else {
   2016 				pid_t pid;
   2017 				dprint(FD_PROCESS, "will fork\n");
   2018 				pid = fork();
   2019 				if (!pid) {
   2020 					int ret = fork_main(shm_id, i);
   2021 
   2022 					_exit(ret);
   2023 				} else if (i == fio_debug_jobno)
   2024 					*fio_debug_jobp = pid;
   2025 			}
   2026 			dprint(FD_MUTEX, "wait on startup_mutex\n");
   2027 			if (fio_mutex_down_timeout(startup_mutex, 10)) {
   2028 				log_err("fio: job startup hung? exiting.\n");
   2029 				fio_terminate_threads(TERMINATE_ALL);
   2030 				fio_abort = 1;
   2031 				nr_started--;
   2032 				break;
   2033 			}
   2034 			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
   2035 		}
   2036 
   2037 		/*
   2038 		 * Wait for the started threads to transition to
   2039 		 * TD_INITIALIZED.
   2040 		 */
   2041 		fio_gettime(&this_start, NULL);
   2042 		left = this_jobs;
   2043 		while (left && !fio_abort) {
   2044 			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
   2045 				break;
   2046 
   2047 			do_usleep(100000);
   2048 
   2049 			for (i = 0; i < this_jobs; i++) {
   2050 				td = map[i];
   2051 				if (!td)
   2052 					continue;
   2053 				if (td->runstate == TD_INITIALIZED) {
   2054 					map[i] = NULL;
   2055 					left--;
   2056 				} else if (td->runstate >= TD_EXITED) {
   2057 					map[i] = NULL;
   2058 					left--;
   2059 					todo--;
   2060 					nr_running++; /* work-around... */
   2061 				}
   2062 			}
   2063 		}
   2064 
   2065 		if (left) {
   2066 			log_err("fio: %d job%s failed to start\n", left,
   2067 					left > 1 ? "s" : "");
   2068 			for (i = 0; i < this_jobs; i++) {
   2069 				td = map[i];
   2070 				if (!td)
   2071 					continue;
   2072 				kill(td->pid, SIGTERM);
   2073 			}
   2074 			break;
   2075 		}
   2076 
   2077 		/*
   2078 		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
   2079 		 */
   2080 		for_each_td(td, i) {
   2081 			if (td->runstate != TD_INITIALIZED)
   2082 				continue;
   2083 
   2084 			if (in_ramp_time(td))
   2085 				td_set_runstate(td, TD_RAMP);
   2086 			else
   2087 				td_set_runstate(td, TD_RUNNING);
   2088 			nr_running++;
   2089 			nr_started--;
   2090 			m_rate += ddir_rw_sum(td->o.ratemin);
   2091 			t_rate += ddir_rw_sum(td->o.rate);
   2092 			todo--;
   2093 			fio_mutex_up(td->mutex);
   2094 		}
   2095 
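        		/*
        		 * Reap any jobs that have already exited before
        		 * going around for the next batch.
        		 */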
   2096 		reap_threads(&nr_running, &t_rate, &m_rate);
   2097 
   2098 		if (todo)
   2099 			do_usleep(100000);
   2100 	}
   2101 
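        	/*
        	 * All jobs have been started; wait for them to complete and
        	 * reap them as they exit.
        	 */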
   2102 	while (nr_running) {
   2103 		reap_threads(&nr_running, &t_rate, &m_rate);
   2104 		do_usleep(10000);
   2105 	}
   2106 
   2107 	fio_idle_prof_stop();
   2108 
   2109 	update_io_ticks();
   2110 }
   2111 
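        /*
         * Ask the helper thread to exit and wait for it to finish.
         */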
   2112 static void wait_for_helper_thread_exit(void)
   2113 {
   2114 	void *ret;
   2115 
   2116 	helper_exit = 1;
   2117 	pthread_cond_signal(&helper_cond);
   2118 	pthread_join(helper_thread, &ret);
   2119 }
   2120 
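        /*
         * Drop the disk utilization entries and destroy the helper condition
         * variable.
         */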
   2121 static void free_disk_util(void)
   2122 {
   2123 	disk_util_prune_entries();
   2124 
   2125 	pthread_cond_destroy(&helper_cond);
   2126 }
   2127 
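        /*
         * Helper thread: wakes up roughly every DISK_UTIL_MSEC to update disk
         * utilization, emit interim run stats when asked to, and refresh the
         * status line; loops until update_io_ticks() signals shutdown.
         */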
   2128 static void *helper_thread_main(void *data)
   2129 {
   2130 	int ret = 0;
   2131 
   2132 	fio_mutex_up(startup_mutex);
   2133 
   2134 	while (!ret) {
   2135 		uint64_t sec = DISK_UTIL_MSEC / 1000;
   2136 		uint64_t nsec = (DISK_UTIL_MSEC % 1000) * 1000000;
   2137 		struct timespec ts;
   2138 		struct timeval tv;
   2139 
   2140 		gettimeofday(&tv, NULL);
   2141 		ts.tv_sec = tv.tv_sec + sec;
   2142 		ts.tv_nsec = (tv.tv_usec * 1000) + nsec;
   2143 
   2144 		if (ts.tv_nsec >= 1000000000ULL) {
   2145 			ts.tv_nsec -= 1000000000ULL;
   2146 			ts.tv_sec++;
   2147 		}
   2148 
        		/* POSIX requires helper_lock to be held across this wait */
        		pthread_mutex_lock(&helper_lock);
   2149 		pthread_cond_timedwait(&helper_cond, &helper_lock, &ts);
        		pthread_mutex_unlock(&helper_lock);
   2150 
   2151 		ret = update_io_ticks();
   2152 
   2153 		if (helper_do_stat) {
   2154 			helper_do_stat = 0;
   2155 			__show_running_run_stats();
   2156 		}
   2157 
   2158 		if (!is_backend)
   2159 			print_thread_status();
   2160 	}
   2161 
   2162 	return NULL;
   2163 }
   2164 
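        /*
         * Initialize disk utilization tracking and start the helper thread,
         * then wait on startup_mutex until it signals that it is running.
         */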
   2165 static int create_helper_thread(void)
   2166 {
   2167 	int ret;
   2168 
   2169 	setup_disk_util();
   2170 
   2171 	pthread_cond_init(&helper_cond, NULL);
   2172 	pthread_mutex_init(&helper_lock, NULL);
   2173 
   2174 	ret = pthread_create(&helper_thread, NULL, helper_thread_main, NULL);
   2175 	if (ret) {
   2176 		log_err("Can't create helper thread: %s\n", strerror(ret));
   2177 		return 1;
   2178 	}
   2179 
   2180 	dprint(FD_MUTEX, "wait on startup_mutex\n");
   2181 	fio_mutex_down(startup_mutex);
   2182 	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
   2183 	return 0;
   2184 }
   2185 
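        /*
         * Main backend entry point: handle an exec profile, set up aggregate
         * bandwidth logging and the helper thread, run all jobs, then show
         * final stats and tear everything down. Returns the exit value.
         */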
   2186 int fio_backend(void)
   2187 {
   2188 	struct thread_data *td;
   2189 	int i;
   2190 
   2191 	if (exec_profile) {
   2192 		if (load_profile(exec_profile))
   2193 			return 1;
   2194 		free(exec_profile);
   2195 		exec_profile = NULL;
   2196 	}
   2197 	if (!thread_number)
   2198 		return 0;
   2199 
   2200 	if (write_bw_log) {
   2201 		struct log_params p = {
   2202 			.log_type = IO_LOG_TYPE_BW,
   2203 		};
   2204 
   2205 		setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
   2206 		setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
   2207 		setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
   2208 	}
   2209 
   2210 	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
   2211 	if (startup_mutex == NULL)
   2212 		return 1;
   2213 
   2214 	set_genesis_time();
   2215 	stat_init();
   2216 	create_helper_thread();
   2217 
   2218 	cgroup_list = smalloc(sizeof(*cgroup_list));
   2219 	INIT_FLIST_HEAD(cgroup_list);
   2220 
   2221 	run_threads();
   2222 
   2223 	wait_for_helper_thread_exit();
   2224 
   2225 	if (!fio_abort) {
   2226 		__show_run_stats();
   2227 		if (write_bw_log) {
   2228 			for (i = 0; i < DDIR_RWDIR_CNT; i++) {
   2229 				struct io_log *log = agg_io_log[i];
   2230 
   2231 				flush_log(log);
   2232 				free_log(log);
   2233 			}
   2234 		}
   2235 	}
   2236 
   2237 	for_each_td(td, i) {
   2238 		fio_options_free(td);
   2239 		if (td->rusage_sem) {
   2240 			fio_mutex_remove(td->rusage_sem);
   2241 			td->rusage_sem = NULL;
   2242 		}
   2243 	}
   2244 
   2245 	free_disk_util();
   2246 	cgroup_kill(cgroup_list);
   2247 	sfree(cgroup_list);
   2248 	sfree(cgroup_mnt);
   2249 
   2250 	fio_mutex_remove(startup_mutex);
   2251 	stat_exit();
   2252 	return exit_value;
   2253 }
   2254