1 /* 2 * fio - the flexible io tester 3 * 4 * Copyright (C) 2005 Jens Axboe <axboe (at) suse.de> 5 * Copyright (C) 2006-2012 Jens Axboe <axboe (at) kernel.dk> 6 * 7 * The license below covers all files distributed with fio unless otherwise 8 * noted in the file itself. 9 * 10 * This program is free software; you can redistribute it and/or modify 11 * it under the terms of the GNU General Public License version 2 as 12 * published by the Free Software Foundation. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 * GNU General Public License for more details. 18 * 19 * You should have received a copy of the GNU General Public License 20 * along with this program; if not, write to the Free Software 21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 22 * 23 */ 24 #include <unistd.h> 25 #include <fcntl.h> 26 #include <string.h> 27 #include <limits.h> 28 #include <signal.h> 29 #include <time.h> 30 #include <locale.h> 31 #include <assert.h> 32 #include <time.h> 33 #include <inttypes.h> 34 #include <sys/stat.h> 35 #include <sys/wait.h> 36 #include <sys/ipc.h> 37 #include <sys/mman.h> 38 #include <math.h> 39 40 #include "fio.h" 41 #ifndef FIO_NO_HAVE_SHM_H 42 #include <sys/shm.h> 43 #endif 44 #include "hash.h" 45 #include "smalloc.h" 46 #include "verify.h" 47 #include "trim.h" 48 #include "diskutil.h" 49 #include "cgroup.h" 50 #include "profile.h" 51 #include "lib/rand.h" 52 #include "lib/memalign.h" 53 #include "server.h" 54 #include "lib/getrusage.h" 55 #include "idletime.h" 56 #include "err.h" 57 #include "workqueue.h" 58 #include "lib/mountcheck.h" 59 #include "rate-submit.h" 60 #include "helper_thread.h" 61 62 static struct fio_mutex *startup_mutex; 63 static struct flist_head *cgroup_list; 64 static char *cgroup_mnt; 65 static int exit_value; 66 static volatile int fio_abort; 67 static unsigned int nr_process = 0; 68 static unsigned int nr_thread = 0; 69 70 struct io_log *agg_io_log[DDIR_RWDIR_CNT]; 71 72 int groupid = 0; 73 unsigned int thread_number = 0; 74 unsigned int stat_number = 0; 75 int shm_id = 0; 76 int temp_stall_ts; 77 unsigned long done_secs = 0; 78 79 #define JOB_START_TIMEOUT (5 * 1000) 80 81 static void sig_int(int sig) 82 { 83 if (threads) { 84 if (is_backend) 85 fio_server_got_signal(sig); 86 else { 87 log_info("\nfio: terminating on signal %d\n", sig); 88 log_info_flush(); 89 exit_value = 128; 90 } 91 92 fio_terminate_threads(TERMINATE_ALL); 93 } 94 } 95 96 void sig_show_status(int sig) 97 { 98 show_running_run_stats(); 99 } 100 101 static void set_sig_handlers(void) 102 { 103 struct sigaction act; 104 105 memset(&act, 0, sizeof(act)); 106 act.sa_handler = sig_int; 107 act.sa_flags = SA_RESTART; 108 sigaction(SIGINT, &act, NULL); 109 110 memset(&act, 0, sizeof(act)); 111 act.sa_handler = sig_int; 112 act.sa_flags = SA_RESTART; 113 sigaction(SIGTERM, &act, NULL); 114 115 /* Windows uses SIGBREAK as a quit signal from other applications */ 116 #ifdef WIN32 117 memset(&act, 0, sizeof(act)); 118 act.sa_handler = sig_int; 119 act.sa_flags = SA_RESTART; 120 sigaction(SIGBREAK, &act, NULL); 121 #endif 122 123 memset(&act, 0, sizeof(act)); 124 act.sa_handler = sig_show_status; 125 act.sa_flags = SA_RESTART; 126 sigaction(SIGUSR1, &act, NULL); 127 128 if (is_backend) { 129 memset(&act, 0, sizeof(act)); 130 act.sa_handler = sig_int; 131 act.sa_flags = SA_RESTART; 132 sigaction(SIGPIPE, &act, NULL); 133 } 134 } 135 136 /* 137 * Check if we are above the minimum rate given. 138 */ 139 static bool __check_min_rate(struct thread_data *td, struct timeval *now, 140 enum fio_ddir ddir) 141 { 142 unsigned long long bytes = 0; 143 unsigned long iops = 0; 144 unsigned long spent; 145 unsigned long rate; 146 unsigned int ratemin = 0; 147 unsigned int rate_iops = 0; 148 unsigned int rate_iops_min = 0; 149 150 assert(ddir_rw(ddir)); 151 152 if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir]) 153 return false; 154 155 /* 156 * allow a 2 second settle period in the beginning 157 */ 158 if (mtime_since(&td->start, now) < 2000) 159 return false; 160 161 iops += td->this_io_blocks[ddir]; 162 bytes += td->this_io_bytes[ddir]; 163 ratemin += td->o.ratemin[ddir]; 164 rate_iops += td->o.rate_iops[ddir]; 165 rate_iops_min += td->o.rate_iops_min[ddir]; 166 167 /* 168 * if rate blocks is set, sample is running 169 */ 170 if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) { 171 spent = mtime_since(&td->lastrate[ddir], now); 172 if (spent < td->o.ratecycle) 173 return false; 174 175 if (td->o.rate[ddir] || td->o.ratemin[ddir]) { 176 /* 177 * check bandwidth specified rate 178 */ 179 if (bytes < td->rate_bytes[ddir]) { 180 log_err("%s: rate_min=%uB/s not met, only transferred %lluB\n", 181 td->o.name, ratemin, bytes); 182 return true; 183 } else { 184 if (spent) 185 rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent; 186 else 187 rate = 0; 188 189 if (rate < ratemin || 190 bytes < td->rate_bytes[ddir]) { 191 log_err("%s: rate_min=%uB/s not met, got %luB/s\n", 192 td->o.name, ratemin, rate); 193 return true; 194 } 195 } 196 } else { 197 /* 198 * checks iops specified rate 199 */ 200 if (iops < rate_iops) { 201 log_err("%s: rate_iops_min=%u not met, only performed %lu IOs\n", 202 td->o.name, rate_iops, iops); 203 return true; 204 } else { 205 if (spent) 206 rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent; 207 else 208 rate = 0; 209 210 if (rate < rate_iops_min || 211 iops < td->rate_blocks[ddir]) { 212 log_err("%s: rate_iops_min=%u not met, got %lu IOPS\n", 213 td->o.name, rate_iops_min, rate); 214 return true; 215 } 216 } 217 } 218 } 219 220 td->rate_bytes[ddir] = bytes; 221 td->rate_blocks[ddir] = iops; 222 memcpy(&td->lastrate[ddir], now, sizeof(*now)); 223 return false; 224 } 225 226 static bool check_min_rate(struct thread_data *td, struct timeval *now) 227 { 228 bool ret = false; 229 230 if (td->bytes_done[DDIR_READ]) 231 ret |= __check_min_rate(td, now, DDIR_READ); 232 if (td->bytes_done[DDIR_WRITE]) 233 ret |= __check_min_rate(td, now, DDIR_WRITE); 234 if (td->bytes_done[DDIR_TRIM]) 235 ret |= __check_min_rate(td, now, DDIR_TRIM); 236 237 return ret; 238 } 239 240 /* 241 * When job exits, we can cancel the in-flight IO if we are using async 242 * io. Attempt to do so. 243 */ 244 static void cleanup_pending_aio(struct thread_data *td) 245 { 246 int r; 247 248 /* 249 * get immediately available events, if any 250 */ 251 r = io_u_queued_complete(td, 0); 252 if (r < 0) 253 return; 254 255 /* 256 * now cancel remaining active events 257 */ 258 if (td->io_ops->cancel) { 259 struct io_u *io_u; 260 int i; 261 262 io_u_qiter(&td->io_u_all, io_u, i) { 263 if (io_u->flags & IO_U_F_FLIGHT) { 264 r = td->io_ops->cancel(td, io_u); 265 if (!r) 266 put_io_u(td, io_u); 267 } 268 } 269 } 270 271 if (td->cur_depth) 272 r = io_u_queued_complete(td, td->cur_depth); 273 } 274 275 /* 276 * Helper to handle the final sync of a file. Works just like the normal 277 * io path, just does everything sync. 278 */ 279 static bool fio_io_sync(struct thread_data *td, struct fio_file *f) 280 { 281 struct io_u *io_u = __get_io_u(td); 282 int ret; 283 284 if (!io_u) 285 return true; 286 287 io_u->ddir = DDIR_SYNC; 288 io_u->file = f; 289 290 if (td_io_prep(td, io_u)) { 291 put_io_u(td, io_u); 292 return true; 293 } 294 295 requeue: 296 ret = td_io_queue(td, io_u); 297 if (ret < 0) { 298 td_verror(td, io_u->error, "td_io_queue"); 299 put_io_u(td, io_u); 300 return true; 301 } else if (ret == FIO_Q_QUEUED) { 302 if (td_io_commit(td)) 303 return true; 304 if (io_u_queued_complete(td, 1) < 0) 305 return true; 306 } else if (ret == FIO_Q_COMPLETED) { 307 if (io_u->error) { 308 td_verror(td, io_u->error, "td_io_queue"); 309 return true; 310 } 311 312 if (io_u_sync_complete(td, io_u) < 0) 313 return true; 314 } else if (ret == FIO_Q_BUSY) { 315 if (td_io_commit(td)) 316 return true; 317 goto requeue; 318 } 319 320 return false; 321 } 322 323 static int fio_file_fsync(struct thread_data *td, struct fio_file *f) 324 { 325 int ret; 326 327 if (fio_file_open(f)) 328 return fio_io_sync(td, f); 329 330 if (td_io_open_file(td, f)) 331 return 1; 332 333 ret = fio_io_sync(td, f); 334 td_io_close_file(td, f); 335 return ret; 336 } 337 338 static inline void __update_tv_cache(struct thread_data *td) 339 { 340 fio_gettime(&td->tv_cache, NULL); 341 } 342 343 static inline void update_tv_cache(struct thread_data *td) 344 { 345 if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask) 346 __update_tv_cache(td); 347 } 348 349 static inline bool runtime_exceeded(struct thread_data *td, struct timeval *t) 350 { 351 if (in_ramp_time(td)) 352 return false; 353 if (!td->o.timeout) 354 return false; 355 if (utime_since(&td->epoch, t) >= td->o.timeout) 356 return true; 357 358 return false; 359 } 360 361 /* 362 * We need to update the runtime consistently in ms, but keep a running 363 * tally of the current elapsed time in microseconds for sub millisecond 364 * updates. 365 */ 366 static inline void update_runtime(struct thread_data *td, 367 unsigned long long *elapsed_us, 368 const enum fio_ddir ddir) 369 { 370 if (ddir == DDIR_WRITE && td_write(td) && td->o.verify_only) 371 return; 372 373 td->ts.runtime[ddir] -= (elapsed_us[ddir] + 999) / 1000; 374 elapsed_us[ddir] += utime_since_now(&td->start); 375 td->ts.runtime[ddir] += (elapsed_us[ddir] + 999) / 1000; 376 } 377 378 static bool break_on_this_error(struct thread_data *td, enum fio_ddir ddir, 379 int *retptr) 380 { 381 int ret = *retptr; 382 383 if (ret < 0 || td->error) { 384 int err = td->error; 385 enum error_type_bit eb; 386 387 if (ret < 0) 388 err = -ret; 389 390 eb = td_error_type(ddir, err); 391 if (!(td->o.continue_on_error & (1 << eb))) 392 return true; 393 394 if (td_non_fatal_error(td, eb, err)) { 395 /* 396 * Continue with the I/Os in case of 397 * a non fatal error. 398 */ 399 update_error_count(td, err); 400 td_clear_error(td); 401 *retptr = 0; 402 return false; 403 } else if (td->o.fill_device && err == ENOSPC) { 404 /* 405 * We expect to hit this error if 406 * fill_device option is set. 407 */ 408 td_clear_error(td); 409 fio_mark_td_terminate(td); 410 return true; 411 } else { 412 /* 413 * Stop the I/O in case of a fatal 414 * error. 415 */ 416 update_error_count(td, err); 417 return true; 418 } 419 } 420 421 return false; 422 } 423 424 static void check_update_rusage(struct thread_data *td) 425 { 426 if (td->update_rusage) { 427 td->update_rusage = 0; 428 update_rusage_stat(td); 429 fio_mutex_up(td->rusage_sem); 430 } 431 } 432 433 static int wait_for_completions(struct thread_data *td, struct timeval *time) 434 { 435 const int full = queue_full(td); 436 int min_evts = 0; 437 int ret; 438 439 if (td->flags & TD_F_REGROW_LOGS) 440 return io_u_quiesce(td); 441 442 /* 443 * if the queue is full, we MUST reap at least 1 event 444 */ 445 min_evts = min(td->o.iodepth_batch_complete_min, td->cur_depth); 446 if ((full && !min_evts) || !td->o.iodepth_batch_complete_min) 447 min_evts = 1; 448 449 if (time && (__should_check_rate(td, DDIR_READ) || 450 __should_check_rate(td, DDIR_WRITE) || 451 __should_check_rate(td, DDIR_TRIM))) 452 fio_gettime(time, NULL); 453 454 do { 455 ret = io_u_queued_complete(td, min_evts); 456 if (ret < 0) 457 break; 458 } while (full && (td->cur_depth > td->o.iodepth_low)); 459 460 return ret; 461 } 462 463 int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret, 464 enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify, 465 struct timeval *comp_time) 466 { 467 int ret2; 468 469 switch (*ret) { 470 case FIO_Q_COMPLETED: 471 if (io_u->error) { 472 *ret = -io_u->error; 473 clear_io_u(td, io_u); 474 } else if (io_u->resid) { 475 int bytes = io_u->xfer_buflen - io_u->resid; 476 struct fio_file *f = io_u->file; 477 478 if (bytes_issued) 479 *bytes_issued += bytes; 480 481 if (!from_verify) 482 trim_io_piece(td, io_u); 483 484 /* 485 * zero read, fail 486 */ 487 if (!bytes) { 488 if (!from_verify) 489 unlog_io_piece(td, io_u); 490 td_verror(td, EIO, "full resid"); 491 put_io_u(td, io_u); 492 break; 493 } 494 495 io_u->xfer_buflen = io_u->resid; 496 io_u->xfer_buf += bytes; 497 io_u->offset += bytes; 498 499 if (ddir_rw(io_u->ddir)) 500 td->ts.short_io_u[io_u->ddir]++; 501 502 f = io_u->file; 503 if (io_u->offset == f->real_file_size) 504 goto sync_done; 505 506 requeue_io_u(td, &io_u); 507 } else { 508 sync_done: 509 if (comp_time && (__should_check_rate(td, DDIR_READ) || 510 __should_check_rate(td, DDIR_WRITE) || 511 __should_check_rate(td, DDIR_TRIM))) 512 fio_gettime(comp_time, NULL); 513 514 *ret = io_u_sync_complete(td, io_u); 515 if (*ret < 0) 516 break; 517 } 518 519 if (td->flags & TD_F_REGROW_LOGS) 520 regrow_logs(td); 521 522 /* 523 * when doing I/O (not when verifying), 524 * check for any errors that are to be ignored 525 */ 526 if (!from_verify) 527 break; 528 529 return 0; 530 case FIO_Q_QUEUED: 531 /* 532 * if the engine doesn't have a commit hook, 533 * the io_u is really queued. if it does have such 534 * a hook, it has to call io_u_queued() itself. 535 */ 536 if (td->io_ops->commit == NULL) 537 io_u_queued(td, io_u); 538 if (bytes_issued) 539 *bytes_issued += io_u->xfer_buflen; 540 break; 541 case FIO_Q_BUSY: 542 if (!from_verify) 543 unlog_io_piece(td, io_u); 544 requeue_io_u(td, &io_u); 545 ret2 = td_io_commit(td); 546 if (ret2 < 0) 547 *ret = ret2; 548 break; 549 default: 550 assert(*ret < 0); 551 td_verror(td, -(*ret), "td_io_queue"); 552 break; 553 } 554 555 if (break_on_this_error(td, ddir, ret)) 556 return 1; 557 558 return 0; 559 } 560 561 static inline bool io_in_polling(struct thread_data *td) 562 { 563 return !td->o.iodepth_batch_complete_min && 564 !td->o.iodepth_batch_complete_max; 565 } 566 /* 567 * Unlinks files from thread data fio_file structure 568 */ 569 static int unlink_all_files(struct thread_data *td) 570 { 571 struct fio_file *f; 572 unsigned int i; 573 int ret = 0; 574 575 for_each_file(td, f, i) { 576 if (f->filetype != FIO_TYPE_FILE) 577 continue; 578 ret = td_io_unlink_file(td, f); 579 if (ret) 580 break; 581 } 582 583 if (ret) 584 td_verror(td, ret, "unlink_all_files"); 585 586 return ret; 587 } 588 589 /* 590 * The main verify engine. Runs over the writes we previously submitted, 591 * reads the blocks back in, and checks the crc/md5 of the data. 592 */ 593 static void do_verify(struct thread_data *td, uint64_t verify_bytes) 594 { 595 struct fio_file *f; 596 struct io_u *io_u; 597 int ret, min_events; 598 unsigned int i; 599 600 dprint(FD_VERIFY, "starting loop\n"); 601 602 /* 603 * sync io first and invalidate cache, to make sure we really 604 * read from disk. 605 */ 606 for_each_file(td, f, i) { 607 if (!fio_file_open(f)) 608 continue; 609 if (fio_io_sync(td, f)) 610 break; 611 if (file_invalidate_cache(td, f)) 612 break; 613 } 614 615 check_update_rusage(td); 616 617 if (td->error) 618 return; 619 620 /* 621 * verify_state needs to be reset before verification 622 * proceeds so that expected random seeds match actual 623 * random seeds in headers. The main loop will reset 624 * all random number generators if randrepeat is set. 625 */ 626 if (!td->o.rand_repeatable) 627 td_fill_verify_state_seed(td); 628 629 td_set_runstate(td, TD_VERIFYING); 630 631 io_u = NULL; 632 while (!td->terminate) { 633 enum fio_ddir ddir; 634 int full; 635 636 update_tv_cache(td); 637 check_update_rusage(td); 638 639 if (runtime_exceeded(td, &td->tv_cache)) { 640 __update_tv_cache(td); 641 if (runtime_exceeded(td, &td->tv_cache)) { 642 fio_mark_td_terminate(td); 643 break; 644 } 645 } 646 647 if (flow_threshold_exceeded(td)) 648 continue; 649 650 if (!td->o.experimental_verify) { 651 io_u = __get_io_u(td); 652 if (!io_u) 653 break; 654 655 if (get_next_verify(td, io_u)) { 656 put_io_u(td, io_u); 657 break; 658 } 659 660 if (td_io_prep(td, io_u)) { 661 put_io_u(td, io_u); 662 break; 663 } 664 } else { 665 if (ddir_rw_sum(td->bytes_done) + td->o.rw_min_bs > verify_bytes) 666 break; 667 668 while ((io_u = get_io_u(td)) != NULL) { 669 if (IS_ERR_OR_NULL(io_u)) { 670 io_u = NULL; 671 ret = FIO_Q_BUSY; 672 goto reap; 673 } 674 675 /* 676 * We are only interested in the places where 677 * we wrote or trimmed IOs. Turn those into 678 * reads for verification purposes. 679 */ 680 if (io_u->ddir == DDIR_READ) { 681 /* 682 * Pretend we issued it for rwmix 683 * accounting 684 */ 685 td->io_issues[DDIR_READ]++; 686 put_io_u(td, io_u); 687 continue; 688 } else if (io_u->ddir == DDIR_TRIM) { 689 io_u->ddir = DDIR_READ; 690 io_u_set(td, io_u, IO_U_F_TRIMMED); 691 break; 692 } else if (io_u->ddir == DDIR_WRITE) { 693 io_u->ddir = DDIR_READ; 694 break; 695 } else { 696 put_io_u(td, io_u); 697 continue; 698 } 699 } 700 701 if (!io_u) 702 break; 703 } 704 705 if (verify_state_should_stop(td, io_u)) { 706 put_io_u(td, io_u); 707 break; 708 } 709 710 if (td->o.verify_async) 711 io_u->end_io = verify_io_u_async; 712 else 713 io_u->end_io = verify_io_u; 714 715 ddir = io_u->ddir; 716 if (!td->o.disable_slat) 717 fio_gettime(&io_u->start_time, NULL); 718 719 ret = td_io_queue(td, io_u); 720 721 if (io_queue_event(td, io_u, &ret, ddir, NULL, 1, NULL)) 722 break; 723 724 /* 725 * if we can queue more, do so. but check if there are 726 * completed io_u's first. Note that we can get BUSY even 727 * without IO queued, if the system is resource starved. 728 */ 729 reap: 730 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 731 if (full || io_in_polling(td)) 732 ret = wait_for_completions(td, NULL); 733 734 if (ret < 0) 735 break; 736 } 737 738 check_update_rusage(td); 739 740 if (!td->error) { 741 min_events = td->cur_depth; 742 743 if (min_events) 744 ret = io_u_queued_complete(td, min_events); 745 } else 746 cleanup_pending_aio(td); 747 748 td_set_runstate(td, TD_RUNNING); 749 750 dprint(FD_VERIFY, "exiting loop\n"); 751 } 752 753 static bool exceeds_number_ios(struct thread_data *td) 754 { 755 unsigned long long number_ios; 756 757 if (!td->o.number_ios) 758 return false; 759 760 number_ios = ddir_rw_sum(td->io_blocks); 761 number_ios += td->io_u_queued + td->io_u_in_flight; 762 763 return number_ios >= (td->o.number_ios * td->loops); 764 } 765 766 static bool io_bytes_exceeded(struct thread_data *td, uint64_t *this_bytes) 767 { 768 unsigned long long bytes, limit; 769 770 if (td_rw(td)) 771 bytes = this_bytes[DDIR_READ] + this_bytes[DDIR_WRITE]; 772 else if (td_write(td)) 773 bytes = this_bytes[DDIR_WRITE]; 774 else if (td_read(td)) 775 bytes = this_bytes[DDIR_READ]; 776 else 777 bytes = this_bytes[DDIR_TRIM]; 778 779 if (td->o.io_size) 780 limit = td->o.io_size; 781 else 782 limit = td->o.size; 783 784 limit *= td->loops; 785 return bytes >= limit || exceeds_number_ios(td); 786 } 787 788 static bool io_issue_bytes_exceeded(struct thread_data *td) 789 { 790 return io_bytes_exceeded(td, td->io_issue_bytes); 791 } 792 793 static bool io_complete_bytes_exceeded(struct thread_data *td) 794 { 795 return io_bytes_exceeded(td, td->this_io_bytes); 796 } 797 798 /* 799 * used to calculate the next io time for rate control 800 * 801 */ 802 static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir) 803 { 804 uint64_t secs, remainder, bps, bytes, iops; 805 806 assert(!(td->flags & TD_F_CHILD)); 807 bytes = td->rate_io_issue_bytes[ddir]; 808 bps = td->rate_bps[ddir]; 809 810 if (td->o.rate_process == RATE_PROCESS_POISSON) { 811 uint64_t val; 812 iops = bps / td->o.bs[ddir]; 813 val = (int64_t) (1000000 / iops) * 814 -logf(__rand_0_1(&td->poisson_state[ddir])); 815 if (val) { 816 dprint(FD_RATE, "poisson rate iops=%llu, ddir=%d\n", 817 (unsigned long long) 1000000 / val, 818 ddir); 819 } 820 td->last_usec[ddir] += val; 821 return td->last_usec[ddir]; 822 } else if (bps) { 823 secs = bytes / bps; 824 remainder = bytes % bps; 825 return remainder * 1000000 / bps + secs * 1000000; 826 } 827 828 return 0; 829 } 830 831 /* 832 * Main IO worker function. It retrieves io_u's to process and queues 833 * and reaps them, checking for rate and errors along the way. 834 * 835 * Returns number of bytes written and trimmed. 836 */ 837 static void do_io(struct thread_data *td, uint64_t *bytes_done) 838 { 839 unsigned int i; 840 int ret = 0; 841 uint64_t total_bytes, bytes_issued = 0; 842 843 for (i = 0; i < DDIR_RWDIR_CNT; i++) 844 bytes_done[i] = td->bytes_done[i]; 845 846 if (in_ramp_time(td)) 847 td_set_runstate(td, TD_RAMP); 848 else 849 td_set_runstate(td, TD_RUNNING); 850 851 lat_target_init(td); 852 853 total_bytes = td->o.size; 854 /* 855 * Allow random overwrite workloads to write up to io_size 856 * before starting verification phase as 'size' doesn't apply. 857 */ 858 if (td_write(td) && td_random(td) && td->o.norandommap) 859 total_bytes = max(total_bytes, (uint64_t) td->o.io_size); 860 /* 861 * If verify_backlog is enabled, we'll run the verify in this 862 * handler as well. For that case, we may need up to twice the 863 * amount of bytes. 864 */ 865 if (td->o.verify != VERIFY_NONE && 866 (td_write(td) && td->o.verify_backlog)) 867 total_bytes += td->o.size; 868 869 /* In trimwrite mode, each byte is trimmed and then written, so 870 * allow total_bytes to be twice as big */ 871 if (td_trimwrite(td)) 872 total_bytes += td->total_io_size; 873 874 while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || 875 (!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) || 876 td->o.time_based) { 877 struct timeval comp_time; 878 struct io_u *io_u; 879 int full; 880 enum fio_ddir ddir; 881 882 check_update_rusage(td); 883 884 if (td->terminate || td->done) 885 break; 886 887 update_tv_cache(td); 888 889 if (runtime_exceeded(td, &td->tv_cache)) { 890 __update_tv_cache(td); 891 if (runtime_exceeded(td, &td->tv_cache)) { 892 fio_mark_td_terminate(td); 893 break; 894 } 895 } 896 897 if (flow_threshold_exceeded(td)) 898 continue; 899 900 /* 901 * Break if we exceeded the bytes. The exception is time 902 * based runs, but we still need to break out of the loop 903 * for those to run verification, if enabled. 904 */ 905 if (bytes_issued >= total_bytes && 906 (!td->o.time_based || 907 (td->o.time_based && td->o.verify != VERIFY_NONE))) 908 break; 909 910 io_u = get_io_u(td); 911 if (IS_ERR_OR_NULL(io_u)) { 912 int err = PTR_ERR(io_u); 913 914 io_u = NULL; 915 if (err == -EBUSY) { 916 ret = FIO_Q_BUSY; 917 goto reap; 918 } 919 if (td->o.latency_target) 920 goto reap; 921 break; 922 } 923 924 ddir = io_u->ddir; 925 926 /* 927 * Add verification end_io handler if: 928 * - Asked to verify (!td_rw(td)) 929 * - Or the io_u is from our verify list (mixed write/ver) 930 */ 931 if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ && 932 ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) { 933 934 if (!td->o.verify_pattern_bytes) { 935 io_u->rand_seed = __rand(&td->verify_state); 936 if (sizeof(int) != sizeof(long *)) 937 io_u->rand_seed *= __rand(&td->verify_state); 938 } 939 940 if (verify_state_should_stop(td, io_u)) { 941 put_io_u(td, io_u); 942 break; 943 } 944 945 if (td->o.verify_async) 946 io_u->end_io = verify_io_u_async; 947 else 948 io_u->end_io = verify_io_u; 949 td_set_runstate(td, TD_VERIFYING); 950 } else if (in_ramp_time(td)) 951 td_set_runstate(td, TD_RAMP); 952 else 953 td_set_runstate(td, TD_RUNNING); 954 955 /* 956 * Always log IO before it's issued, so we know the specific 957 * order of it. The logged unit will track when the IO has 958 * completed. 959 */ 960 if (td_write(td) && io_u->ddir == DDIR_WRITE && 961 td->o.do_verify && 962 td->o.verify != VERIFY_NONE && 963 !td->o.experimental_verify) 964 log_io_piece(td, io_u); 965 966 if (td->o.io_submit_mode == IO_MODE_OFFLOAD) { 967 const unsigned long blen = io_u->xfer_buflen; 968 const enum fio_ddir ddir = acct_ddir(io_u); 969 970 if (td->error) 971 break; 972 973 workqueue_enqueue(&td->io_wq, &io_u->work); 974 ret = FIO_Q_QUEUED; 975 976 if (ddir_rw(ddir)) { 977 td->io_issues[ddir]++; 978 td->io_issue_bytes[ddir] += blen; 979 td->rate_io_issue_bytes[ddir] += blen; 980 } 981 982 if (should_check_rate(td)) 983 td->rate_next_io_time[ddir] = usec_for_io(td, ddir); 984 985 } else { 986 ret = td_io_queue(td, io_u); 987 988 if (should_check_rate(td)) 989 td->rate_next_io_time[ddir] = usec_for_io(td, ddir); 990 991 if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0, &comp_time)) 992 break; 993 994 /* 995 * See if we need to complete some commands. Note that 996 * we can get BUSY even without IO queued, if the 997 * system is resource starved. 998 */ 999 reap: 1000 full = queue_full(td) || 1001 (ret == FIO_Q_BUSY && td->cur_depth); 1002 if (full || io_in_polling(td)) 1003 ret = wait_for_completions(td, &comp_time); 1004 } 1005 if (ret < 0) 1006 break; 1007 if (!ddir_rw_sum(td->bytes_done) && 1008 !td_ioengine_flagged(td, FIO_NOIO)) 1009 continue; 1010 1011 if (!in_ramp_time(td) && should_check_rate(td)) { 1012 if (check_min_rate(td, &comp_time)) { 1013 if (exitall_on_terminate || td->o.exitall_error) 1014 fio_terminate_threads(td->groupid); 1015 td_verror(td, EIO, "check_min_rate"); 1016 break; 1017 } 1018 } 1019 if (!in_ramp_time(td) && td->o.latency_target) 1020 lat_target_check(td); 1021 1022 if (td->o.thinktime) { 1023 unsigned long long b; 1024 1025 b = ddir_rw_sum(td->io_blocks); 1026 if (!(b % td->o.thinktime_blocks)) { 1027 int left; 1028 1029 io_u_quiesce(td); 1030 1031 if (td->o.thinktime_spin) 1032 usec_spin(td->o.thinktime_spin); 1033 1034 left = td->o.thinktime - td->o.thinktime_spin; 1035 if (left) 1036 usec_sleep(td, left); 1037 } 1038 } 1039 } 1040 1041 check_update_rusage(td); 1042 1043 if (td->trim_entries) 1044 log_err("fio: %lu trim entries leaked?\n", td->trim_entries); 1045 1046 if (td->o.fill_device && td->error == ENOSPC) { 1047 td->error = 0; 1048 fio_mark_td_terminate(td); 1049 } 1050 if (!td->error) { 1051 struct fio_file *f; 1052 1053 if (td->o.io_submit_mode == IO_MODE_OFFLOAD) { 1054 workqueue_flush(&td->io_wq); 1055 i = 0; 1056 } else 1057 i = td->cur_depth; 1058 1059 if (i) { 1060 ret = io_u_queued_complete(td, i); 1061 if (td->o.fill_device && td->error == ENOSPC) 1062 td->error = 0; 1063 } 1064 1065 if (should_fsync(td) && td->o.end_fsync) { 1066 td_set_runstate(td, TD_FSYNCING); 1067 1068 for_each_file(td, f, i) { 1069 if (!fio_file_fsync(td, f)) 1070 continue; 1071 1072 log_err("fio: end_fsync failed for file %s\n", 1073 f->file_name); 1074 } 1075 } 1076 } else 1077 cleanup_pending_aio(td); 1078 1079 /* 1080 * stop job if we failed doing any IO 1081 */ 1082 if (!ddir_rw_sum(td->this_io_bytes)) 1083 td->done = 1; 1084 1085 for (i = 0; i < DDIR_RWDIR_CNT; i++) 1086 bytes_done[i] = td->bytes_done[i] - bytes_done[i]; 1087 } 1088 1089 static void free_file_completion_logging(struct thread_data *td) 1090 { 1091 struct fio_file *f; 1092 unsigned int i; 1093 1094 for_each_file(td, f, i) { 1095 if (!f->last_write_comp) 1096 break; 1097 sfree(f->last_write_comp); 1098 } 1099 } 1100 1101 static int init_file_completion_logging(struct thread_data *td, 1102 unsigned int depth) 1103 { 1104 struct fio_file *f; 1105 unsigned int i; 1106 1107 if (td->o.verify == VERIFY_NONE || !td->o.verify_state_save) 1108 return 0; 1109 1110 for_each_file(td, f, i) { 1111 f->last_write_comp = scalloc(depth, sizeof(uint64_t)); 1112 if (!f->last_write_comp) 1113 goto cleanup; 1114 } 1115 1116 return 0; 1117 1118 cleanup: 1119 free_file_completion_logging(td); 1120 log_err("fio: failed to alloc write comp data\n"); 1121 return 1; 1122 } 1123 1124 static void cleanup_io_u(struct thread_data *td) 1125 { 1126 struct io_u *io_u; 1127 1128 while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) { 1129 1130 if (td->io_ops->io_u_free) 1131 td->io_ops->io_u_free(td, io_u); 1132 1133 fio_memfree(io_u, sizeof(*io_u)); 1134 } 1135 1136 free_io_mem(td); 1137 1138 io_u_rexit(&td->io_u_requeues); 1139 io_u_qexit(&td->io_u_freelist); 1140 io_u_qexit(&td->io_u_all); 1141 1142 free_file_completion_logging(td); 1143 } 1144 1145 static int init_io_u(struct thread_data *td) 1146 { 1147 struct io_u *io_u; 1148 unsigned int max_bs, min_write; 1149 int cl_align, i, max_units; 1150 int data_xfer = 1, err; 1151 char *p; 1152 1153 max_units = td->o.iodepth; 1154 max_bs = td_max_bs(td); 1155 min_write = td->o.min_bs[DDIR_WRITE]; 1156 td->orig_buffer_size = (unsigned long long) max_bs 1157 * (unsigned long long) max_units; 1158 1159 if (td_ioengine_flagged(td, FIO_NOIO) || !(td_read(td) || td_write(td))) 1160 data_xfer = 0; 1161 1162 err = 0; 1163 err += io_u_rinit(&td->io_u_requeues, td->o.iodepth); 1164 err += io_u_qinit(&td->io_u_freelist, td->o.iodepth); 1165 err += io_u_qinit(&td->io_u_all, td->o.iodepth); 1166 1167 if (err) { 1168 log_err("fio: failed setting up IO queues\n"); 1169 return 1; 1170 } 1171 1172 /* 1173 * if we may later need to do address alignment, then add any 1174 * possible adjustment here so that we don't cause a buffer 1175 * overflow later. this adjustment may be too much if we get 1176 * lucky and the allocator gives us an aligned address. 1177 */ 1178 if (td->o.odirect || td->o.mem_align || td->o.oatomic || 1179 td_ioengine_flagged(td, FIO_RAWIO)) 1180 td->orig_buffer_size += page_mask + td->o.mem_align; 1181 1182 if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) { 1183 unsigned long bs; 1184 1185 bs = td->orig_buffer_size + td->o.hugepage_size - 1; 1186 td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1); 1187 } 1188 1189 if (td->orig_buffer_size != (size_t) td->orig_buffer_size) { 1190 log_err("fio: IO memory too large. Reduce max_bs or iodepth\n"); 1191 return 1; 1192 } 1193 1194 if (data_xfer && allocate_io_mem(td)) 1195 return 1; 1196 1197 if (td->o.odirect || td->o.mem_align || td->o.oatomic || 1198 td_ioengine_flagged(td, FIO_RAWIO)) 1199 p = PTR_ALIGN(td->orig_buffer, page_mask) + td->o.mem_align; 1200 else 1201 p = td->orig_buffer; 1202 1203 cl_align = os_cache_line_size(); 1204 1205 for (i = 0; i < max_units; i++) { 1206 void *ptr; 1207 1208 if (td->terminate) 1209 return 1; 1210 1211 ptr = fio_memalign(cl_align, sizeof(*io_u)); 1212 if (!ptr) { 1213 log_err("fio: unable to allocate aligned memory\n"); 1214 break; 1215 } 1216 1217 io_u = ptr; 1218 memset(io_u, 0, sizeof(*io_u)); 1219 INIT_FLIST_HEAD(&io_u->verify_list); 1220 dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); 1221 1222 if (data_xfer) { 1223 io_u->buf = p; 1224 dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf); 1225 1226 if (td_write(td)) 1227 io_u_fill_buffer(td, io_u, min_write, max_bs); 1228 if (td_write(td) && td->o.verify_pattern_bytes) { 1229 /* 1230 * Fill the buffer with the pattern if we are 1231 * going to be doing writes. 1232 */ 1233 fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0); 1234 } 1235 } 1236 1237 io_u->index = i; 1238 io_u->flags = IO_U_F_FREE; 1239 io_u_qpush(&td->io_u_freelist, io_u); 1240 1241 /* 1242 * io_u never leaves this stack, used for iteration of all 1243 * io_u buffers. 1244 */ 1245 io_u_qpush(&td->io_u_all, io_u); 1246 1247 if (td->io_ops->io_u_init) { 1248 int ret = td->io_ops->io_u_init(td, io_u); 1249 1250 if (ret) { 1251 log_err("fio: failed to init engine data: %d\n", ret); 1252 return 1; 1253 } 1254 } 1255 1256 p += max_bs; 1257 } 1258 1259 if (init_file_completion_logging(td, max_units)) 1260 return 1; 1261 1262 return 0; 1263 } 1264 1265 /* 1266 * This function is Linux specific. 1267 * FIO_HAVE_IOSCHED_SWITCH enabled currently means it's Linux. 1268 */ 1269 static int switch_ioscheduler(struct thread_data *td) 1270 { 1271 #ifdef FIO_HAVE_IOSCHED_SWITCH 1272 char tmp[256], tmp2[128]; 1273 FILE *f; 1274 int ret; 1275 1276 if (td_ioengine_flagged(td, FIO_DISKLESSIO)) 1277 return 0; 1278 1279 assert(td->files && td->files[0]); 1280 sprintf(tmp, "%s/queue/scheduler", td->files[0]->du->sysfs_root); 1281 1282 f = fopen(tmp, "r+"); 1283 if (!f) { 1284 if (errno == ENOENT) { 1285 log_err("fio: os or kernel doesn't support IO scheduler" 1286 " switching\n"); 1287 return 0; 1288 } 1289 td_verror(td, errno, "fopen iosched"); 1290 return 1; 1291 } 1292 1293 /* 1294 * Set io scheduler. 1295 */ 1296 ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f); 1297 if (ferror(f) || ret != 1) { 1298 td_verror(td, errno, "fwrite"); 1299 fclose(f); 1300 return 1; 1301 } 1302 1303 rewind(f); 1304 1305 /* 1306 * Read back and check that the selected scheduler is now the default. 1307 */ 1308 memset(tmp, 0, sizeof(tmp)); 1309 ret = fread(tmp, sizeof(tmp), 1, f); 1310 if (ferror(f) || ret < 0) { 1311 td_verror(td, errno, "fread"); 1312 fclose(f); 1313 return 1; 1314 } 1315 /* 1316 * either a list of io schedulers or "none\n" is expected. 1317 */ 1318 tmp[strlen(tmp) - 1] = '\0'; 1319 1320 /* 1321 * Write to "none" entry doesn't fail, so check the result here. 1322 */ 1323 if (!strcmp(tmp, "none")) { 1324 log_err("fio: io scheduler is not tunable\n"); 1325 fclose(f); 1326 return 0; 1327 } 1328 1329 sprintf(tmp2, "[%s]", td->o.ioscheduler); 1330 if (!strstr(tmp, tmp2)) { 1331 log_err("fio: io scheduler %s not found\n", td->o.ioscheduler); 1332 td_verror(td, EINVAL, "iosched_switch"); 1333 fclose(f); 1334 return 1; 1335 } 1336 1337 fclose(f); 1338 return 0; 1339 #else 1340 return 0; 1341 #endif 1342 } 1343 1344 static bool keep_running(struct thread_data *td) 1345 { 1346 unsigned long long limit; 1347 1348 if (td->done) 1349 return false; 1350 if (td->o.time_based) 1351 return true; 1352 if (td->o.loops) { 1353 td->o.loops--; 1354 return true; 1355 } 1356 if (exceeds_number_ios(td)) 1357 return false; 1358 1359 if (td->o.io_size) 1360 limit = td->o.io_size; 1361 else 1362 limit = td->o.size; 1363 1364 if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) { 1365 uint64_t diff; 1366 1367 /* 1368 * If the difference is less than the maximum IO size, we 1369 * are done. 1370 */ 1371 diff = limit - ddir_rw_sum(td->io_bytes); 1372 if (diff < td_max_bs(td)) 1373 return false; 1374 1375 if (fio_files_done(td) && !td->o.io_size) 1376 return false; 1377 1378 return true; 1379 } 1380 1381 return false; 1382 } 1383 1384 static int exec_string(struct thread_options *o, const char *string, const char *mode) 1385 { 1386 size_t newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1; 1387 int ret; 1388 char *str; 1389 1390 str = malloc(newlen); 1391 sprintf(str, "%s &> %s.%s.txt", string, o->name, mode); 1392 1393 log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode); 1394 ret = system(str); 1395 if (ret == -1) 1396 log_err("fio: exec of cmd <%s> failed\n", str); 1397 1398 free(str); 1399 return ret; 1400 } 1401 1402 /* 1403 * Dry run to compute correct state of numberio for verification. 1404 */ 1405 static uint64_t do_dry_run(struct thread_data *td) 1406 { 1407 td_set_runstate(td, TD_RUNNING); 1408 1409 while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || 1410 (!flist_empty(&td->trim_list)) || !io_complete_bytes_exceeded(td)) { 1411 struct io_u *io_u; 1412 int ret; 1413 1414 if (td->terminate || td->done) 1415 break; 1416 1417 io_u = get_io_u(td); 1418 if (IS_ERR_OR_NULL(io_u)) 1419 break; 1420 1421 io_u_set(td, io_u, IO_U_F_FLIGHT); 1422 io_u->error = 0; 1423 io_u->resid = 0; 1424 if (ddir_rw(acct_ddir(io_u))) 1425 td->io_issues[acct_ddir(io_u)]++; 1426 if (ddir_rw(io_u->ddir)) { 1427 io_u_mark_depth(td, 1); 1428 td->ts.total_io_u[io_u->ddir]++; 1429 } 1430 1431 if (td_write(td) && io_u->ddir == DDIR_WRITE && 1432 td->o.do_verify && 1433 td->o.verify != VERIFY_NONE && 1434 !td->o.experimental_verify) 1435 log_io_piece(td, io_u); 1436 1437 ret = io_u_sync_complete(td, io_u); 1438 (void) ret; 1439 } 1440 1441 return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM]; 1442 } 1443 1444 struct fork_data { 1445 struct thread_data *td; 1446 struct sk_out *sk_out; 1447 }; 1448 1449 /* 1450 * Entry point for the thread based jobs. The process based jobs end up 1451 * here as well, after a little setup. 1452 */ 1453 static void *thread_main(void *data) 1454 { 1455 struct fork_data *fd = data; 1456 unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, }; 1457 struct thread_data *td = fd->td; 1458 struct thread_options *o = &td->o; 1459 struct sk_out *sk_out = fd->sk_out; 1460 uint64_t bytes_done[DDIR_RWDIR_CNT]; 1461 int deadlock_loop_cnt; 1462 int clear_state; 1463 int ret; 1464 1465 sk_out_assign(sk_out); 1466 free(fd); 1467 1468 if (!o->use_thread) { 1469 setsid(); 1470 td->pid = getpid(); 1471 } else 1472 td->pid = gettid(); 1473 1474 fio_local_clock_init(o->use_thread); 1475 1476 dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid); 1477 1478 if (is_backend) 1479 fio_server_send_start(td); 1480 1481 INIT_FLIST_HEAD(&td->io_log_list); 1482 INIT_FLIST_HEAD(&td->io_hist_list); 1483 INIT_FLIST_HEAD(&td->verify_list); 1484 INIT_FLIST_HEAD(&td->trim_list); 1485 INIT_FLIST_HEAD(&td->next_rand_list); 1486 td->io_hist_tree = RB_ROOT; 1487 1488 ret = mutex_cond_init_pshared(&td->io_u_lock, &td->free_cond); 1489 if (ret) { 1490 td_verror(td, ret, "mutex_cond_init_pshared"); 1491 goto err; 1492 } 1493 ret = cond_init_pshared(&td->verify_cond); 1494 if (ret) { 1495 td_verror(td, ret, "mutex_cond_pshared"); 1496 goto err; 1497 } 1498 1499 td_set_runstate(td, TD_INITIALIZED); 1500 dprint(FD_MUTEX, "up startup_mutex\n"); 1501 fio_mutex_up(startup_mutex); 1502 dprint(FD_MUTEX, "wait on td->mutex\n"); 1503 fio_mutex_down(td->mutex); 1504 dprint(FD_MUTEX, "done waiting on td->mutex\n"); 1505 1506 /* 1507 * A new gid requires privilege, so we need to do this before setting 1508 * the uid. 1509 */ 1510 if (o->gid != -1U && setgid(o->gid)) { 1511 td_verror(td, errno, "setgid"); 1512 goto err; 1513 } 1514 if (o->uid != -1U && setuid(o->uid)) { 1515 td_verror(td, errno, "setuid"); 1516 goto err; 1517 } 1518 1519 /* 1520 * Do this early, we don't want the compress threads to be limited 1521 * to the same CPUs as the IO workers. So do this before we set 1522 * any potential CPU affinity 1523 */ 1524 if (iolog_compress_init(td, sk_out)) 1525 goto err; 1526 1527 /* 1528 * If we have a gettimeofday() thread, make sure we exclude that 1529 * thread from this job 1530 */ 1531 if (o->gtod_cpu) 1532 fio_cpu_clear(&o->cpumask, o->gtod_cpu); 1533 1534 /* 1535 * Set affinity first, in case it has an impact on the memory 1536 * allocations. 1537 */ 1538 if (fio_option_is_set(o, cpumask)) { 1539 if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) { 1540 ret = fio_cpus_split(&o->cpumask, td->thread_number - 1); 1541 if (!ret) { 1542 log_err("fio: no CPUs set\n"); 1543 log_err("fio: Try increasing number of available CPUs\n"); 1544 td_verror(td, EINVAL, "cpus_split"); 1545 goto err; 1546 } 1547 } 1548 ret = fio_setaffinity(td->pid, o->cpumask); 1549 if (ret == -1) { 1550 td_verror(td, errno, "cpu_set_affinity"); 1551 goto err; 1552 } 1553 } 1554 1555 #ifdef CONFIG_LIBNUMA 1556 /* numa node setup */ 1557 if (fio_option_is_set(o, numa_cpunodes) || 1558 fio_option_is_set(o, numa_memnodes)) { 1559 struct bitmask *mask; 1560 1561 if (numa_available() < 0) { 1562 td_verror(td, errno, "Does not support NUMA API\n"); 1563 goto err; 1564 } 1565 1566 if (fio_option_is_set(o, numa_cpunodes)) { 1567 mask = numa_parse_nodestring(o->numa_cpunodes); 1568 ret = numa_run_on_node_mask(mask); 1569 numa_free_nodemask(mask); 1570 if (ret == -1) { 1571 td_verror(td, errno, \ 1572 "numa_run_on_node_mask failed\n"); 1573 goto err; 1574 } 1575 } 1576 1577 if (fio_option_is_set(o, numa_memnodes)) { 1578 mask = NULL; 1579 if (o->numa_memnodes) 1580 mask = numa_parse_nodestring(o->numa_memnodes); 1581 1582 switch (o->numa_mem_mode) { 1583 case MPOL_INTERLEAVE: 1584 numa_set_interleave_mask(mask); 1585 break; 1586 case MPOL_BIND: 1587 numa_set_membind(mask); 1588 break; 1589 case MPOL_LOCAL: 1590 numa_set_localalloc(); 1591 break; 1592 case MPOL_PREFERRED: 1593 numa_set_preferred(o->numa_mem_prefer_node); 1594 break; 1595 case MPOL_DEFAULT: 1596 default: 1597 break; 1598 } 1599 1600 if (mask) 1601 numa_free_nodemask(mask); 1602 1603 } 1604 } 1605 #endif 1606 1607 if (fio_pin_memory(td)) 1608 goto err; 1609 1610 /* 1611 * May alter parameters that init_io_u() will use, so we need to 1612 * do this first. 1613 */ 1614 if (init_iolog(td)) 1615 goto err; 1616 1617 if (init_io_u(td)) 1618 goto err; 1619 1620 if (o->verify_async && verify_async_init(td)) 1621 goto err; 1622 1623 if (fio_option_is_set(o, ioprio) || 1624 fio_option_is_set(o, ioprio_class)) { 1625 ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio); 1626 if (ret == -1) { 1627 td_verror(td, errno, "ioprio_set"); 1628 goto err; 1629 } 1630 } 1631 1632 if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt)) 1633 goto err; 1634 1635 errno = 0; 1636 if (nice(o->nice) == -1 && errno != 0) { 1637 td_verror(td, errno, "nice"); 1638 goto err; 1639 } 1640 1641 if (o->ioscheduler && switch_ioscheduler(td)) 1642 goto err; 1643 1644 if (!o->create_serialize && setup_files(td)) 1645 goto err; 1646 1647 if (td_io_init(td)) 1648 goto err; 1649 1650 if (init_random_map(td)) 1651 goto err; 1652 1653 if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun")) 1654 goto err; 1655 1656 if (o->pre_read) { 1657 if (pre_read_files(td) < 0) 1658 goto err; 1659 } 1660 1661 fio_verify_init(td); 1662 1663 if (rate_submit_init(td, sk_out)) 1664 goto err; 1665 1666 set_epoch_time(td, o->log_unix_epoch); 1667 fio_getrusage(&td->ru_start); 1668 memcpy(&td->bw_sample_time, &td->epoch, sizeof(td->epoch)); 1669 memcpy(&td->iops_sample_time, &td->epoch, sizeof(td->epoch)); 1670 memcpy(&td->ss.prev_time, &td->epoch, sizeof(td->epoch)); 1671 1672 if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] || 1673 o->ratemin[DDIR_TRIM]) { 1674 memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time, 1675 sizeof(td->bw_sample_time)); 1676 memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time, 1677 sizeof(td->bw_sample_time)); 1678 memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time, 1679 sizeof(td->bw_sample_time)); 1680 } 1681 1682 memset(bytes_done, 0, sizeof(bytes_done)); 1683 clear_state = 0; 1684 1685 while (keep_running(td)) { 1686 uint64_t verify_bytes; 1687 1688 fio_gettime(&td->start, NULL); 1689 memcpy(&td->tv_cache, &td->start, sizeof(td->start)); 1690 1691 if (clear_state) { 1692 clear_io_state(td, 0); 1693 1694 if (o->unlink_each_loop && unlink_all_files(td)) 1695 break; 1696 } 1697 1698 prune_io_piece_log(td); 1699 1700 if (td->o.verify_only && td_write(td)) 1701 verify_bytes = do_dry_run(td); 1702 else { 1703 do_io(td, bytes_done); 1704 1705 if (!ddir_rw_sum(bytes_done)) { 1706 fio_mark_td_terminate(td); 1707 verify_bytes = 0; 1708 } else { 1709 verify_bytes = bytes_done[DDIR_WRITE] + 1710 bytes_done[DDIR_TRIM]; 1711 } 1712 } 1713 1714 /* 1715 * If we took too long to shut down, the main thread could 1716 * already consider us reaped/exited. If that happens, break 1717 * out and clean up. 1718 */ 1719 if (td->runstate >= TD_EXITED) 1720 break; 1721 1722 clear_state = 1; 1723 1724 /* 1725 * Make sure we've successfully updated the rusage stats 1726 * before waiting on the stat mutex. Otherwise we could have 1727 * the stat thread holding stat mutex and waiting for 1728 * the rusage_sem, which would never get upped because 1729 * this thread is waiting for the stat mutex. 1730 */ 1731 deadlock_loop_cnt = 0; 1732 do { 1733 check_update_rusage(td); 1734 if (!fio_mutex_down_trylock(stat_mutex)) 1735 break; 1736 usleep(1000); 1737 if (deadlock_loop_cnt++ > 5000) { 1738 log_err("fio seems to be stuck grabbing stat_mutex, forcibly exiting\n"); 1739 td->error = EDEADLK; 1740 goto err; 1741 } 1742 } while (1); 1743 1744 if (td_read(td) && td->io_bytes[DDIR_READ]) 1745 update_runtime(td, elapsed_us, DDIR_READ); 1746 if (td_write(td) && td->io_bytes[DDIR_WRITE]) 1747 update_runtime(td, elapsed_us, DDIR_WRITE); 1748 if (td_trim(td) && td->io_bytes[DDIR_TRIM]) 1749 update_runtime(td, elapsed_us, DDIR_TRIM); 1750 fio_gettime(&td->start, NULL); 1751 fio_mutex_up(stat_mutex); 1752 1753 if (td->error || td->terminate) 1754 break; 1755 1756 if (!o->do_verify || 1757 o->verify == VERIFY_NONE || 1758 td_ioengine_flagged(td, FIO_UNIDIR)) 1759 continue; 1760 1761 clear_io_state(td, 0); 1762 1763 fio_gettime(&td->start, NULL); 1764 1765 do_verify(td, verify_bytes); 1766 1767 /* 1768 * See comment further up for why this is done here. 1769 */ 1770 check_update_rusage(td); 1771 1772 fio_mutex_down(stat_mutex); 1773 update_runtime(td, elapsed_us, DDIR_READ); 1774 fio_gettime(&td->start, NULL); 1775 fio_mutex_up(stat_mutex); 1776 1777 if (td->error || td->terminate) 1778 break; 1779 } 1780 1781 /* 1782 * If td ended up with no I/O when it should have had, 1783 * then something went wrong unless FIO_NOIO or FIO_DISKLESSIO. 1784 * (Are we not missing other flags that can be ignored ?) 1785 */ 1786 if ((td->o.size || td->o.io_size) && !ddir_rw_sum(bytes_done) && 1787 !(td_ioengine_flagged(td, FIO_NOIO) || 1788 td_ioengine_flagged(td, FIO_DISKLESSIO))) 1789 log_err("%s: No I/O performed by %s, " 1790 "perhaps try --debug=io option for details?\n", 1791 td->o.name, td->io_ops->name); 1792 1793 td_set_runstate(td, TD_FINISHING); 1794 1795 update_rusage_stat(td); 1796 td->ts.total_run_time = mtime_since_now(&td->epoch); 1797 td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; 1798 td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; 1799 td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; 1800 1801 if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) && 1802 (td->o.verify != VERIFY_NONE && td_write(td))) 1803 verify_save_state(td->thread_number); 1804 1805 fio_unpin_memory(td); 1806 1807 td_writeout_logs(td, true); 1808 1809 iolog_compress_exit(td); 1810 rate_submit_exit(td); 1811 1812 if (o->exec_postrun) 1813 exec_string(o, o->exec_postrun, (const char *)"postrun"); 1814 1815 if (exitall_on_terminate || (o->exitall_error && td->error)) 1816 fio_terminate_threads(td->groupid); 1817 1818 err: 1819 if (td->error) 1820 log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error, 1821 td->verror); 1822 1823 if (o->verify_async) 1824 verify_async_exit(td); 1825 1826 close_and_free_files(td); 1827 cleanup_io_u(td); 1828 close_ioengine(td); 1829 cgroup_shutdown(td, &cgroup_mnt); 1830 verify_free_state(td); 1831 1832 if (td->zone_state_index) { 1833 int i; 1834 1835 for (i = 0; i < DDIR_RWDIR_CNT; i++) 1836 free(td->zone_state_index[i]); 1837 free(td->zone_state_index); 1838 td->zone_state_index = NULL; 1839 } 1840 1841 if (fio_option_is_set(o, cpumask)) { 1842 ret = fio_cpuset_exit(&o->cpumask); 1843 if (ret) 1844 td_verror(td, ret, "fio_cpuset_exit"); 1845 } 1846 1847 /* 1848 * do this very late, it will log file closing as well 1849 */ 1850 if (o->write_iolog_file) 1851 write_iolog_close(td); 1852 1853 td_set_runstate(td, TD_EXITED); 1854 1855 /* 1856 * Do this last after setting our runstate to exited, so we 1857 * know that the stat thread is signaled. 1858 */ 1859 check_update_rusage(td); 1860 1861 sk_out_drop(); 1862 return (void *) (uintptr_t) td->error; 1863 } 1864 1865 /* 1866 * Run over the job map and reap the threads that have exited, if any. 1867 */ 1868 static void reap_threads(unsigned int *nr_running, uint64_t *t_rate, 1869 uint64_t *m_rate) 1870 { 1871 struct thread_data *td; 1872 unsigned int cputhreads, realthreads, pending; 1873 int i, status, ret; 1874 1875 /* 1876 * reap exited threads (TD_EXITED -> TD_REAPED) 1877 */ 1878 realthreads = pending = cputhreads = 0; 1879 for_each_td(td, i) { 1880 int flags = 0; 1881 1882 /* 1883 * ->io_ops is NULL for a thread that has closed its 1884 * io engine 1885 */ 1886 if (td->io_ops && !strcmp(td->io_ops->name, "cpuio")) 1887 cputhreads++; 1888 else 1889 realthreads++; 1890 1891 if (!td->pid) { 1892 pending++; 1893 continue; 1894 } 1895 if (td->runstate == TD_REAPED) 1896 continue; 1897 if (td->o.use_thread) { 1898 if (td->runstate == TD_EXITED) { 1899 td_set_runstate(td, TD_REAPED); 1900 goto reaped; 1901 } 1902 continue; 1903 } 1904 1905 flags = WNOHANG; 1906 if (td->runstate == TD_EXITED) 1907 flags = 0; 1908 1909 /* 1910 * check if someone quit or got killed in an unusual way 1911 */ 1912 ret = waitpid(td->pid, &status, flags); 1913 if (ret < 0) { 1914 if (errno == ECHILD) { 1915 log_err("fio: pid=%d disappeared %d\n", 1916 (int) td->pid, td->runstate); 1917 td->sig = ECHILD; 1918 td_set_runstate(td, TD_REAPED); 1919 goto reaped; 1920 } 1921 perror("waitpid"); 1922 } else if (ret == td->pid) { 1923 if (WIFSIGNALED(status)) { 1924 int sig = WTERMSIG(status); 1925 1926 if (sig != SIGTERM && sig != SIGUSR2) 1927 log_err("fio: pid=%d, got signal=%d\n", 1928 (int) td->pid, sig); 1929 td->sig = sig; 1930 td_set_runstate(td, TD_REAPED); 1931 goto reaped; 1932 } 1933 if (WIFEXITED(status)) { 1934 if (WEXITSTATUS(status) && !td->error) 1935 td->error = WEXITSTATUS(status); 1936 1937 td_set_runstate(td, TD_REAPED); 1938 goto reaped; 1939 } 1940 } 1941 1942 /* 1943 * If the job is stuck, do a forceful timeout of it and 1944 * move on. 1945 */ 1946 if (td->terminate && 1947 td->runstate < TD_FSYNCING && 1948 time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) { 1949 log_err("fio: job '%s' (state=%d) hasn't exited in " 1950 "%lu seconds, it appears to be stuck. Doing " 1951 "forceful exit of this job.\n", 1952 td->o.name, td->runstate, 1953 (unsigned long) time_since_now(&td->terminate_time)); 1954 td_set_runstate(td, TD_REAPED); 1955 goto reaped; 1956 } 1957 1958 /* 1959 * thread is not dead, continue 1960 */ 1961 pending++; 1962 continue; 1963 reaped: 1964 (*nr_running)--; 1965 (*m_rate) -= ddir_rw_sum(td->o.ratemin); 1966 (*t_rate) -= ddir_rw_sum(td->o.rate); 1967 if (!td->pid) 1968 pending--; 1969 1970 if (td->error) 1971 exit_value++; 1972 1973 done_secs += mtime_since_now(&td->epoch) / 1000; 1974 profile_td_exit(td); 1975 } 1976 1977 if (*nr_running == cputhreads && !pending && realthreads) 1978 fio_terminate_threads(TERMINATE_ALL); 1979 } 1980 1981 static bool __check_trigger_file(void) 1982 { 1983 struct stat sb; 1984 1985 if (!trigger_file) 1986 return false; 1987 1988 if (stat(trigger_file, &sb)) 1989 return false; 1990 1991 if (unlink(trigger_file) < 0) 1992 log_err("fio: failed to unlink %s: %s\n", trigger_file, 1993 strerror(errno)); 1994 1995 return true; 1996 } 1997 1998 static bool trigger_timedout(void) 1999 { 2000 if (trigger_timeout) 2001 return time_since_genesis() >= trigger_timeout; 2002 2003 return false; 2004 } 2005 2006 void exec_trigger(const char *cmd) 2007 { 2008 int ret; 2009 2010 if (!cmd) 2011 return; 2012 2013 ret = system(cmd); 2014 if (ret == -1) 2015 log_err("fio: failed executing %s trigger\n", cmd); 2016 } 2017 2018 void check_trigger_file(void) 2019 { 2020 if (__check_trigger_file() || trigger_timedout()) { 2021 if (nr_clients) 2022 fio_clients_send_trigger(trigger_remote_cmd); 2023 else { 2024 verify_save_state(IO_LIST_ALL); 2025 fio_terminate_threads(TERMINATE_ALL); 2026 exec_trigger(trigger_cmd); 2027 } 2028 } 2029 } 2030 2031 static int fio_verify_load_state(struct thread_data *td) 2032 { 2033 int ret; 2034 2035 if (!td->o.verify_state) 2036 return 0; 2037 2038 if (is_backend) { 2039 void *data; 2040 2041 ret = fio_server_get_verify_state(td->o.name, 2042 td->thread_number - 1, &data); 2043 if (!ret) 2044 verify_assign_state(td, data); 2045 } else 2046 ret = verify_load_state(td, "local"); 2047 2048 return ret; 2049 } 2050 2051 static void do_usleep(unsigned int usecs) 2052 { 2053 check_for_running_stats(); 2054 check_trigger_file(); 2055 usleep(usecs); 2056 } 2057 2058 static bool check_mount_writes(struct thread_data *td) 2059 { 2060 struct fio_file *f; 2061 unsigned int i; 2062 2063 if (!td_write(td) || td->o.allow_mounted_write) 2064 return false; 2065 2066 /* 2067 * If FIO_HAVE_CHARDEV_SIZE is defined, it's likely that chrdevs 2068 * are mkfs'd and mounted. 2069 */ 2070 for_each_file(td, f, i) { 2071 #ifdef FIO_HAVE_CHARDEV_SIZE 2072 if (f->filetype != FIO_TYPE_BLOCK && f->filetype != FIO_TYPE_CHAR) 2073 #else 2074 if (f->filetype != FIO_TYPE_BLOCK) 2075 #endif 2076 continue; 2077 if (device_is_mounted(f->file_name)) 2078 goto mounted; 2079 } 2080 2081 return false; 2082 mounted: 2083 log_err("fio: %s appears mounted, and 'allow_mounted_write' isn't set. Aborting.\n", f->file_name); 2084 return true; 2085 } 2086 2087 static bool waitee_running(struct thread_data *me) 2088 { 2089 const char *waitee = me->o.wait_for; 2090 const char *self = me->o.name; 2091 struct thread_data *td; 2092 int i; 2093 2094 if (!waitee) 2095 return false; 2096 2097 for_each_td(td, i) { 2098 if (!strcmp(td->o.name, self) || strcmp(td->o.name, waitee)) 2099 continue; 2100 2101 if (td->runstate < TD_EXITED) { 2102 dprint(FD_PROCESS, "%s fenced by %s(%s)\n", 2103 self, td->o.name, 2104 runstate_to_name(td->runstate)); 2105 return true; 2106 } 2107 } 2108 2109 dprint(FD_PROCESS, "%s: %s completed, can run\n", self, waitee); 2110 return false; 2111 } 2112 2113 /* 2114 * Main function for kicking off and reaping jobs, as needed. 2115 */ 2116 static void run_threads(struct sk_out *sk_out) 2117 { 2118 struct thread_data *td; 2119 unsigned int i, todo, nr_running, nr_started; 2120 uint64_t m_rate, t_rate; 2121 uint64_t spent; 2122 2123 if (fio_gtod_offload && fio_start_gtod_thread()) 2124 return; 2125 2126 fio_idle_prof_init(); 2127 2128 set_sig_handlers(); 2129 2130 nr_thread = nr_process = 0; 2131 for_each_td(td, i) { 2132 if (check_mount_writes(td)) 2133 return; 2134 if (td->o.use_thread) 2135 nr_thread++; 2136 else 2137 nr_process++; 2138 } 2139 2140 if (output_format & FIO_OUTPUT_NORMAL) { 2141 log_info("Starting "); 2142 if (nr_thread) 2143 log_info("%d thread%s", nr_thread, 2144 nr_thread > 1 ? "s" : ""); 2145 if (nr_process) { 2146 if (nr_thread) 2147 log_info(" and "); 2148 log_info("%d process%s", nr_process, 2149 nr_process > 1 ? "es" : ""); 2150 } 2151 log_info("\n"); 2152 log_info_flush(); 2153 } 2154 2155 todo = thread_number; 2156 nr_running = 0; 2157 nr_started = 0; 2158 m_rate = t_rate = 0; 2159 2160 for_each_td(td, i) { 2161 print_status_init(td->thread_number - 1); 2162 2163 if (!td->o.create_serialize) 2164 continue; 2165 2166 if (fio_verify_load_state(td)) 2167 goto reap; 2168 2169 /* 2170 * do file setup here so it happens sequentially, 2171 * we don't want X number of threads getting their 2172 * client data interspersed on disk 2173 */ 2174 if (setup_files(td)) { 2175 reap: 2176 exit_value++; 2177 if (td->error) 2178 log_err("fio: pid=%d, err=%d/%s\n", 2179 (int) td->pid, td->error, td->verror); 2180 td_set_runstate(td, TD_REAPED); 2181 todo--; 2182 } else { 2183 struct fio_file *f; 2184 unsigned int j; 2185 2186 /* 2187 * for sharing to work, each job must always open 2188 * its own files. so close them, if we opened them 2189 * for creation 2190 */ 2191 for_each_file(td, f, j) { 2192 if (fio_file_open(f)) 2193 td_io_close_file(td, f); 2194 } 2195 } 2196 } 2197 2198 /* start idle threads before io threads start to run */ 2199 fio_idle_prof_start(); 2200 2201 set_genesis_time(); 2202 2203 while (todo) { 2204 struct thread_data *map[REAL_MAX_JOBS]; 2205 struct timeval this_start; 2206 int this_jobs = 0, left; 2207 struct fork_data *fd; 2208 2209 /* 2210 * create threads (TD_NOT_CREATED -> TD_CREATED) 2211 */ 2212 for_each_td(td, i) { 2213 if (td->runstate != TD_NOT_CREATED) 2214 continue; 2215 2216 /* 2217 * never got a chance to start, killed by other 2218 * thread for some reason 2219 */ 2220 if (td->terminate) { 2221 todo--; 2222 continue; 2223 } 2224 2225 if (td->o.start_delay) { 2226 spent = utime_since_genesis(); 2227 2228 if (td->o.start_delay > spent) 2229 continue; 2230 } 2231 2232 if (td->o.stonewall && (nr_started || nr_running)) { 2233 dprint(FD_PROCESS, "%s: stonewall wait\n", 2234 td->o.name); 2235 break; 2236 } 2237 2238 if (waitee_running(td)) { 2239 dprint(FD_PROCESS, "%s: waiting for %s\n", 2240 td->o.name, td->o.wait_for); 2241 continue; 2242 } 2243 2244 init_disk_util(td); 2245 2246 td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED); 2247 td->update_rusage = 0; 2248 2249 /* 2250 * Set state to created. Thread will transition 2251 * to TD_INITIALIZED when it's done setting up. 2252 */ 2253 td_set_runstate(td, TD_CREATED); 2254 map[this_jobs++] = td; 2255 nr_started++; 2256 2257 fd = calloc(1, sizeof(*fd)); 2258 fd->td = td; 2259 fd->sk_out = sk_out; 2260 2261 if (td->o.use_thread) { 2262 int ret; 2263 2264 dprint(FD_PROCESS, "will pthread_create\n"); 2265 ret = pthread_create(&td->thread, NULL, 2266 thread_main, fd); 2267 if (ret) { 2268 log_err("pthread_create: %s\n", 2269 strerror(ret)); 2270 free(fd); 2271 nr_started--; 2272 break; 2273 } 2274 ret = pthread_detach(td->thread); 2275 if (ret) 2276 log_err("pthread_detach: %s", 2277 strerror(ret)); 2278 } else { 2279 pid_t pid; 2280 dprint(FD_PROCESS, "will fork\n"); 2281 pid = fork(); 2282 if (!pid) { 2283 int ret; 2284 2285 ret = (int)(uintptr_t)thread_main(fd); 2286 _exit(ret); 2287 } else if (i == fio_debug_jobno) 2288 *fio_debug_jobp = pid; 2289 } 2290 dprint(FD_MUTEX, "wait on startup_mutex\n"); 2291 if (fio_mutex_down_timeout(startup_mutex, 10000)) { 2292 log_err("fio: job startup hung? exiting.\n"); 2293 fio_terminate_threads(TERMINATE_ALL); 2294 fio_abort = 1; 2295 nr_started--; 2296 break; 2297 } 2298 dprint(FD_MUTEX, "done waiting on startup_mutex\n"); 2299 } 2300 2301 /* 2302 * Wait for the started threads to transition to 2303 * TD_INITIALIZED. 2304 */ 2305 fio_gettime(&this_start, NULL); 2306 left = this_jobs; 2307 while (left && !fio_abort) { 2308 if (mtime_since_now(&this_start) > JOB_START_TIMEOUT) 2309 break; 2310 2311 do_usleep(100000); 2312 2313 for (i = 0; i < this_jobs; i++) { 2314 td = map[i]; 2315 if (!td) 2316 continue; 2317 if (td->runstate == TD_INITIALIZED) { 2318 map[i] = NULL; 2319 left--; 2320 } else if (td->runstate >= TD_EXITED) { 2321 map[i] = NULL; 2322 left--; 2323 todo--; 2324 nr_running++; /* work-around... */ 2325 } 2326 } 2327 } 2328 2329 if (left) { 2330 log_err("fio: %d job%s failed to start\n", left, 2331 left > 1 ? "s" : ""); 2332 for (i = 0; i < this_jobs; i++) { 2333 td = map[i]; 2334 if (!td) 2335 continue; 2336 kill(td->pid, SIGTERM); 2337 } 2338 break; 2339 } 2340 2341 /* 2342 * start created threads (TD_INITIALIZED -> TD_RUNNING). 2343 */ 2344 for_each_td(td, i) { 2345 if (td->runstate != TD_INITIALIZED) 2346 continue; 2347 2348 if (in_ramp_time(td)) 2349 td_set_runstate(td, TD_RAMP); 2350 else 2351 td_set_runstate(td, TD_RUNNING); 2352 nr_running++; 2353 nr_started--; 2354 m_rate += ddir_rw_sum(td->o.ratemin); 2355 t_rate += ddir_rw_sum(td->o.rate); 2356 todo--; 2357 fio_mutex_up(td->mutex); 2358 } 2359 2360 reap_threads(&nr_running, &t_rate, &m_rate); 2361 2362 if (todo) 2363 do_usleep(100000); 2364 } 2365 2366 while (nr_running) { 2367 reap_threads(&nr_running, &t_rate, &m_rate); 2368 do_usleep(10000); 2369 } 2370 2371 fio_idle_prof_stop(); 2372 2373 update_io_ticks(); 2374 } 2375 2376 static void free_disk_util(void) 2377 { 2378 disk_util_prune_entries(); 2379 helper_thread_destroy(); 2380 } 2381 2382 int fio_backend(struct sk_out *sk_out) 2383 { 2384 struct thread_data *td; 2385 int i; 2386 2387 if (exec_profile) { 2388 if (load_profile(exec_profile)) 2389 return 1; 2390 free(exec_profile); 2391 exec_profile = NULL; 2392 } 2393 if (!thread_number) 2394 return 0; 2395 2396 if (write_bw_log) { 2397 struct log_params p = { 2398 .log_type = IO_LOG_TYPE_BW, 2399 }; 2400 2401 setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log"); 2402 setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log"); 2403 setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log"); 2404 } 2405 2406 startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); 2407 if (startup_mutex == NULL) 2408 return 1; 2409 2410 set_genesis_time(); 2411 stat_init(); 2412 helper_thread_create(startup_mutex, sk_out); 2413 2414 cgroup_list = smalloc(sizeof(*cgroup_list)); 2415 INIT_FLIST_HEAD(cgroup_list); 2416 2417 run_threads(sk_out); 2418 2419 helper_thread_exit(); 2420 2421 if (!fio_abort) { 2422 __show_run_stats(); 2423 if (write_bw_log) { 2424 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 2425 struct io_log *log = agg_io_log[i]; 2426 2427 flush_log(log, false); 2428 free_log(log); 2429 } 2430 } 2431 } 2432 2433 for_each_td(td, i) { 2434 if (td->ss.dur) { 2435 if (td->ss.iops_data != NULL) { 2436 free(td->ss.iops_data); 2437 free(td->ss.bw_data); 2438 } 2439 } 2440 fio_options_free(td); 2441 if (td->rusage_sem) { 2442 fio_mutex_remove(td->rusage_sem); 2443 td->rusage_sem = NULL; 2444 } 2445 fio_mutex_remove(td->mutex); 2446 td->mutex = NULL; 2447 } 2448 2449 free_disk_util(); 2450 cgroup_kill(cgroup_list); 2451 sfree(cgroup_list); 2452 sfree(cgroup_mnt); 2453 2454 fio_mutex_remove(startup_mutex); 2455 stat_exit(); 2456 return exit_value; 2457 } 2458