1 /* 2 * Status and ETA code 3 */ 4 #include <unistd.h> 5 #include <fcntl.h> 6 #include <string.h> 7 8 #include "fio.h" 9 10 static char __run_str[REAL_MAX_JOBS + 1]; 11 static char run_str[__THREAD_RUNSTR_SZ(REAL_MAX_JOBS)]; 12 13 static void update_condensed_str(char *rstr, char *run_str_condensed) 14 { 15 if (*rstr) { 16 while (*rstr) { 17 int nr = 1; 18 19 *run_str_condensed++ = *rstr++; 20 while (*(rstr - 1) == *rstr) { 21 rstr++; 22 nr++; 23 } 24 run_str_condensed += sprintf(run_str_condensed, "(%u),", nr); 25 } 26 run_str_condensed--; 27 } 28 *run_str_condensed = '\0'; 29 } 30 31 /* 32 * Sets the status of the 'td' in the printed status map. 33 */ 34 static void check_str_update(struct thread_data *td) 35 { 36 char c = __run_str[td->thread_number - 1]; 37 38 switch (td->runstate) { 39 case TD_REAPED: 40 if (td->error) 41 c = 'X'; 42 else if (td->sig) 43 c = 'K'; 44 else 45 c = '_'; 46 break; 47 case TD_EXITED: 48 c = 'E'; 49 break; 50 case TD_RAMP: 51 c = '/'; 52 break; 53 case TD_RUNNING: 54 if (td_rw(td)) { 55 if (td_random(td)) { 56 if (td->o.rwmix[DDIR_READ] == 100) 57 c = 'r'; 58 else if (td->o.rwmix[DDIR_WRITE] == 100) 59 c = 'w'; 60 else 61 c = 'm'; 62 } else { 63 if (td->o.rwmix[DDIR_READ] == 100) 64 c = 'R'; 65 else if (td->o.rwmix[DDIR_WRITE] == 100) 66 c = 'W'; 67 else 68 c = 'M'; 69 } 70 } else if (td_read(td)) { 71 if (td_random(td)) 72 c = 'r'; 73 else 74 c = 'R'; 75 } else if (td_write(td)) { 76 if (td_random(td)) 77 c = 'w'; 78 else 79 c = 'W'; 80 } else { 81 if (td_random(td)) 82 c = 'd'; 83 else 84 c = 'D'; 85 } 86 break; 87 case TD_PRE_READING: 88 c = 'p'; 89 break; 90 case TD_VERIFYING: 91 c = 'V'; 92 break; 93 case TD_FSYNCING: 94 c = 'F'; 95 break; 96 case TD_FINISHING: 97 c = 'f'; 98 break; 99 case TD_CREATED: 100 c = 'C'; 101 break; 102 case TD_INITIALIZED: 103 case TD_SETTING_UP: 104 c = 'I'; 105 break; 106 case TD_NOT_CREATED: 107 c = 'P'; 108 break; 109 default: 110 log_err("state %d\n", td->runstate); 111 } 112 113 __run_str[td->thread_number - 1] = c; 114 update_condensed_str(__run_str, run_str); 115 } 116 117 /* 118 * Convert seconds to a printable string. 119 */ 120 void eta_to_str(char *str, unsigned long eta_sec) 121 { 122 unsigned int d, h, m, s; 123 int disp_hour = 0; 124 125 s = eta_sec % 60; 126 eta_sec /= 60; 127 m = eta_sec % 60; 128 eta_sec /= 60; 129 h = eta_sec % 24; 130 eta_sec /= 24; 131 d = eta_sec; 132 133 if (d) { 134 disp_hour = 1; 135 str += sprintf(str, "%02ud:", d); 136 } 137 138 if (h || disp_hour) 139 str += sprintf(str, "%02uh:", h); 140 141 str += sprintf(str, "%02um:", m); 142 str += sprintf(str, "%02us", s); 143 } 144 145 /* 146 * Best effort calculation of the estimated pending runtime of a job. 147 */ 148 static int thread_eta(struct thread_data *td) 149 { 150 unsigned long long bytes_total, bytes_done; 151 unsigned long eta_sec = 0; 152 unsigned long elapsed; 153 uint64_t timeout; 154 155 elapsed = (mtime_since_now(&td->epoch) + 999) / 1000; 156 timeout = td->o.timeout / 1000000UL; 157 158 bytes_total = td->total_io_size; 159 160 if (td->o.fill_device && td->o.size == -1ULL) { 161 if (!td->fill_device_size || td->fill_device_size == -1ULL) 162 return 0; 163 164 bytes_total = td->fill_device_size; 165 } 166 167 if (td->o.zone_size && td->o.zone_skip && bytes_total) { 168 unsigned int nr_zones; 169 uint64_t zone_bytes; 170 171 zone_bytes = bytes_total + td->o.zone_size + td->o.zone_skip; 172 nr_zones = (zone_bytes - 1) / (td->o.zone_size + td->o.zone_skip); 173 bytes_total -= nr_zones * td->o.zone_skip; 174 } 175 176 /* 177 * if writing and verifying afterwards, bytes_total will be twice the 178 * size. In a mixed workload, verify phase will be the size of the 179 * first stage writes. 180 */ 181 if (td->o.do_verify && td->o.verify && td_write(td)) { 182 if (td_rw(td)) { 183 unsigned int perc = 50; 184 185 if (td->o.rwmix[DDIR_WRITE]) 186 perc = td->o.rwmix[DDIR_WRITE]; 187 188 bytes_total += (bytes_total * perc) / 100; 189 } else 190 bytes_total <<= 1; 191 } 192 193 if (td->runstate == TD_RUNNING || td->runstate == TD_VERIFYING) { 194 double perc, perc_t; 195 196 bytes_done = ddir_rw_sum(td->io_bytes); 197 198 if (bytes_total) { 199 perc = (double) bytes_done / (double) bytes_total; 200 if (perc > 1.0) 201 perc = 1.0; 202 } else 203 perc = 0.0; 204 205 if (td->o.time_based) { 206 if (timeout) { 207 perc_t = (double) elapsed / (double) timeout; 208 if (perc_t < perc) 209 perc = perc_t; 210 } else { 211 /* 212 * Will never hit, we can't have time_based 213 * without a timeout set. 214 */ 215 perc = 0.0; 216 } 217 } 218 219 eta_sec = (unsigned long) (elapsed * (1.0 / perc)) - elapsed; 220 221 if (td->o.timeout && 222 eta_sec > (timeout + done_secs - elapsed)) 223 eta_sec = timeout + done_secs - elapsed; 224 } else if (td->runstate == TD_NOT_CREATED || td->runstate == TD_CREATED 225 || td->runstate == TD_INITIALIZED 226 || td->runstate == TD_SETTING_UP 227 || td->runstate == TD_RAMP 228 || td->runstate == TD_PRE_READING) { 229 int t_eta = 0, r_eta = 0; 230 unsigned long long rate_bytes; 231 232 /* 233 * We can only guess - assume it'll run the full timeout 234 * if given, otherwise assume it'll run at the specified rate. 235 */ 236 if (td->o.timeout) { 237 uint64_t __timeout = td->o.timeout; 238 uint64_t start_delay = td->o.start_delay; 239 uint64_t ramp_time = td->o.ramp_time; 240 241 t_eta = __timeout + start_delay + ramp_time; 242 t_eta /= 1000000ULL; 243 244 if (in_ramp_time(td)) { 245 unsigned long ramp_left; 246 247 ramp_left = mtime_since_now(&td->epoch); 248 ramp_left = (ramp_left + 999) / 1000; 249 if (ramp_left <= t_eta) 250 t_eta -= ramp_left; 251 } 252 } 253 rate_bytes = ddir_rw_sum(td->o.rate); 254 if (rate_bytes) { 255 r_eta = (bytes_total / 1024) / rate_bytes; 256 r_eta += (td->o.start_delay / 1000000ULL); 257 } 258 259 if (r_eta && t_eta) 260 eta_sec = min(r_eta, t_eta); 261 else if (r_eta) 262 eta_sec = r_eta; 263 else if (t_eta) 264 eta_sec = t_eta; 265 else 266 eta_sec = 0; 267 } else { 268 /* 269 * thread is already done or waiting for fsync 270 */ 271 eta_sec = 0; 272 } 273 274 return eta_sec; 275 } 276 277 static void calc_rate(int unified_rw_rep, unsigned long mtime, 278 unsigned long long *io_bytes, 279 unsigned long long *prev_io_bytes, unsigned int *rate) 280 { 281 int i; 282 283 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 284 unsigned long long diff; 285 286 diff = io_bytes[i] - prev_io_bytes[i]; 287 if (unified_rw_rep) { 288 rate[i] = 0; 289 rate[0] += ((1000 * diff) / mtime) / 1024; 290 } else 291 rate[i] = ((1000 * diff) / mtime) / 1024; 292 293 prev_io_bytes[i] = io_bytes[i]; 294 } 295 } 296 297 static void calc_iops(int unified_rw_rep, unsigned long mtime, 298 unsigned long long *io_iops, 299 unsigned long long *prev_io_iops, unsigned int *iops) 300 { 301 int i; 302 303 for (i = 0; i < DDIR_RWDIR_CNT; i++) { 304 unsigned long long diff; 305 306 diff = io_iops[i] - prev_io_iops[i]; 307 if (unified_rw_rep) { 308 iops[i] = 0; 309 iops[0] += (diff * 1000) / mtime; 310 } else 311 iops[i] = (diff * 1000) / mtime; 312 313 prev_io_iops[i] = io_iops[i]; 314 } 315 } 316 317 /* 318 * Print status of the jobs we know about. This includes rate estimates, 319 * ETA, thread state, etc. 320 */ 321 int calc_thread_status(struct jobs_eta *je, int force) 322 { 323 struct thread_data *td; 324 int i, unified_rw_rep; 325 unsigned long rate_time, disp_time, bw_avg_time, *eta_secs; 326 unsigned long long io_bytes[DDIR_RWDIR_CNT]; 327 unsigned long long io_iops[DDIR_RWDIR_CNT]; 328 struct timeval now; 329 330 static unsigned long long rate_io_bytes[DDIR_RWDIR_CNT]; 331 static unsigned long long disp_io_bytes[DDIR_RWDIR_CNT]; 332 static unsigned long long disp_io_iops[DDIR_RWDIR_CNT]; 333 static struct timeval rate_prev_time, disp_prev_time; 334 335 if (!force) { 336 if (output_format != FIO_OUTPUT_NORMAL && 337 f_out == stdout) 338 return 0; 339 if (temp_stall_ts || eta_print == FIO_ETA_NEVER) 340 return 0; 341 342 if (!isatty(STDOUT_FILENO) && (eta_print != FIO_ETA_ALWAYS)) 343 return 0; 344 } 345 346 if (!ddir_rw_sum(rate_io_bytes)) 347 fill_start_time(&rate_prev_time); 348 if (!ddir_rw_sum(disp_io_bytes)) 349 fill_start_time(&disp_prev_time); 350 351 eta_secs = malloc(thread_number * sizeof(unsigned long)); 352 memset(eta_secs, 0, thread_number * sizeof(unsigned long)); 353 354 je->elapsed_sec = (mtime_since_genesis() + 999) / 1000; 355 356 io_bytes[DDIR_READ] = io_bytes[DDIR_WRITE] = io_bytes[DDIR_TRIM] = 0; 357 io_iops[DDIR_READ] = io_iops[DDIR_WRITE] = io_iops[DDIR_TRIM] = 0; 358 bw_avg_time = ULONG_MAX; 359 unified_rw_rep = 0; 360 for_each_td(td, i) { 361 unified_rw_rep += td->o.unified_rw_rep; 362 if (is_power_of_2(td->o.kb_base)) 363 je->is_pow2 = 1; 364 je->unit_base = td->o.unit_base; 365 if (td->o.bw_avg_time < bw_avg_time) 366 bw_avg_time = td->o.bw_avg_time; 367 if (td->runstate == TD_RUNNING || td->runstate == TD_VERIFYING 368 || td->runstate == TD_FSYNCING 369 || td->runstate == TD_PRE_READING 370 || td->runstate == TD_FINISHING) { 371 je->nr_running++; 372 if (td_read(td)) { 373 je->t_rate[0] += td->o.rate[DDIR_READ]; 374 je->t_iops[0] += td->o.rate_iops[DDIR_READ]; 375 je->m_rate[0] += td->o.ratemin[DDIR_READ]; 376 je->m_iops[0] += td->o.rate_iops_min[DDIR_READ]; 377 } 378 if (td_write(td)) { 379 je->t_rate[1] += td->o.rate[DDIR_WRITE]; 380 je->t_iops[1] += td->o.rate_iops[DDIR_WRITE]; 381 je->m_rate[1] += td->o.ratemin[DDIR_WRITE]; 382 je->m_iops[1] += td->o.rate_iops_min[DDIR_WRITE]; 383 } 384 if (td_trim(td)) { 385 je->t_rate[2] += td->o.rate[DDIR_TRIM]; 386 je->t_iops[2] += td->o.rate_iops[DDIR_TRIM]; 387 je->m_rate[2] += td->o.ratemin[DDIR_TRIM]; 388 je->m_iops[2] += td->o.rate_iops_min[DDIR_TRIM]; 389 } 390 391 je->files_open += td->nr_open_files; 392 } else if (td->runstate == TD_RAMP) { 393 je->nr_running++; 394 je->nr_ramp++; 395 } else if (td->runstate == TD_SETTING_UP) 396 je->nr_setting_up++; 397 else if (td->runstate < TD_RUNNING) 398 je->nr_pending++; 399 400 if (je->elapsed_sec >= 3) 401 eta_secs[i] = thread_eta(td); 402 else 403 eta_secs[i] = INT_MAX; 404 405 check_str_update(td); 406 407 if (td->runstate > TD_SETTING_UP) { 408 int ddir; 409 410 for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++) { 411 if (unified_rw_rep) { 412 io_bytes[0] += td->io_bytes[ddir]; 413 io_iops[0] += td->io_blocks[ddir]; 414 } else { 415 io_bytes[ddir] += td->io_bytes[ddir]; 416 io_iops[ddir] += td->io_blocks[ddir]; 417 } 418 } 419 } 420 } 421 422 if (exitall_on_terminate) 423 je->eta_sec = INT_MAX; 424 else 425 je->eta_sec = 0; 426 427 for_each_td(td, i) { 428 if (exitall_on_terminate) { 429 if (eta_secs[i] < je->eta_sec) 430 je->eta_sec = eta_secs[i]; 431 } else { 432 if (eta_secs[i] > je->eta_sec) 433 je->eta_sec = eta_secs[i]; 434 } 435 } 436 437 free(eta_secs); 438 439 fio_gettime(&now, NULL); 440 rate_time = mtime_since(&rate_prev_time, &now); 441 442 if (write_bw_log && rate_time > bw_avg_time && !in_ramp_time(td)) { 443 calc_rate(unified_rw_rep, rate_time, io_bytes, rate_io_bytes, 444 je->rate); 445 memcpy(&rate_prev_time, &now, sizeof(now)); 446 add_agg_sample(je->rate[DDIR_READ], DDIR_READ, 0); 447 add_agg_sample(je->rate[DDIR_WRITE], DDIR_WRITE, 0); 448 add_agg_sample(je->rate[DDIR_TRIM], DDIR_TRIM, 0); 449 } 450 451 disp_time = mtime_since(&disp_prev_time, &now); 452 453 /* 454 * Allow a little slack, the target is to print it every 1000 msecs 455 */ 456 if (!force && disp_time < 900) 457 return 0; 458 459 calc_rate(unified_rw_rep, disp_time, io_bytes, disp_io_bytes, je->rate); 460 calc_iops(unified_rw_rep, disp_time, io_iops, disp_io_iops, je->iops); 461 462 memcpy(&disp_prev_time, &now, sizeof(now)); 463 464 if (!force && !je->nr_running && !je->nr_pending) 465 return 0; 466 467 je->nr_threads = thread_number; 468 update_condensed_str(__run_str, run_str); 469 memcpy(je->run_str, run_str, strlen(run_str)); 470 return 1; 471 } 472 473 void display_thread_status(struct jobs_eta *je) 474 { 475 static struct timeval disp_eta_new_line; 476 static int eta_new_line_init, eta_new_line_pending; 477 static int linelen_last; 478 static int eta_good; 479 char output[REAL_MAX_JOBS + 512], *p = output; 480 char eta_str[128]; 481 double perc = 0.0; 482 483 if (je->eta_sec != INT_MAX && je->elapsed_sec) { 484 perc = (double) je->elapsed_sec / (double) (je->elapsed_sec + je->eta_sec); 485 eta_to_str(eta_str, je->eta_sec); 486 } 487 488 if (eta_new_line_pending) { 489 eta_new_line_pending = 0; 490 p += sprintf(p, "\n"); 491 } 492 493 p += sprintf(p, "Jobs: %d (f=%d)", je->nr_running, je->files_open); 494 if (je->m_rate[0] || je->m_rate[1] || je->t_rate[0] || je->t_rate[1]) { 495 char *tr, *mr; 496 497 mr = num2str(je->m_rate[0] + je->m_rate[1], 4, 0, je->is_pow2, 8); 498 tr = num2str(je->t_rate[0] + je->t_rate[1], 4, 0, je->is_pow2, 8); 499 p += sprintf(p, ", CR=%s/%s KB/s", tr, mr); 500 free(tr); 501 free(mr); 502 } else if (je->m_iops[0] || je->m_iops[1] || je->t_iops[0] || je->t_iops[1]) { 503 p += sprintf(p, ", CR=%d/%d IOPS", 504 je->t_iops[0] + je->t_iops[1], 505 je->m_iops[0] + je->m_iops[1]); 506 } 507 if (je->eta_sec != INT_MAX && je->nr_running) { 508 char perc_str[32]; 509 char *iops_str[DDIR_RWDIR_CNT]; 510 char *rate_str[DDIR_RWDIR_CNT]; 511 size_t left; 512 int l; 513 int ddir; 514 515 if ((!je->eta_sec && !eta_good) || je->nr_ramp == je->nr_running) 516 strcpy(perc_str, "-.-% done"); 517 else { 518 double mult = 100.0; 519 520 if (je->nr_setting_up && je->nr_running) 521 mult *= (1.0 - (double) je->nr_setting_up / (double) je->nr_running); 522 523 eta_good = 1; 524 perc *= mult; 525 sprintf(perc_str, "%3.1f%% done", perc); 526 } 527 528 for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++) { 529 rate_str[ddir] = num2str(je->rate[ddir], 5, 530 1024, je->is_pow2, je->unit_base); 531 iops_str[ddir] = num2str(je->iops[ddir], 4, 1, 0, 0); 532 } 533 534 left = sizeof(output) - (p - output) - 1; 535 536 l = snprintf(p, left, ": [%s] [%s] [%s/%s/%s /s] [%s/%s/%s iops] [eta %s]", 537 je->run_str, perc_str, rate_str[DDIR_READ], 538 rate_str[DDIR_WRITE], rate_str[DDIR_TRIM], 539 iops_str[DDIR_READ], iops_str[DDIR_WRITE], 540 iops_str[DDIR_TRIM], eta_str); 541 p += l; 542 if (l >= 0 && l < linelen_last) 543 p += sprintf(p, "%*s", linelen_last - l, ""); 544 linelen_last = l; 545 546 for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++) { 547 free(rate_str[ddir]); 548 free(iops_str[ddir]); 549 } 550 } 551 p += sprintf(p, "\r"); 552 553 printf("%s", output); 554 555 if (!eta_new_line_init) { 556 fio_gettime(&disp_eta_new_line, NULL); 557 eta_new_line_init = 1; 558 } else if (eta_new_line && mtime_since_now(&disp_eta_new_line) > eta_new_line) { 559 fio_gettime(&disp_eta_new_line, NULL); 560 eta_new_line_pending = 1; 561 } 562 563 fflush(stdout); 564 } 565 566 struct jobs_eta *get_jobs_eta(int force, size_t *size) 567 { 568 struct jobs_eta *je; 569 570 if (!thread_number) 571 return NULL; 572 573 *size = sizeof(*je) + THREAD_RUNSTR_SZ; 574 je = malloc(*size); 575 if (!je) 576 return NULL; 577 memset(je, 0, *size); 578 579 if (!calc_thread_status(je, force)) { 580 free(je); 581 return NULL; 582 } 583 584 *size = sizeof(*je) + strlen((char *) je->run_str) + 1; 585 return je; 586 } 587 588 void print_thread_status(void) 589 { 590 struct jobs_eta *je; 591 size_t size; 592 593 je = get_jobs_eta(0, &size); 594 if (je) 595 display_thread_status(je); 596 597 free(je); 598 } 599 600 void print_status_init(int thr_number) 601 { 602 __run_str[thr_number] = 'P'; 603 update_condensed_str(__run_str, run_str); 604 } 605