/*
 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it would be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * Further, this software is distributed without any warranty that it is
 * free of the rightful claim of any third person regarding infringement
 * or the like.  Any license provided herein, whether implied or
 * otherwise, applies only to this software file.  Patent licenses, if
 * any, provided herein do not apply to combinations of this program with
 * other software, or any other product whatsoever.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
 * Mountain View, CA 94043, or:
 *
 *
 * aio-stress
 *
 * will open or create each file on the command line, and start a series
 * of aio to it.
 *
 * aio is done in a rotating loop.  first file1 gets 8 requests, then
 * file2, then file3 etc.
As each file finishes writing, it is switched 34 * to reads 35 * 36 * io buffers are aligned in case you want to do raw io 37 * 38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c 39 * 40 * run aio-stress -h to see the options 41 * 42 * Please mail Chris Mason (mason (at) suse.com) with bug reports or patches 43 */ 44 #define _FILE_OFFSET_BITS 64 45 #define PROG_VERSION "0.21" 46 #define NEW_GETEVENTS 47 48 #define _GNU_SOURCE 49 #include <stdio.h> 50 #include <errno.h> 51 #include <assert.h> 52 #include <stdlib.h> 53 #include <sys/types.h> 54 #include <sys/stat.h> 55 #include <fcntl.h> 56 #include <unistd.h> 57 #include <sys/time.h> 58 #include <sys/ipc.h> 59 #include <sys/shm.h> 60 #include <sys/mman.h> 61 #include <string.h> 62 #include <pthread.h> 63 64 #include "config.h" 65 #include "tst_res_flags.h" 66 67 #ifdef HAVE_LIBAIO 68 #include <libaio.h> 69 70 #define IO_FREE 0 71 #define IO_PENDING 1 72 #define RUN_FOREVER -1 73 74 enum { 75 WRITE, 76 READ, 77 RWRITE, 78 RREAD, 79 LAST_STAGE, 80 }; 81 82 #define USE_MALLOC 0 83 #define USE_SHM 1 84 #define USE_SHMFS 2 85 86 /* 87 * various globals, these are effectively read only by the time the threads 88 * are started 89 */ 90 long stages = 0; 91 unsigned long page_size_mask; 92 int o_direct = 0; 93 int o_sync = 0; 94 int latency_stats = 0; 95 int completion_latency_stats = 0; 96 int io_iter = 8; 97 int iterations = RUN_FOREVER; 98 int max_io_submit = 0; 99 long rec_len = 64 * 1024; 100 int depth = 64; 101 int num_threads = 1; 102 int num_contexts = 1; 103 off_t context_offset = 2 * 1024 * 1024; 104 int fsync_stages = 1; 105 int use_shm = 0; 106 int shm_id; 107 char *unaligned_buffer = NULL; 108 char *aligned_buffer = NULL; 109 int padded_reclen = 0; 110 int stonewall = 1; 111 int verify = 0; 112 char *verify_buf = NULL; 113 int unlink_files = 0; 114 115 struct io_unit; 116 struct thread_info; 117 118 /* pthread mutexes and other globals for keeping the threads in sync */ 119 pthread_cond_t 
stage_cond = PTHREAD_COND_INITIALIZER; 120 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER; 121 int threads_ending = 0; 122 int threads_starting = 0; 123 struct timeval global_stage_start_time; 124 struct thread_info *global_thread_info; 125 126 /* 127 * latencies during io_submit are measured, these are the 128 * granularities for deviations 129 */ 130 #define DEVIATIONS 6 131 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 }; 132 133 struct io_latency { 134 double max; 135 double min; 136 double total_io; 137 double total_lat; 138 double deviations[DEVIATIONS]; 139 }; 140 141 /* container for a series of operations to a file */ 142 struct io_oper { 143 /* already open file descriptor, valid for whatever operation you want */ 144 int fd; 145 146 /* starting byte of the operation */ 147 off_t start; 148 149 /* ending byte of the operation */ 150 off_t end; 151 152 /* size of the read/write buffer */ 153 int reclen; 154 155 /* max number of pending requests before a wait is triggered */ 156 int depth; 157 158 /* current number of pending requests */ 159 int num_pending; 160 161 /* last error, zero if there were none */ 162 int last_err; 163 164 /* total number of errors hit. */ 165 int num_err; 166 167 /* read,write, random, etc */ 168 int rw; 169 170 /* number of I/O that will get sent to aio */ 171 int total_ios; 172 173 /* number of I/O we've already sent */ 174 int started_ios; 175 176 /* last offset used in an io operation */ 177 off_t last_offset; 178 179 /* stonewalled = 1 when we got cut off before submitting all our I/O */ 180 int stonewalled; 181 182 /* list management */ 183 struct io_oper *next; 184 struct io_oper *prev; 185 186 struct timeval start_time; 187 188 char *file_name; 189 }; 190 191 /* a single io, and all the tracking needed for it */ 192 struct io_unit { 193 /* note, iocb must go first! 
*/ 194 struct iocb iocb; 195 196 /* pointer to parent io operation struct */ 197 struct io_oper *io_oper; 198 199 /* aligned buffer */ 200 char *buf; 201 202 /* size of the aligned buffer (record size) */ 203 int buf_size; 204 205 /* state of this io unit (free, pending, done) */ 206 int busy; 207 208 /* result of last operation */ 209 long res; 210 211 struct io_unit *next; 212 213 struct timeval io_start_time; /* time of io_submit */ 214 }; 215 216 struct thread_info { 217 io_context_t io_ctx; 218 pthread_t tid; 219 220 /* allocated array of io_unit structs */ 221 struct io_unit *ios; 222 223 /* list of io units available for io */ 224 struct io_unit *free_ious; 225 226 /* number of io units in the I/O array */ 227 int num_global_ios; 228 229 /* number of io units in flight */ 230 int num_global_pending; 231 232 /* preallocated array of iocb pointers, only used in run_active */ 233 struct iocb **iocbs; 234 235 /* preallocated array of events */ 236 struct io_event *events; 237 238 /* size of the events array */ 239 int num_global_events; 240 241 /* latency stats for io_submit */ 242 struct io_latency io_submit_latency; 243 244 /* list of operations still in progress, and of those finished */ 245 struct io_oper *active_opers; 246 struct io_oper *finished_opers; 247 248 /* number of files this thread is doing io on */ 249 int num_files; 250 251 /* how much io this thread did in the last stage */ 252 double stage_mb_trans; 253 254 /* latency completion stats i/o time from io_submit until io_getevents */ 255 struct io_latency io_completion_latency; 256 }; 257 258 /* 259 * return seconds between start_tv and stop_tv in double precision 260 */ 261 static double time_since(struct timeval *start_tv, struct timeval *stop_tv) 262 { 263 double sec, usec; 264 double ret; 265 sec = stop_tv->tv_sec - start_tv->tv_sec; 266 usec = stop_tv->tv_usec - start_tv->tv_usec; 267 if (sec > 0 && usec < 0) { 268 sec--; 269 usec += 1000000; 270 } 271 ret = sec + usec / (double)1000000; 272 
if (ret < 0) 273 ret = 0; 274 return ret; 275 } 276 277 /* 278 * return seconds between start_tv and now in double precision 279 */ 280 static double time_since_now(struct timeval *start_tv) 281 { 282 struct timeval stop_time; 283 gettimeofday(&stop_time, NULL); 284 return time_since(start_tv, &stop_time); 285 } 286 287 /* 288 * Add latency info to latency struct 289 */ 290 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv, 291 struct io_latency *lat) 292 { 293 double delta; 294 int i; 295 delta = time_since(start_tv, stop_tv); 296 delta = delta * 1000; 297 298 if (delta > lat->max) 299 lat->max = delta; 300 if (!lat->min || delta < lat->min) 301 lat->min = delta; 302 lat->total_io++; 303 lat->total_lat += delta; 304 for (i = 0; i < DEVIATIONS; i++) { 305 if (delta < deviations[i]) { 306 lat->deviations[i]++; 307 break; 308 } 309 } 310 } 311 312 static void oper_list_add(struct io_oper *oper, struct io_oper **list) 313 { 314 if (!*list) { 315 *list = oper; 316 oper->prev = oper->next = oper; 317 return; 318 } 319 oper->prev = (*list)->prev; 320 oper->next = *list; 321 (*list)->prev->next = oper; 322 (*list)->prev = oper; 323 return; 324 } 325 326 static void oper_list_del(struct io_oper *oper, struct io_oper **list) 327 { 328 if ((*list)->next == (*list)->prev && *list == (*list)->next) { 329 *list = NULL; 330 return; 331 } 332 oper->prev->next = oper->next; 333 oper->next->prev = oper->prev; 334 if (*list == oper) 335 *list = oper->next; 336 } 337 338 /* worker func to check error fields in the io unit */ 339 static int check_finished_io(struct io_unit *io) 340 { 341 int i; 342 if (io->res != io->buf_size) { 343 344 struct stat s; 345 fstat(io->io_oper->fd, &s); 346 347 /* 348 * If file size is large enough for the read, then this short 349 * read is an error. 
350 */ 351 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) && 352 s.st_size > (io->iocb.u.c.offset + io->res)) { 353 354 fprintf(stderr, 355 "io err %lu (%s) op %d, off %Lu size %d\n", 356 io->res, strerror(-io->res), 357 io->iocb.aio_lio_opcode, io->iocb.u.c.offset, 358 io->buf_size); 359 io->io_oper->last_err = io->res; 360 io->io_oper->num_err++; 361 return -1; 362 } 363 } 364 if (verify && io->io_oper->rw == READ) { 365 if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) { 366 fprintf(stderr, 367 "verify error, file %s offset %Lu contents (offset:bad:good):\n", 368 io->io_oper->file_name, io->iocb.u.c.offset); 369 370 for (i = 0; i < io->io_oper->reclen; i++) { 371 if (io->buf[i] != verify_buf[i]) { 372 fprintf(stderr, "%d:%c:%c ", i, 373 io->buf[i], verify_buf[i]); 374 } 375 } 376 fprintf(stderr, "\n"); 377 } 378 379 } 380 return 0; 381 } 382 383 /* worker func to check the busy bits and get an io unit ready for use */ 384 static int grab_iou(struct io_unit *io, struct io_oper *oper) 385 { 386 if (io->busy == IO_PENDING) 387 return -1; 388 389 io->busy = IO_PENDING; 390 io->res = 0; 391 io->io_oper = oper; 392 return 0; 393 } 394 395 char *stage_name(int rw) 396 { 397 switch (rw) { 398 case WRITE: 399 return "write"; 400 case READ: 401 return "read"; 402 case RWRITE: 403 return "random write"; 404 case RREAD: 405 return "random read"; 406 } 407 return "unknown"; 408 } 409 410 static inline double oper_mb_trans(struct io_oper *oper) 411 { 412 return ((double)oper->started_ios * (double)oper->reclen) / 413 (double)(1024 * 1024); 414 } 415 416 static void print_time(struct io_oper *oper) 417 { 418 double runtime; 419 double tput; 420 double mb; 421 422 runtime = time_since_now(&oper->start_time); 423 mb = oper_mb_trans(oper); 424 tput = mb / runtime; 425 fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 426 stage_name(oper->rw), oper->file_name, tput, mb, runtime); 427 } 428 429 static void print_lat(char *str, struct io_latency *lat) 430 { 
431 double avg = lat->total_lat / lat->total_io; 432 int i; 433 double total_counted = 0; 434 fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t", 435 str, lat->min, avg, lat->max); 436 437 for (i = 0; i < DEVIATIONS; i++) { 438 fprintf(stderr, " %.0f < %d", lat->deviations[i], 439 deviations[i]); 440 total_counted += lat->deviations[i]; 441 } 442 if (total_counted && lat->total_io - total_counted) 443 fprintf(stderr, " < %.0f", lat->total_io - total_counted); 444 fprintf(stderr, "\n"); 445 memset(lat, 0, sizeof(*lat)); 446 } 447 448 static void print_latency(struct thread_info *t) 449 { 450 struct io_latency *lat = &t->io_submit_latency; 451 print_lat("latency", lat); 452 } 453 454 static void print_completion_latency(struct thread_info *t) 455 { 456 struct io_latency *lat = &t->io_completion_latency; 457 print_lat("completion latency", lat); 458 } 459 460 /* 461 * updates the fields in the io operation struct that belongs to this 462 * io unit, and make the io unit reusable again 463 */ 464 void finish_io(struct thread_info *t, struct io_unit *io, long result, 465 struct timeval *tv_now) 466 { 467 struct io_oper *oper = io->io_oper; 468 469 calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency); 470 io->res = result; 471 io->busy = IO_FREE; 472 io->next = t->free_ious; 473 t->free_ious = io; 474 oper->num_pending--; 475 t->num_global_pending--; 476 check_finished_io(io); 477 if (oper->num_pending == 0 && 478 (oper->started_ios == oper->total_ios || oper->stonewalled)) { 479 print_time(oper); 480 } 481 } 482 483 int read_some_events(struct thread_info *t) 484 { 485 struct io_unit *event_io; 486 struct io_event *event; 487 int nr; 488 int i; 489 int min_nr = io_iter; 490 struct timeval stop_time; 491 492 if (t->num_global_pending < io_iter) 493 min_nr = t->num_global_pending; 494 495 #ifdef NEW_GETEVENTS 496 nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events, 497 NULL); 498 #else 499 nr = io_getevents(t->io_ctx, t->num_global_events, 
t->events, NULL); 500 #endif 501 if (nr <= 0) 502 return nr; 503 504 gettimeofday(&stop_time, NULL); 505 for (i = 0; i < nr; i++) { 506 event = t->events + i; 507 event_io = (struct io_unit *)((unsigned long)event->obj); 508 finish_io(t, event_io, event->res, &stop_time); 509 } 510 return nr; 511 } 512 513 /* 514 * finds a free io unit, waiting for pending requests if required. returns 515 * null if none could be found 516 */ 517 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper) 518 { 519 struct io_unit *event_io; 520 int nr; 521 522 retry: 523 if (t->free_ious) { 524 event_io = t->free_ious; 525 t->free_ious = t->free_ious->next; 526 if (grab_iou(event_io, oper)) { 527 fprintf(stderr, "io unit on free list but not free\n"); 528 abort(); 529 } 530 return event_io; 531 } 532 nr = read_some_events(t); 533 if (nr > 0) 534 goto retry; 535 else 536 fprintf(stderr, "no free ious after read_some_events\n"); 537 return NULL; 538 } 539 540 /* 541 * wait for all pending requests for this io operation to finish 542 */ 543 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) 544 { 545 struct io_event event; 546 struct io_unit *event_io; 547 548 if (oper == NULL) { 549 return 0; 550 } 551 552 if (oper->num_pending == 0) 553 goto done; 554 555 /* this func is not speed sensitive, no need to go wild reading 556 * more than one event at a time 557 */ 558 #ifdef NEW_GETEVENTS 559 while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) { 560 #else 561 while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) { 562 #endif 563 struct timeval tv_now; 564 event_io = (struct io_unit *)((unsigned long)event.obj); 565 566 gettimeofday(&tv_now, NULL); 567 finish_io(t, event_io, event.res, &tv_now); 568 569 if (oper->num_pending == 0) 570 break; 571 } 572 done: 573 if (oper->num_err) { 574 fprintf(stderr, "%u errors on oper, last %u\n", 575 oper->num_err, oper->last_err); 576 } 577 return 0; 578 } 579 580 off_t random_byte_offset(struct io_oper * oper) 
581 { 582 off_t num; 583 off_t rand_byte = oper->start; 584 off_t range; 585 off_t offset = 1; 586 587 range = (oper->end - oper->start) / (1024 * 1024); 588 if ((page_size_mask + 1) > (1024 * 1024)) 589 offset = (page_size_mask + 1) / (1024 * 1024); 590 if (range < offset) 591 range = 0; 592 else 593 range -= offset; 594 595 /* find a random mb offset */ 596 num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0)); 597 rand_byte += num * 1024 * 1024; 598 599 /* find a random byte offset */ 600 num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0)); 601 602 /* page align */ 603 num = (num + page_size_mask) & ~page_size_mask; 604 rand_byte += num; 605 606 if (rand_byte + oper->reclen > oper->end) { 607 rand_byte -= oper->reclen; 608 } 609 return rand_byte; 610 } 611 612 /* 613 * build an aio iocb for an operation, based on oper->rw and the 614 * last offset used. This finds the struct io_unit that will be attached 615 * to the iocb, and things are ready for submission to aio after this 616 * is called. 
617 * 618 * returns null on error 619 */ 620 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper) 621 { 622 struct io_unit *io; 623 off_t rand_byte; 624 625 io = find_iou(t, oper); 626 if (!io) { 627 fprintf(stderr, "unable to find io unit\n"); 628 return NULL; 629 } 630 631 switch (oper->rw) { 632 case WRITE: 633 io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen, 634 oper->last_offset); 635 oper->last_offset += oper->reclen; 636 break; 637 case READ: 638 io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen, 639 oper->last_offset); 640 oper->last_offset += oper->reclen; 641 break; 642 case RREAD: 643 rand_byte = random_byte_offset(oper); 644 oper->last_offset = rand_byte; 645 io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen, 646 rand_byte); 647 break; 648 case RWRITE: 649 rand_byte = random_byte_offset(oper); 650 oper->last_offset = rand_byte; 651 io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen, 652 rand_byte); 653 654 break; 655 } 656 657 return io; 658 } 659 660 /* 661 * wait for any pending requests, and then free all ram associated with 662 * an operation. returns the last error the operation hit (zero means none) 663 */ 664 static int finish_oper(struct thread_info *t, struct io_oper *oper) 665 { 666 unsigned long last_err; 667 668 io_oper_wait(t, oper); 669 last_err = oper->last_err; 670 if (oper->num_pending > 0) { 671 fprintf(stderr, "oper num_pending is %d\n", oper->num_pending); 672 } 673 close(oper->fd); 674 free(oper); 675 return last_err; 676 } 677 678 /* 679 * allocates an io operation and fills in all the fields. 
returns 680 * null on error 681 */ 682 static struct io_oper *create_oper(int fd, int rw, off_t start, off_t end, 683 int reclen, int depth, int iter, 684 char *file_name) 685 { 686 struct io_oper *oper; 687 688 oper = malloc(sizeof(*oper)); 689 if (!oper) { 690 fprintf(stderr, "unable to allocate io oper\n"); 691 return NULL; 692 } 693 memset(oper, 0, sizeof(*oper)); 694 695 oper->depth = depth; 696 oper->start = start; 697 oper->end = end; 698 oper->last_offset = oper->start; 699 oper->fd = fd; 700 oper->reclen = reclen; 701 oper->rw = rw; 702 oper->total_ios = (oper->end - oper->start) / oper->reclen; 703 oper->file_name = file_name; 704 705 return oper; 706 } 707 708 /* 709 * does setup on num_ios worth of iocbs, but does not actually 710 * start any io 711 */ 712 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 713 struct iocb **my_iocbs) 714 { 715 int i; 716 struct io_unit *io; 717 718 if (oper->started_ios == 0) 719 gettimeofday(&oper->start_time, NULL); 720 721 if (num_ios == 0) 722 num_ios = oper->total_ios; 723 724 if ((oper->started_ios + num_ios) > oper->total_ios) 725 num_ios = oper->total_ios - oper->started_ios; 726 727 for (i = 0; i < num_ios; i++) { 728 io = build_iocb(t, oper); 729 if (!io) { 730 return -1; 731 } 732 my_iocbs[i] = &io->iocb; 733 } 734 return num_ios; 735 } 736 737 /* 738 * runs through the iocbs in the array provided and updates 739 * counters in the associated oper struct 740 */ 741 static void update_iou_counters(struct iocb **my_iocbs, int nr, 742 struct timeval *tv_now) 743 { 744 struct io_unit *io; 745 int i; 746 for (i = 0; i < nr; i++) { 747 io = (struct io_unit *)(my_iocbs[i]); 748 io->io_oper->num_pending++; 749 io->io_oper->started_ios++; 750 io->io_start_time = *tv_now; /* set time of io_submit */ 751 } 752 } 753 754 /* starts some io for a given file, returns zero if all went well */ 755 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 756 { 757 int ret; 758 struct 
timeval start_time; 759 struct timeval stop_time; 760 761 resubmit: 762 gettimeofday(&start_time, NULL); 763 ret = io_submit(t->io_ctx, num_ios, my_iocbs); 764 gettimeofday(&stop_time, NULL); 765 calc_latency(&start_time, &stop_time, &t->io_submit_latency); 766 767 if (ret != num_ios) { 768 /* some I/O got through */ 769 if (ret > 0) { 770 update_iou_counters(my_iocbs, ret, &stop_time); 771 my_iocbs += ret; 772 t->num_global_pending += ret; 773 num_ios -= ret; 774 } 775 /* 776 * we've used all the requests allocated in aio_init, wait and 777 * retry 778 */ 779 if (ret > 0 || ret == -EAGAIN) { 780 int old_ret = ret; 781 if ((ret = read_some_events(t) > 0)) { 782 goto resubmit; 783 } else { 784 fprintf(stderr, "ret was %d and now is %d\n", 785 ret, old_ret); 786 abort(); 787 } 788 } 789 790 fprintf(stderr, "ret %d (%s) on io_submit\n", ret, 791 strerror(-ret)); 792 return -1; 793 } 794 update_iou_counters(my_iocbs, ret, &stop_time); 795 t->num_global_pending += ret; 796 return 0; 797 } 798 799 /* 800 * changes oper->rw to the next in a command sequence, or returns zero 801 * to say this operation is really, completely done for 802 */ 803 static int restart_oper(struct io_oper *oper) 804 { 805 int new_rw = 0; 806 if (oper->last_err) 807 return 0; 808 809 /* this switch falls through */ 810 switch (oper->rw) { 811 case WRITE: 812 if (stages & (1 << READ)) 813 new_rw = READ; 814 case READ: 815 if (!new_rw && stages & (1 << RWRITE)) 816 new_rw = RWRITE; 817 case RWRITE: 818 if (!new_rw && stages & (1 << RREAD)) 819 new_rw = RREAD; 820 } 821 822 if (new_rw) { 823 oper->started_ios = 0; 824 oper->last_offset = oper->start; 825 oper->stonewalled = 0; 826 827 /* 828 * we're restarting an operation with pending requests, so the 829 * timing info won't be printed by finish_io. 
Printing it here 830 */ 831 if (oper->num_pending) 832 print_time(oper); 833 834 oper->rw = new_rw; 835 return 1; 836 } 837 return 0; 838 } 839 840 static int oper_runnable(struct io_oper *oper) 841 { 842 struct stat buf; 843 int ret; 844 845 /* first context is always runnable, if started_ios > 0, no need to 846 * redo the calculations 847 */ 848 if (oper->started_ios || oper->start == 0) 849 return 1; 850 /* 851 * only the sequential phases force delays in starting */ 852 if (oper->rw >= RWRITE) 853 return 1; 854 ret = fstat(oper->fd, &buf); 855 if (ret < 0) { 856 perror("fstat"); 857 exit(1); 858 } 859 if (S_ISREG(buf.st_mode) && buf.st_size < oper->start) 860 return 0; 861 return 1; 862 } 863 864 /* 865 * runs through all the io operations on the active list, and starts 866 * a chunk of io on each. If any io operations are completely finished, 867 * it either switches them to the next stage or puts them on the 868 * finished list. 869 * 870 * this function stops after max_io_submit iocbs are sent down the 871 * pipe, even if it has not yet touched all the operations on the 872 * active list. Any operations that have finished are moved onto 873 * the finished_opers list. 
874 */ 875 static int run_active_list(struct thread_info *t, 876 int io_iter, int max_io_submit) 877 { 878 struct io_oper *oper; 879 struct io_oper *built_opers = NULL; 880 struct iocb **my_iocbs = t->iocbs; 881 int ret = 0; 882 int num_built = 0; 883 884 oper = t->active_opers; 885 while (oper) { 886 if (!oper_runnable(oper)) { 887 oper = oper->next; 888 if (oper == t->active_opers) 889 break; 890 continue; 891 } 892 ret = build_oper(t, oper, io_iter, my_iocbs); 893 if (ret >= 0) { 894 my_iocbs += ret; 895 num_built += ret; 896 oper_list_del(oper, &t->active_opers); 897 oper_list_add(oper, &built_opers); 898 oper = t->active_opers; 899 if (num_built + io_iter > max_io_submit) 900 break; 901 } else 902 break; 903 } 904 if (num_built) { 905 ret = run_built(t, num_built, t->iocbs); 906 if (ret < 0) { 907 fprintf(stderr, "error %d on run_built\n", ret); 908 exit(1); 909 } 910 while (built_opers) { 911 oper = built_opers; 912 oper_list_del(oper, &built_opers); 913 oper_list_add(oper, &t->active_opers); 914 if (oper->started_ios == oper->total_ios) { 915 oper_list_del(oper, &t->active_opers); 916 oper_list_add(oper, &t->finished_opers); 917 } 918 } 919 } 920 return 0; 921 } 922 923 void drop_shm() 924 { 925 int ret; 926 struct shmid_ds ds; 927 if (use_shm != USE_SHM) 928 return; 929 930 ret = shmctl(shm_id, IPC_RMID, &ds); 931 if (ret) { 932 perror("shmctl IPC_RMID"); 933 } 934 } 935 936 void aio_setup(io_context_t * io_ctx, int n) 937 { 938 int res = io_queue_init(n, io_ctx); 939 if (res != 0) { 940 fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n", 941 n, res, strerror(-res)); 942 exit(3); 943 } 944 } 945 946 /* 947 * allocate io operation and event arrays for a given thread 948 */ 949 int setup_ious(struct thread_info *t, 950 int num_files, int depth, int reclen, int max_io_submit) 951 { 952 int i; 953 size_t bytes = num_files * depth * sizeof(*t->ios); 954 955 t->ios = malloc(bytes); 956 if (!t->ios) { 957 fprintf(stderr, "unable to allocate io units\n"); 958 
return -1; 959 } 960 memset(t->ios, 0, bytes); 961 962 for (i = 0; i < depth * num_files; i++) { 963 t->ios[i].buf = aligned_buffer; 964 aligned_buffer += padded_reclen; 965 t->ios[i].buf_size = reclen; 966 if (verify) 967 memset(t->ios[i].buf, 'b', reclen); 968 else 969 memset(t->ios[i].buf, 0, reclen); 970 t->ios[i].next = t->free_ious; 971 t->free_ious = t->ios + i; 972 } 973 if (verify) { 974 verify_buf = aligned_buffer; 975 memset(verify_buf, 'b', reclen); 976 } 977 978 t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit); 979 if (!t->iocbs) { 980 fprintf(stderr, "unable to allocate iocbs\n"); 981 goto free_buffers; 982 } 983 984 memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *)); 985 986 t->events = malloc(sizeof(struct io_event) * depth * num_files); 987 if (!t->events) { 988 fprintf(stderr, "unable to allocate ram for events\n"); 989 goto free_buffers; 990 } 991 memset(t->events, 0, num_files * sizeof(struct io_event) * depth); 992 993 t->num_global_ios = num_files * depth; 994 t->num_global_events = t->num_global_ios; 995 return 0; 996 997 free_buffers: 998 free(t->ios); 999 free(t->iocbs); 1000 free(t->events); 1001 return -1; 1002 } 1003 1004 /* 1005 * The buffers used for file data are allocated as a single big 1006 * malloc, and then each thread and operation takes a piece and uses 1007 * that for file data. 
This lets us do a large shm or bigpages alloc 1008 * and without trying to find a special place in each thread to map the 1009 * buffers to 1010 */ 1011 int setup_shared_mem(int num_threads, int num_files, int depth, 1012 int reclen, int max_io_submit) 1013 { 1014 char *p = NULL; 1015 size_t total_ram; 1016 1017 padded_reclen = (reclen + page_size_mask) / (page_size_mask + 1); 1018 padded_reclen = padded_reclen * (page_size_mask + 1); 1019 total_ram = num_files * depth * padded_reclen + num_threads; 1020 if (verify) 1021 total_ram += padded_reclen; 1022 1023 /* for aligning buffer after the allocation */ 1024 total_ram += page_size_mask; 1025 1026 if (use_shm == USE_MALLOC) { 1027 p = malloc(total_ram); 1028 } else if (use_shm == USE_SHM) { 1029 shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700); 1030 if (shm_id < 0) { 1031 perror("shmget"); 1032 drop_shm(); 1033 goto free_buffers; 1034 } 1035 p = shmat(shm_id, (char *)0x50000000, 0); 1036 if ((long)p == -1) { 1037 perror("shmat"); 1038 goto free_buffers; 1039 } 1040 /* won't really be dropped until we shmdt */ 1041 drop_shm(); 1042 } else if (use_shm == USE_SHMFS) { 1043 char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */ 1044 int fd; 1045 1046 strcpy(mmap_name, "/dev/shm/XXXXXX"); 1047 fd = mkstemp(mmap_name); 1048 if (fd < 0) { 1049 perror("mkstemp"); 1050 goto free_buffers; 1051 } 1052 unlink(mmap_name); 1053 ftruncate(fd, total_ram); 1054 shm_id = fd; 1055 p = mmap((char *)0x50000000, total_ram, 1056 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 1057 1058 if (p == MAP_FAILED) { 1059 perror("mmap"); 1060 goto free_buffers; 1061 } 1062 } 1063 if (!p) { 1064 fprintf(stderr, "unable to allocate buffers\n"); 1065 goto free_buffers; 1066 } 1067 unaligned_buffer = p; 1068 p = (char *)((intptr_t) (p + page_size_mask) & ~page_size_mask); 1069 aligned_buffer = p; 1070 return 0; 1071 1072 free_buffers: 1073 drop_shm(); 1074 if (unaligned_buffer) 1075 free(unaligned_buffer); 1076 return -1; 1077 } 1078 1079 /* 1080 * 
runs through all the thread_info structs and calculates a combined 1081 * throughput 1082 */ 1083 void global_thread_throughput(struct thread_info *t, char *this_stage) 1084 { 1085 int i; 1086 double runtime = time_since_now(&global_stage_start_time); 1087 double total_mb = 0; 1088 double min_trans = 0; 1089 1090 for (i = 0; i < num_threads; i++) { 1091 total_mb += global_thread_info[i].stage_mb_trans; 1092 if (!min_trans || t->stage_mb_trans < min_trans) 1093 min_trans = t->stage_mb_trans; 1094 } 1095 if (total_mb) { 1096 fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage, 1097 total_mb / runtime); 1098 fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime); 1099 if (stonewall) 1100 fprintf(stderr, " min transfer %.2fMB", min_trans); 1101 fprintf(stderr, "\n"); 1102 } 1103 } 1104 1105 /* this is the meat of the state machine. There is a list of 1106 * active operations structs, and as each one finishes the required 1107 * io it is moved to a list of finished operations. Once they have 1108 * all finished whatever stage they were in, they are given the chance 1109 * to restart and pick a different stage (read/write/random read etc) 1110 * 1111 * various timings are printed in between the stages, along with 1112 * thread synchronization if there are more than one threads. 
1113 */ 1114 int worker(struct thread_info *t) 1115 { 1116 struct io_oper *oper; 1117 char *this_stage = NULL; 1118 struct timeval stage_time; 1119 int status = 0; 1120 int iteration = 0; 1121 int cnt; 1122 1123 aio_setup(&t->io_ctx, 512); 1124 1125 restart: 1126 if (num_threads > 1) { 1127 pthread_mutex_lock(&stage_mutex); 1128 threads_starting++; 1129 if (threads_starting == num_threads) { 1130 threads_ending = 0; 1131 gettimeofday(&global_stage_start_time, NULL); 1132 pthread_cond_broadcast(&stage_cond); 1133 } 1134 while (threads_starting != num_threads) 1135 pthread_cond_wait(&stage_cond, &stage_mutex); 1136 pthread_mutex_unlock(&stage_mutex); 1137 } 1138 if (t->active_opers) { 1139 this_stage = stage_name(t->active_opers->rw); 1140 gettimeofday(&stage_time, NULL); 1141 t->stage_mb_trans = 0; 1142 } 1143 1144 cnt = 0; 1145 /* first we send everything through aio */ 1146 while (t->active_opers 1147 && (cnt < iterations || iterations == RUN_FOREVER)) { 1148 if (stonewall && threads_ending) { 1149 oper = t->active_opers; 1150 oper->stonewalled = 1; 1151 oper_list_del(oper, &t->active_opers); 1152 oper_list_add(oper, &t->finished_opers); 1153 } else { 1154 run_active_list(t, io_iter, max_io_submit); 1155 } 1156 cnt++; 1157 } 1158 if (latency_stats) 1159 print_latency(t); 1160 1161 if (completion_latency_stats) 1162 print_completion_latency(t); 1163 1164 /* then we wait for all the operations to finish */ 1165 oper = t->finished_opers; 1166 do { 1167 if (!oper) 1168 break; 1169 io_oper_wait(t, oper); 1170 oper = oper->next; 1171 } while (oper != t->finished_opers); 1172 1173 /* then we do an fsync to get the timing for any future operations 1174 * right, and check to see if any of these need to get restarted 1175 */ 1176 oper = t->finished_opers; 1177 while (oper) { 1178 if (fsync_stages) 1179 fsync(oper->fd); 1180 t->stage_mb_trans += oper_mb_trans(oper); 1181 if (restart_oper(oper)) { 1182 oper_list_del(oper, &t->finished_opers); 1183 oper_list_add(oper, 
&t->active_opers); 1184 oper = t->finished_opers; 1185 continue; 1186 } 1187 oper = oper->next; 1188 if (oper == t->finished_opers) 1189 break; 1190 } 1191 1192 if (t->stage_mb_trans && t->num_files > 0) { 1193 double seconds = time_since_now(&stage_time); 1194 fprintf(stderr, 1195 "thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs\n", 1196 t - global_thread_info, this_stage, 1197 t->stage_mb_trans / seconds, t->stage_mb_trans, 1198 seconds); 1199 } 1200 1201 if (num_threads > 1) { 1202 pthread_mutex_lock(&stage_mutex); 1203 threads_ending++; 1204 if (threads_ending == num_threads) { 1205 threads_starting = 0; 1206 pthread_cond_broadcast(&stage_cond); 1207 global_thread_throughput(t, this_stage); 1208 } 1209 while (threads_ending != num_threads) 1210 pthread_cond_wait(&stage_cond, &stage_mutex); 1211 pthread_mutex_unlock(&stage_mutex); 1212 } 1213 1214 /* someone got restarted, go back to the beginning */ 1215 if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) { 1216 iteration++; 1217 goto restart; 1218 } 1219 1220 /* finally, free all the ram */ 1221 while (t->finished_opers) { 1222 oper = t->finished_opers; 1223 oper_list_del(oper, &t->finished_opers); 1224 status = finish_oper(t, oper); 1225 } 1226 1227 if (t->num_global_pending) { 1228 fprintf(stderr, "global num pending is %d\n", 1229 t->num_global_pending); 1230 } 1231 io_queue_release(t->io_ctx); 1232 1233 return status; 1234 } 1235 1236 typedef void *(*start_routine) (void *); 1237 int run_workers(struct thread_info *t, int num_threads) 1238 { 1239 int ret; 1240 int i; 1241 1242 for (i = 0; i < num_threads; i++) { 1243 ret = 1244 pthread_create(&t[i].tid, NULL, (start_routine) worker, 1245 t + i); 1246 if (ret) { 1247 perror("pthread_create"); 1248 exit(1); 1249 } 1250 } 1251 for (i = 0; i < num_threads; i++) { 1252 ret = pthread_join(t[i].tid, NULL); 1253 if (ret) { 1254 perror("pthread_join"); 1255 exit(1); 1256 } 1257 } 1258 return 0; 1259 } 1260 1261 off_t parse_size(char *size_arg, 
off_t mult) 1262 { 1263 char c; 1264 int num; 1265 off_t ret; 1266 c = size_arg[strlen(size_arg) - 1]; 1267 if (c > '9') { 1268 size_arg[strlen(size_arg) - 1] = '\0'; 1269 } 1270 num = atoi(size_arg); 1271 switch (c) { 1272 case 'g': 1273 case 'G': 1274 mult = 1024 * 1024 * 1024; 1275 break; 1276 case 'm': 1277 case 'M': 1278 mult = 1024 * 1024; 1279 break; 1280 case 'k': 1281 case 'K': 1282 mult = 1024; 1283 break; 1284 case 'b': 1285 case 'B': 1286 mult = 1; 1287 break; 1288 } 1289 ret = mult * num; 1290 return ret; 1291 } 1292 1293 void print_usage(void) 1294 { 1295 printf 1296 ("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n"); 1297 printf 1298 (" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n"); 1299 printf(" file1 [file2 ...]\n"); 1300 printf("\t-a size in KB at which to align buffers\n"); 1301 printf("\t-b max number of iocbs to give io_submit at once\n"); 1302 printf("\t-c number of io contexts per file\n"); 1303 printf("\t-C offset between contexts, default 2MB\n"); 1304 printf("\t-s size in MB of the test file(s), default 1024MB\n"); 1305 printf("\t-r record size in KB used for each io, default 64KB\n"); 1306 printf 1307 ("\t-d number of pending aio requests for each file, default 64\n"); 1308 printf("\t-i number of I/O per file sent before switching\n" 1309 "\t to the next file, default 8\n"); 1310 printf("\t-I total number of ayncs I/O the program will run, " 1311 "default is run until Cntl-C\n"); 1312 printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n"); 1313 printf("\t-S Use O_SYNC for writes\n"); 1314 printf("\t-o add an operation to the list: write=0, read=1,\n"); 1315 printf("\t random write=2, random read=3.\n"); 1316 printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n"); 1317 printf 1318 ("\t-m shm use ipc shared memory for io buffers instead of malloc\n"); 1319 printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n"); 1320 printf("\t-n no fsyncs between write stage and read stage\n"); 1321 
printf("\t-l print io_submit latencies after each stage\n"); 1322 printf("\t-L print io completion latencies after each stage\n"); 1323 printf("\t-t number of threads to run\n"); 1324 printf("\t-u unlink files after completion\n"); 1325 printf("\t-v verification of bytes written\n"); 1326 printf("\t-x turn off thread stonewalling\n"); 1327 printf("\t-h this message\n"); 1328 printf 1329 ("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n"); 1330 printf("\t translate to 400KB, 400MB and 400GB\n"); 1331 printf("version %s\n", PROG_VERSION); 1332 } 1333 1334 int main(int ac, char **av) 1335 { 1336 int rwfd; 1337 int i; 1338 int j; 1339 int c; 1340 1341 off_t file_size = 1 * 1024 * 1024 * 1024; 1342 int first_stage = WRITE; 1343 struct io_oper *oper; 1344 int status = 0; 1345 int num_files = 0; 1346 int open_fds = 0; 1347 struct thread_info *t; 1348 1349 page_size_mask = getpagesize() - 1; 1350 1351 while (1) { 1352 c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu"); 1353 if (c < 0) 1354 break; 1355 1356 switch (c) { 1357 case 'a': 1358 page_size_mask = parse_size(optarg, 1024); 1359 page_size_mask--; 1360 break; 1361 case 'c': 1362 num_contexts = atoi(optarg); 1363 break; 1364 case 'C': 1365 context_offset = parse_size(optarg, 1024 * 1024); 1366 case 'b': 1367 max_io_submit = atoi(optarg); 1368 break; 1369 case 's': 1370 file_size = parse_size(optarg, 1024 * 1024); 1371 break; 1372 case 'd': 1373 depth = atoi(optarg); 1374 break; 1375 case 'r': 1376 rec_len = parse_size(optarg, 1024); 1377 break; 1378 case 'i': 1379 io_iter = atoi(optarg); 1380 break; 1381 case 'I': 1382 iterations = atoi(optarg); 1383 break; 1384 case 'n': 1385 fsync_stages = 0; 1386 break; 1387 case 'l': 1388 latency_stats = 1; 1389 break; 1390 case 'L': 1391 completion_latency_stats = 1; 1392 break; 1393 case 'm': 1394 if (!strcmp(optarg, "shm")) { 1395 fprintf(stderr, "using ipc shm\n"); 1396 use_shm = USE_SHM; 1397 } else if (!strcmp(optarg, "shmfs")) { 1398 
fprintf(stderr, "using /dev/shm for buffers\n"); 1399 use_shm = USE_SHMFS; 1400 } 1401 break; 1402 case 'o': 1403 i = atoi(optarg); 1404 stages |= 1 << i; 1405 fprintf(stderr, "adding stage %s\n", stage_name(i)); 1406 break; 1407 case 'O': 1408 o_direct = O_DIRECT; 1409 break; 1410 case 'S': 1411 o_sync = O_SYNC; 1412 break; 1413 case 't': 1414 num_threads = atoi(optarg); 1415 break; 1416 case 'x': 1417 stonewall = 0; 1418 break; 1419 case 'u': 1420 unlink_files = 1; 1421 break; 1422 case 'v': 1423 verify = 1; 1424 break; 1425 case 'h': 1426 default: 1427 print_usage(); 1428 exit(1); 1429 } 1430 } 1431 1432 /* 1433 * make sure we don't try to submit more I/O than we have allocated 1434 * memory for 1435 */ 1436 if (depth < io_iter) { 1437 io_iter = depth; 1438 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1439 } 1440 1441 if (optind >= ac) { 1442 print_usage(); 1443 exit(1); 1444 } 1445 1446 num_files = ac - optind; 1447 1448 if (num_threads > (num_files * num_contexts)) { 1449 num_threads = num_files * num_contexts; 1450 fprintf(stderr, 1451 "dropping thread count to the number of contexts %d\n", 1452 num_threads); 1453 } 1454 1455 t = malloc(num_threads * sizeof(*t)); 1456 if (!t) { 1457 perror("malloc"); 1458 exit(1); 1459 } 1460 memset(t, 0, num_threads * sizeof(*t)); 1461 global_thread_info = t; 1462 1463 /* by default, allow a huge number of iocbs to be sent towards 1464 * io_submit 1465 */ 1466 if (!max_io_submit) 1467 max_io_submit = num_files * io_iter * num_contexts; 1468 1469 /* 1470 * make sure we don't try to submit more I/O than max_io_submit allows 1471 */ 1472 if (max_io_submit < io_iter) { 1473 io_iter = max_io_submit; 1474 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1475 } 1476 1477 if (!stages) { 1478 stages = 1479 (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE); 1480 } else { 1481 for (i = 0; i < LAST_STAGE; i++) { 1482 if (stages & (1 << i)) { 1483 first_stage = i; 1484 fprintf(stderr, "starting with %s\n", 1485 
stage_name(i)); 1486 break; 1487 } 1488 } 1489 } 1490 1491 if (file_size < num_contexts * context_offset) { 1492 fprintf(stderr, "file size %ld too small for %d contexts\n", 1493 (long)file_size, num_contexts); 1494 exit(1); 1495 } 1496 1497 fprintf(stderr, "file size %ldMB, record size %ldKB, depth %d, " 1498 "I/O per iteration %d\n", 1499 (long)(file_size / (1024 * 1024)), 1500 rec_len / 1024, depth, io_iter); 1501 fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 1502 max_io_submit, (page_size_mask + 1) / 1024); 1503 fprintf(stderr, "threads %d files %d contexts %d context offset %ldMB " 1504 "verification %s\n", num_threads, num_files, num_contexts, 1505 (long)(context_offset / (1024 * 1024)), verify ? "on" : "off"); 1506 /* open all the files and do any required setup for them */ 1507 for (i = optind; i < ac; i++) { 1508 int thread_index; 1509 for (j = 0; j < num_contexts; j++) { 1510 thread_index = open_fds % num_threads; 1511 open_fds++; 1512 1513 rwfd = 1514 open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 1515 0600); 1516 if (rwfd == -1) { 1517 fprintf(stderr, 1518 "error while creating file %s: %s", 1519 av[i], strerror(errno)); 1520 exit(1); 1521 } 1522 1523 oper = 1524 create_oper(rwfd, first_stage, j * context_offset, 1525 file_size - j * context_offset, rec_len, 1526 depth, io_iter, av[i]); 1527 if (!oper) { 1528 fprintf(stderr, "error in create_oper\n"); 1529 exit(-1); 1530 } 1531 oper_list_add(oper, &t[thread_index].active_opers); 1532 t[thread_index].num_files++; 1533 } 1534 } 1535 if (setup_shared_mem(num_threads, num_files * num_contexts, 1536 depth, rec_len, max_io_submit)) { 1537 exit(1); 1538 } 1539 for (i = 0; i < num_threads; i++) { 1540 if (setup_ious 1541 (&t[i], t[i].num_files, depth, rec_len, max_io_submit)) 1542 exit(1); 1543 } 1544 if (num_threads > 1) { 1545 printf("Running multi thread version num_threads:%d\n", 1546 num_threads); 1547 run_workers(t, num_threads); 1548 } else { 1549 printf("Running single thread 
version \n"); 1550 status = worker(t); 1551 } 1552 if (unlink_files) { 1553 for (i = optind; i < ac; i++) { 1554 printf("Cleaning up file %s \n", av[i]); 1555 unlink(av[i]); 1556 } 1557 } 1558 1559 if (status) { 1560 exit(1); 1561 } 1562 return status; 1563 } 1564 #else 1565 int main(void) 1566 { 1567 fprintf(stderr, "test requires libaio and it's development packages\n"); 1568 return TCONF; 1569 } 1570 #endif 1571