Home | History | Annotate | Download | only in aiostress
      1 /*
      2  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
      3  *
      4  * This program is free software; you can redistribute it and/or modify it
      5  * under the terms of version 2 of the GNU General Public License as
      6  * published by the Free Software Foundation.
      7  *
      8  * This program is distributed in the hope that it would be useful, but
      9  * WITHOUT ANY WARRANTY; without even the implied warranty of
     10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
     11  *
     12  * Further, this software is distributed without any warranty that it is
     13  * free of the rightful claim of any third person regarding infringement
     14  * or the like.  Any license provided herein, whether implied or
     15  * otherwise, applies only to this software file.  Patent licenses, if
     16  * any, provided herein do not apply to combinations of this program with
     17  * other software, or any other product whatsoever.
     18  *
     19  * You should have received a copy of the GNU General Public License along
     20  * with this program; if not, write the Free Software Foundation, Inc., 59
     21  * Temple Place - Suite 330, Boston MA 02111-1307, USA.
     22  *
     23  * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
     24  * Mountain View, CA  94043, or:
     25  *
     26  *
     27  * aio-stress
     28  *
     29  * will open or create each file on the command line, and start a series
     30  * of aio to it.
     31  *
     32  * aio is done in a rotating loop.  first file1 gets 8 requests, then
     33  * file2, then file3 etc.  As each file finishes writing, it is switched
     34  * to reads
     35  *
     36  * io buffers are aligned in case you want to do raw io
     37  *
     38  * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
     39  *
     40  * run aio-stress -h to see the options
     41  *
     42  * Please mail Chris Mason (mason (at) suse.com) with bug reports or patches
     43  */
     44 #define _FILE_OFFSET_BITS 64
     45 #define PROG_VERSION "0.21"
     46 #define NEW_GETEVENTS
     47 
     48 #include <stdio.h>
     49 #include <errno.h>
     50 #include <assert.h>
     51 #include <stdlib.h>
     52 
     53 #include <sys/types.h>
     54 #include <sys/stat.h>
     55 #include <fcntl.h>
     56 #include <unistd.h>
     57 #include <sys/time.h>
     58 #include <libaio.h>
     59 #include <sys/ipc.h>
     60 #include <sys/shm.h>
     61 #include <sys/mman.h>
     62 #include <string.h>
     63 #include <pthread.h>
     64 
     65 #define IO_FREE 0
     66 #define IO_PENDING 1
     67 #define RUN_FOREVER -1
     68 
     69 #ifndef O_DIRECT
     70 #define O_DIRECT         040000 /* direct disk access hint */
     71 #endif
     72 
     73 enum {
     74     WRITE,
     75     READ,
     76     RWRITE,
     77     RREAD,
     78     LAST_STAGE,
     79 };
     80 
     81 #define USE_MALLOC 0
     82 #define USE_SHM 1
     83 #define USE_SHMFS 2
     84 
     85 /*
     86  * various globals, these are effectively read only by the time the threads
     87  * are started
     88  */
     89 long stages = 0;
     90 unsigned long page_size_mask;
     91 int o_direct = 0;
     92 int o_sync = 0;
     93 int latency_stats = 0;
     94 int completion_latency_stats = 0;
     95 int io_iter = 8;
     96 int iterations = RUN_FOREVER;
     97 int max_io_submit = 0;
     98 long rec_len = 64 * 1024;
     99 int depth = 64;
    100 int num_threads = 1;
    101 int num_contexts = 1;
    102 off_t context_offset = 2 * 1024 * 1024;
    103 int fsync_stages = 1;
    104 int use_shm = 0;
    105 int shm_id;
    106 char *unaligned_buffer = NULL;
    107 char *aligned_buffer = NULL;
    108 int padded_reclen = 0;
    109 int stonewall = 1;
    110 int verify = 0;
    111 char *verify_buf = NULL;
    112 int unlink_files = 0;
    113 
    114 struct io_unit;
    115 struct thread_info;
    116 
    117 /* pthread mutexes and other globals for keeping the threads in sync */
    118 pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
    119 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
    120 int threads_ending = 0;
    121 int threads_starting = 0;
    122 struct timeval global_stage_start_time;
    123 struct thread_info *global_thread_info;
    124 
    125 /*
    126  * latencies during io_submit are measured, these are the
    127  * granularities for deviations
    128  */
    129 #define DEVIATIONS 6
    130 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
    131 struct io_latency {
    132     double max;
    133     double min;
    134     double total_io;
    135     double total_lat;
    136     double deviations[DEVIATIONS];
    137 };
    138 
    139 /* container for a series of operations to a file */
    140 struct io_oper {
    141     /* already open file descriptor, valid for whatever operation you want */
    142     int fd;
    143 
    144     /* starting byte of the operation */
    145     off_t start;
    146 
    147     /* ending byte of the operation */
    148     off_t end;
    149 
    150     /* size of the read/write buffer */
    151     int reclen;
    152 
    153     /* max number of pending requests before a wait is triggered */
    154     int depth;
    155 
    156     /* current number of pending requests */
    157     int num_pending;
    158 
    159     /* last error, zero if there were none */
    160     int last_err;
    161 
    162     /* total number of errors hit. */
    163     int num_err;
    164 
    165     /* read,write, random, etc */
    166     int rw;
    167 
    168     /* number of ios that will get sent to aio */
    169     int total_ios;
    170 
    171     /* number of ios we've already sent */
    172     int started_ios;
    173 
    174     /* last offset used in an io operation */
    175     off_t last_offset;
    176 
    177     /* stonewalled = 1 when we got cut off before submitting all our ios */
    178     int stonewalled;
    179 
    180     /* list management */
    181     struct io_oper *next;
    182     struct io_oper *prev;
    183 
    184     struct timeval start_time;
    185 
    186     char *file_name;
    187 };
    188 
    189 /* a single io, and all the tracking needed for it */
    190 struct io_unit {
    191     /* note, iocb must go first! */
    192     struct iocb iocb;
    193 
    194     /* pointer to parent io operation struct */
    195     struct io_oper *io_oper;
    196 
    197     /* aligned buffer */
    198     char *buf;
    199 
    200     /* size of the aligned buffer (record size) */
    201     int buf_size;
    202 
    203     /* state of this io unit (free, pending, done) */
    204     int busy;
    205 
    206     /* result of last operation */
    207     long res;
    208 
    209     struct io_unit *next;
    210 
    211     struct timeval io_start_time;		/* time of io_submit */
    212 };
    213 
    214 struct thread_info {
    215     io_context_t io_ctx;
    216     pthread_t tid;
    217 
    218     /* allocated array of io_unit structs */
    219     struct io_unit *ios;
    220 
    221     /* list of io units available for io */
    222     struct io_unit *free_ious;
    223 
    224     /* number of io units in the ios array */
    225     int num_global_ios;
    226 
    227     /* number of io units in flight */
    228     int num_global_pending;
    229 
    230     /* preallocated array of iocb pointers, only used in run_active */
    231     struct iocb **iocbs;
    232 
    233     /* preallocated array of events */
    234     struct io_event *events;
    235 
    236     /* size of the events array */
    237     int num_global_events;
    238 
    239     /* latency stats for io_submit */
    240     struct io_latency io_submit_latency;
    241 
    242     /* list of operations still in progress, and of those finished */
    243     struct io_oper *active_opers;
    244     struct io_oper *finished_opers;
    245 
    246     /* number of files this thread is doing io on */
    247     int num_files;
    248 
    249     /* how much io this thread did in the last stage */
    250     double stage_mb_trans;
    251 
    252     /* latency completion stats i/o time from io_submit until io_getevents */
    253     struct io_latency io_completion_latency;
    254 };
    255 
    256 /*
    257  * return seconds between start_tv and stop_tv in double precision
    258  */
    259 static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
    260 {
    261     double sec, usec;
    262     double ret;
    263     sec = stop_tv->tv_sec - start_tv->tv_sec;
    264     usec = stop_tv->tv_usec - start_tv->tv_usec;
    265     if (sec > 0 && usec < 0) {
    266         sec--;
    267 	usec += 1000000;
    268     }
    269     ret = sec + usec / (double)1000000;
    270     if (ret < 0)
    271         ret = 0;
    272     return ret;
    273 }
    274 
    275 /*
    276  * return seconds between start_tv and now in double precision
    277  */
    278 static double time_since_now(struct timeval *start_tv)
    279 {
    280     struct timeval stop_time;
    281     gettimeofday(&stop_time, NULL);
    282     return time_since(start_tv, &stop_time);
    283 }
    284 
    285 /*
    286  * Add latency info to latency struct
    287  */
    288 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
    289 			struct io_latency *lat)
    290 {
    291     double delta;
    292     int i;
    293     delta = time_since(start_tv, stop_tv);
    294     delta = delta * 1000;
    295 
    296     if (delta > lat->max)
    297     	lat->max = delta;
    298     if (!lat->min || delta < lat->min)
    299     	lat->min = delta;
    300     lat->total_io++;
    301     lat->total_lat += delta;
    302     for (i = 0 ; i < DEVIATIONS ; i++) {
    303         if (delta < deviations[i]) {
    304 	    lat->deviations[i]++;
    305 	    break;
    306 	}
    307     }
    308 }
    309 
    310 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
    311 {
    312     if (!*list) {
    313         *list = oper;
    314 	oper->prev = oper->next = oper;
    315 	return;
    316     }
    317     oper->prev = (*list)->prev;
    318     oper->next = *list;
    319     (*list)->prev->next = oper;
    320     (*list)->prev = oper;
    321     return;
    322 }
    323 
    324 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
    325 {
    326     if ((*list)->next == (*list)->prev && *list == (*list)->next) {
    327         *list = NULL;
    328 	return;
    329     }
    330     oper->prev->next = oper->next;
    331     oper->next->prev = oper->prev;
    332     if (*list == oper)
    333         *list = oper->next;
    334 }
    335 
    336 /* worker func to check error fields in the io unit */
    337 static int check_finished_io(struct io_unit *io) {
    338     int i;
    339     if (io->res != io->buf_size) {
    340 
    341   		 struct stat s;
    342   		 fstat(io->io_oper->fd, &s);
    343 
    344   		 /*
    345   		  * If file size is large enough for the read, then this short
    346   		  * read is an error.
    347   		  */
    348   		 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
    349   		     s.st_size > (io->iocb.u.c.offset + io->res)) {
    350 
    351   		 		 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
    352   		 		 		 io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
    353   		 		 		 io->iocb.u.c.offset, io->buf_size);
    354   		 		 io->io_oper->last_err = io->res;
    355   		 		 io->io_oper->num_err++;
    356   		 		 return -1;
    357   		 }
    358     }
    359     if (verify && io->io_oper->rw == READ) {
    360         if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
    361 	    fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
    362 	            io->io_oper->file_name, io->iocb.u.c.offset);
    363 
    364 	    for (i = 0 ; i < io->io_oper->reclen ; i++) {
    365 	        if (io->buf[i] != verify_buf[i]) {
    366 		    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
    367 		}
    368 	    }
    369 	    fprintf(stderr, "\n");
    370 	}
    371 
    372     }
    373     return 0;
    374 }
    375 
    376 /* worker func to check the busy bits and get an io unit ready for use */
    377 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
    378     if (io->busy == IO_PENDING)
    379         return -1;
    380 
    381     io->busy = IO_PENDING;
    382     io->res = 0;
    383     io->io_oper = oper;
    384     return 0;
    385 }
    386 
    387 char *stage_name(int rw) {
    388     switch(rw) {
    389     case WRITE:
    390         return "write";
    391     case READ:
    392         return "read";
    393     case RWRITE:
    394         return "random write";
    395     case RREAD:
    396         return "random read";
    397     }
    398     return "unknown";
    399 }
    400 
    401 static inline double oper_mb_trans(struct io_oper *oper) {
    402     return ((double)oper->started_ios * (double)oper->reclen) /
    403                 (double)(1024 * 1024);
    404 }
    405 
    406 static void print_time(struct io_oper *oper) {
    407     double runtime;
    408     double tput;
    409     double mb;
    410 
    411     runtime = time_since_now(&oper->start_time);
    412     mb = oper_mb_trans(oper);
    413     tput = mb / runtime;
    414     fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
    415 	    stage_name(oper->rw), oper->file_name, tput, mb, runtime);
    416 }
    417 
    418 static void print_lat(char *str, struct io_latency *lat) {
    419     double avg = lat->total_lat / lat->total_io;
    420     int i;
    421     double total_counted = 0;
    422     fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
    423             str, lat->min, avg, lat->max);
    424 
    425     for (i = 0 ; i < DEVIATIONS ; i++) {
    426 	fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
    427 	total_counted += lat->deviations[i];
    428     }
    429     if (total_counted && lat->total_io - total_counted)
    430         fprintf(stderr, " < %.0f", lat->total_io - total_counted);
    431     fprintf(stderr, "\n");
    432     memset(lat, 0, sizeof(*lat));
    433 }
    434 
    435 static void print_latency(struct thread_info *t)
    436 {
    437     struct io_latency *lat = &t->io_submit_latency;
    438     print_lat("latency", lat);
    439 }
    440 
    441 static void print_completion_latency(struct thread_info *t)
    442 {
    443     struct io_latency *lat = &t->io_completion_latency;
    444     print_lat("completion latency", lat);
    445 }
    446 
    447 /*
    448  * updates the fields in the io operation struct that belongs to this
    449  * io unit, and make the io unit reusable again
    450  */
    451 void finish_io(struct thread_info *t, struct io_unit *io, long result,
    452 		struct timeval *tv_now) {
    453     struct io_oper *oper = io->io_oper;
    454 
    455     calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
    456     io->res = result;
    457     io->busy = IO_FREE;
    458     io->next = t->free_ious;
    459     t->free_ious = io;
    460     oper->num_pending--;
    461     t->num_global_pending--;
    462     check_finished_io(io);
    463     if (oper->num_pending == 0 &&
    464        (oper->started_ios == oper->total_ios || oper->stonewalled))
    465     {
    466         print_time(oper);
    467     }
    468 }
    469 
    470 int read_some_events(struct thread_info *t) {
    471     struct io_unit *event_io;
    472     struct io_event *event;
    473     int nr;
    474     int i;
    475     int min_nr = io_iter;
    476     struct timeval stop_time;
    477 
    478     if (t->num_global_pending < io_iter)
    479         min_nr = t->num_global_pending;
    480 
    481 #ifdef NEW_GETEVENTS
    482     nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
    483 #else
    484     nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
    485 #endif
    486     if (nr <= 0)
    487         return nr;
    488 
    489     gettimeofday(&stop_time, NULL);
    490     for (i = 0 ; i < nr ; i++) {
    491 	event = t->events + i;
    492 	event_io = (struct io_unit *)((unsigned long)event->obj);
    493 	finish_io(t, event_io, event->res, &stop_time);
    494     }
    495     return nr;
    496 }
    497 
    498 /*
    499  * finds a free io unit, waiting for pending requests if required.  returns
    500  * null if none could be found
    501  */
    502 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
    503 {
    504     struct io_unit *event_io;
    505     int nr;
    506 
    507 retry:
    508     if (t->free_ious) {
    509         event_io = t->free_ious;
    510 	t->free_ious = t->free_ious->next;
    511 	if (grab_iou(event_io, oper)) {
    512 	    fprintf(stderr, "io unit on free list but not free\n");
    513 	    abort();
    514 	}
    515 	return event_io;
    516     }
    517     nr = read_some_events(t);
    518     if (nr > 0)
    519     	goto retry;
    520     else
    521     	fprintf(stderr, "no free ious after read_some_events\n");
    522     return NULL;
    523 }
    524 
    525 /*
    526  * wait for all pending requests for this io operation to finish
    527  */
    528 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
    529     struct io_event event;
    530     struct io_unit *event_io;
    531 
    532     if (oper == NULL) {
    533         return 0;
    534     }
    535 
    536     if (oper->num_pending == 0)
    537         goto done;
    538 
    539     /* this func is not speed sensitive, no need to go wild reading
    540      * more than one event at a time
    541      */
    542 #ifdef NEW_GETEVENTS
    543     while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
    544 #else
    545     while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
    546 #endif
    547 	struct timeval tv_now;
    548         event_io = (struct io_unit *)((unsigned long)event.obj);
    549 
    550 	gettimeofday(&tv_now, NULL);
    551 	finish_io(t, event_io, event.res, &tv_now);
    552 
    553 	if (oper->num_pending == 0)
    554 	    break;
    555     }
    556 done:
    557     if (oper->num_err) {
    558         fprintf(stderr, "%u errors on oper, last %u\n",
    559 	        oper->num_err, oper->last_err);
    560     }
    561     return 0;
    562 }
    563 
    564 off_t random_byte_offset(struct io_oper *oper) {
    565     off_t num;
    566     off_t rand_byte = oper->start;
    567     off_t range;
    568     off_t offset = 1;
    569 
    570     range = (oper->end - oper->start) / (1024 * 1024);
    571     if ((page_size_mask+1) > (1024 * 1024))
    572         offset = (page_size_mask+1) / (1024 * 1024);
    573     if (range < offset)
    574         range = 0;
    575     else
    576         range -= offset;
    577 
    578     /* find a random mb offset */
    579     num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
    580     rand_byte += num * 1024 * 1024;
    581 
    582     /* find a random byte offset */
    583     num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
    584 
    585     /* page align */
    586     num = (num + page_size_mask) & ~page_size_mask;
    587     rand_byte += num;
    588 
    589     if (rand_byte + oper->reclen > oper->end) {
    590 	rand_byte -= oper->reclen;
    591     }
    592     return rand_byte;
    593 }
    594 
    595 /*
    596  * build an aio iocb for an operation, based on oper->rw and the
    597  * last offset used.  This finds the struct io_unit that will be attached
    598  * to the iocb, and things are ready for submission to aio after this
    599  * is called.
    600  *
    601  * returns null on error
    602  */
    603 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
    604 {
    605     struct io_unit *io;
    606     off_t rand_byte;
    607 
    608     io = find_iou(t, oper);
    609     if (!io) {
    610         fprintf(stderr, "unable to find io unit\n");
    611 	return NULL;
    612     }
    613 
    614     switch(oper->rw) {
    615     case WRITE:
    616         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
    617 	               oper->last_offset);
    618 	oper->last_offset += oper->reclen;
    619 	break;
    620     case READ:
    621         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
    622 	              oper->last_offset);
    623 	oper->last_offset += oper->reclen;
    624 	break;
    625     case RREAD:
    626 	rand_byte = random_byte_offset(oper);
    627 	oper->last_offset = rand_byte;
    628         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
    629 	              rand_byte);
    630         break;
    631     case RWRITE:
    632 	rand_byte = random_byte_offset(oper);
    633 	oper->last_offset = rand_byte;
    634         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
    635 	              rand_byte);
    636 
    637         break;
    638     }
    639 
    640     return io;
    641 }
    642 
    643 /*
    644  * wait for any pending requests, and then free all ram associated with
    645  * an operation.  returns the last error the operation hit (zero means none)
    646  */
    647 static int
    648 finish_oper(struct thread_info *t, struct io_oper *oper)
    649 {
    650     unsigned long last_err;
    651 
    652     io_oper_wait(t, oper);
    653     last_err = oper->last_err;
    654     if (oper->num_pending > 0) {
    655         fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
    656     }
    657     close(oper->fd);
    658     free(oper);
    659     return last_err;
    660 }
    661 
    662 /*
    663  * allocates an io operation and fills in all the fields.  returns
    664  * null on error
    665  */
    666 static struct io_oper *
    667 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
    668             int iter, char *file_name)
    669 {
    670     struct io_oper *oper;
    671 
    672     oper = malloc (sizeof(*oper));
    673     if (!oper) {
    674 	fprintf(stderr, "unable to allocate io oper\n");
    675 	return NULL;
    676     }
    677     memset(oper, 0, sizeof(*oper));
    678 
    679     oper->depth = depth;
    680     oper->start = start;
    681     oper->end = end;
    682     oper->last_offset = oper->start;
    683     oper->fd = fd;
    684     oper->reclen = reclen;
    685     oper->rw = rw;
    686     oper->total_ios = (oper->end - oper->start) / oper->reclen;
    687     oper->file_name = file_name;
    688 
    689     return oper;
    690 }
    691 
    692 /*
    693  * does setup on num_ios worth of iocbs, but does not actually
    694  * start any io
    695  */
    696 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
    697                struct iocb **my_iocbs)
    698 {
    699     int i;
    700     struct io_unit *io;
    701 
    702     if (oper->started_ios == 0)
    703 	gettimeofday(&oper->start_time, NULL);
    704 
    705     if (num_ios == 0)
    706         num_ios = oper->total_ios;
    707 
    708     if ((oper->started_ios + num_ios) > oper->total_ios)
    709         num_ios = oper->total_ios - oper->started_ios;
    710 
    711     for( i = 0 ; i < num_ios ; i++) {
    712 	io = build_iocb(t, oper);
    713 	if (!io) {
    714 	    return -1;
    715 	}
    716 	my_iocbs[i] = &io->iocb;
    717     }
    718     return num_ios;
    719 }
    720 
    721 /*
    722  * runs through the iocbs in the array provided and updates
    723  * counters in the associated oper struct
    724  */
    725 static void update_iou_counters(struct iocb **my_iocbs, int nr,
    726 	struct timeval *tv_now)
    727 {
    728     struct io_unit *io;
    729     int i;
    730     for (i = 0 ; i < nr ; i++) {
    731 	io = (struct io_unit *)(my_iocbs[i]);
    732 	io->io_oper->num_pending++;
    733 	io->io_oper->started_ios++;
    734 	io->io_start_time = *tv_now;	/* set time of io_submit */
    735     }
    736 }
    737 
    738 /* starts some io for a given file, returns zero if all went well */
    739 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
    740 {
    741     int ret;
    742     struct timeval start_time;
    743     struct timeval stop_time;
    744 
    745 resubmit:
    746     gettimeofday(&start_time, NULL);
    747     ret = io_submit(t->io_ctx, num_ios, my_iocbs);
    748     gettimeofday(&stop_time, NULL);
    749     calc_latency(&start_time, &stop_time, &t->io_submit_latency);
    750 
    751     if (ret != num_ios) {
    752 	/* some ios got through */
    753 	if (ret > 0) {
    754 	    update_iou_counters(my_iocbs, ret, &stop_time);
    755 	    my_iocbs += ret;
    756 	    t->num_global_pending += ret;
    757 	    num_ios -= ret;
    758 	}
    759 	/*
    760 	 * we've used all the requests allocated in aio_init, wait and
    761 	 * retry
    762 	 */
    763 	if (ret > 0 || ret == -EAGAIN) {
    764 	    int old_ret = ret;
    765 	    if ((ret = read_some_events(t) > 0)) {
    766 		goto resubmit;
    767 	    } else {
    768 	    	fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
    769 		abort();
    770 	    }
    771 	}
    772 
    773 	fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
    774 	return -1;
    775     }
    776     update_iou_counters(my_iocbs, ret, &stop_time);
    777     t->num_global_pending += ret;
    778     return 0;
    779 }
    780 
    781 /*
    782  * changes oper->rw to the next in a command sequence, or returns zero
    783  * to say this operation is really, completely done for
    784  */
    785 static int restart_oper(struct io_oper *oper) {
    786     int new_rw  = 0;
    787     if (oper->last_err)
    788         return 0;
    789 
    790     /* this switch falls through */
    791     switch(oper->rw) {
    792     case WRITE:
    793 	if (stages & (1 << READ))
    794 	    new_rw = READ;
    795     case READ:
    796 	if (!new_rw && stages & (1 << RWRITE))
    797 	    new_rw = RWRITE;
    798     case RWRITE:
    799 	if (!new_rw && stages & (1 << RREAD))
    800 	    new_rw = RREAD;
    801     }
    802 
    803     if (new_rw) {
    804 	oper->started_ios = 0;
    805 	oper->last_offset = oper->start;
    806 	oper->stonewalled = 0;
    807 
    808 	/*
    809 	 * we're restarting an operation with pending requests, so the
    810 	 * timing info won't be printed by finish_io.  Printing it here
    811 	 */
    812 	if (oper->num_pending)
    813 	    print_time(oper);
    814 
    815 	oper->rw = new_rw;
    816 	return 1;
    817     }
    818     return 0;
    819 }
    820 
    821 static int oper_runnable(struct io_oper *oper) {
    822     struct stat buf;
    823     int ret;
    824 
    825     /* first context is always runnable, if started_ios > 0, no need to
    826      * redo the calculations
    827      */
    828     if (oper->started_ios || oper->start == 0)
    829         return 1;
    830     /*
    831      * only the sequential phases force delays in starting */
    832     if (oper->rw >= RWRITE)
    833         return 1;
    834     ret = fstat(oper->fd, &buf);
    835     if (ret < 0) {
    836         perror("fstat");
    837 	exit(1);
    838     }
    839     if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
    840         return 0;
    841     return 1;
    842 }
    843 
    844 /*
    845  * runs through all the io operations on the active list, and starts
    846  * a chunk of io on each.  If any io operations are completely finished,
    847  * it either switches them to the next stage or puts them on the
    848  * finished list.
    849  *
    850  * this function stops after max_io_submit iocbs are sent down the
    851  * pipe, even if it has not yet touched all the operations on the
    852  * active list.  Any operations that have finished are moved onto
    853  * the finished_opers list.
    854  */
    855 static int run_active_list(struct thread_info *t,
    856 			 int io_iter,
    857 			 int max_io_submit)
    858 {
    859     struct io_oper *oper;
    860     struct io_oper *built_opers = NULL;
    861     struct iocb **my_iocbs = t->iocbs;
    862     int ret = 0;
    863     int num_built = 0;
    864 
    865     oper = t->active_opers;
    866     while(oper) {
    867 	if (!oper_runnable(oper)) {
    868 	    oper = oper->next;
    869 	    if (oper == t->active_opers)
    870 	        break;
    871 	    continue;
    872 	}
    873 	ret = build_oper(t, oper, io_iter, my_iocbs);
    874 	if (ret >= 0) {
    875 	    my_iocbs += ret;
    876 	    num_built += ret;
    877 	    oper_list_del(oper, &t->active_opers);
    878 	    oper_list_add(oper, &built_opers);
    879 	    oper = t->active_opers;
    880 	    if (num_built + io_iter > max_io_submit)
    881 	        break;
    882 	} else
    883 	    break;
    884     }
    885     if (num_built) {
    886 	ret = run_built(t, num_built, t->iocbs);
    887 	if (ret < 0) {
    888 	    fprintf(stderr, "error %d on run_built\n", ret);
    889 	    exit(1);
    890 	}
    891 	while(built_opers) {
    892 	    oper = built_opers;
    893 	    oper_list_del(oper, &built_opers);
    894 	    oper_list_add(oper, &t->active_opers);
    895 	    if (oper->started_ios == oper->total_ios) {
    896 		oper_list_del(oper, &t->active_opers);
    897 		oper_list_add(oper, &t->finished_opers);
    898 	    }
    899 	}
    900     }
    901     return 0;
    902 }
    903 
    904 void drop_shm() {
    905     int ret;
    906     struct shmid_ds ds;
    907     if (use_shm != USE_SHM)
    908         return;
    909 
    910     ret = shmctl(shm_id, IPC_RMID, &ds);
    911     if (ret) {
    912         perror("shmctl IPC_RMID");
    913     }
    914 }
    915 
    916 void aio_setup(io_context_t *io_ctx, int n)
    917 {
    918     int res = io_queue_init(n, io_ctx);
    919     if (res != 0) {
    920 	fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
    921 		n, res, strerror(-res));
    922 	exit(3);
    923     }
    924 }
    925 
    926 /*
    927  * allocate io operation and event arrays for a given thread
    928  */
    929 int setup_ious(struct thread_info *t,
    930               int num_files, int depth,
    931 	      int reclen, int max_io_submit) {
    932     int i;
    933     size_t bytes = num_files * depth * sizeof(*t->ios);
    934 
    935     t->ios = malloc(bytes);
    936     if (!t->ios) {
    937 	fprintf(stderr, "unable to allocate io units\n");
    938 	return -1;
    939     }
    940     memset(t->ios, 0, bytes);
    941 
    942     for (i = 0 ; i < depth * num_files; i++) {
    943 	t->ios[i].buf = aligned_buffer;
    944 	aligned_buffer += padded_reclen;
    945 	t->ios[i].buf_size = reclen;
    946 	if (verify)
    947 	    memset(t->ios[i].buf, 'b', reclen);
    948 	else
    949 	    memset(t->ios[i].buf, 0, reclen);
    950 	t->ios[i].next = t->free_ious;
    951 	t->free_ious = t->ios + i;
    952     }
    953     if (verify) {
    954         verify_buf = aligned_buffer;
    955         memset(verify_buf, 'b', reclen);
    956     }
    957 
    958     t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
    959     if (!t->iocbs) {
    960         fprintf(stderr, "unable to allocate iocbs\n");
    961 	goto free_buffers;
    962     }
    963 
    964     memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
    965 
    966     t->events = malloc(sizeof(struct io_event) * depth * num_files);
    967     if (!t->events) {
    968         fprintf(stderr, "unable to allocate ram for events\n");
    969 	goto free_buffers;
    970     }
    971     memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
    972 
    973     t->num_global_ios = num_files * depth;
    974     t->num_global_events = t->num_global_ios;
    975     return 0;
    976 
    977 free_buffers:
    978     if (t->ios)
    979         free(t->ios);
    980     if (t->iocbs)
    981         free(t->iocbs);
    982     if (t->events)
    983         free(t->events);
    984     return -1;
    985 }
    986 
    987 /*
    988  * The buffers used for file data are allocated as a single big
    989  * malloc, and then each thread and operation takes a piece and uses
    990  * that for file data.  This lets us do a large shm or bigpages alloc
    991  * and without trying to find a special place in each thread to map the
    992  * buffers to
    993  */
    994 int setup_shared_mem(int num_threads, int num_files, int depth,
    995                      int reclen, int max_io_submit)
    996 {
    997     char *p = NULL;
    998     size_t total_ram;
    999 
   1000     padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
   1001     padded_reclen = padded_reclen * (page_size_mask+1);
   1002     total_ram = num_files * depth * padded_reclen + num_threads;
   1003     if (verify)
   1004     	total_ram += padded_reclen;
   1005 
   1006     if (use_shm == USE_MALLOC) {
   1007 	p = malloc(total_ram + page_size_mask);
   1008     } else if (use_shm == USE_SHM) {
   1009         shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
   1010 	if (shm_id < 0) {
   1011 	    perror("shmget");
   1012 	    drop_shm();
   1013 	    goto free_buffers;
   1014 	}
   1015 	p = shmat(shm_id, (char *)0x50000000, 0);
   1016         if ((long)p == -1) {
   1017 	    perror("shmat");
   1018 	    goto free_buffers;
   1019 	}
   1020 	/* won't really be dropped until we shmdt */
   1021 	drop_shm();
   1022     } else if (use_shm == USE_SHMFS) {
   1023         char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
   1024 	int fd;
   1025 
   1026 	strcpy(mmap_name, "/dev/shm/XXXXXX");
   1027 	fd = mkstemp(mmap_name);
   1028         if (fd < 0) {
   1029 	    perror("mkstemp");
   1030 	    goto free_buffers;
   1031 	}
   1032 	unlink(mmap_name);
   1033 	ftruncate(fd, total_ram);
   1034 	shm_id = fd;
   1035 	p = mmap((char *)0x50000000, total_ram,
   1036 	         PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
   1037 
   1038         if (p == MAP_FAILED) {
   1039 	    perror("mmap");
   1040 	    goto free_buffers;
   1041 	}
   1042     }
   1043     if (!p) {
   1044         fprintf(stderr, "unable to allocate buffers\n");
   1045 	goto free_buffers;
   1046     }
   1047     unaligned_buffer = p;
   1048     p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
   1049     aligned_buffer = p;
   1050     return 0;
   1051 
   1052 free_buffers:
   1053     drop_shm();
   1054     if (unaligned_buffer)
   1055         free(unaligned_buffer);
   1056     return -1;
   1057 }
   1058 
   1059 /*
   1060  * runs through all the thread_info structs and calculates a combined
   1061  * throughput
   1062  */
   1063 void global_thread_throughput(struct thread_info *t, char *this_stage) {
   1064     int i;
   1065     double runtime = time_since_now(&global_stage_start_time);
   1066     double total_mb = 0;
   1067     double min_trans = 0;
   1068 
   1069     for (i = 0 ; i < num_threads ; i++) {
   1070         total_mb += global_thread_info[i].stage_mb_trans;
   1071 	if (!min_trans || t->stage_mb_trans < min_trans)
   1072 	    min_trans = t->stage_mb_trans;
   1073     }
   1074     if (total_mb) {
   1075 	fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
   1076 	        total_mb / runtime);
   1077 	fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
   1078         if (stonewall)
   1079 	    fprintf(stderr, " min transfer %.2fMB", min_trans);
   1080         fprintf(stderr, "\n");
   1081     }
   1082 }
   1083 
   1084 
   1085 /* this is the meat of the state machine.  There is a list of
   1086  * active operations structs, and as each one finishes the required
   1087  * io it is moved to a list of finished operations.  Once they have
   1088  * all finished whatever stage they were in, they are given the chance
   1089  * to restart and pick a different stage (read/write/random read etc)
   1090  *
   1091  * various timings are printed in between the stages, along with
   1092  * thread synchronization if there are more than one threads.
   1093  */
   1094 int worker(struct thread_info *t)
   1095 {
   1096     struct io_oper *oper;
   1097     char *this_stage = NULL;
   1098     struct timeval stage_time;
   1099     int status = 0;
   1100     int iteration = 0;
   1101     int cnt;
   1102 
   1103     aio_setup(&t->io_ctx, 512);
   1104 
   1105 restart:
   1106     if (num_threads > 1) {
   1107         pthread_mutex_lock(&stage_mutex);
   1108 	threads_starting++;
   1109 	if (threads_starting == num_threads) {
   1110 	    threads_ending = 0;
   1111 	    gettimeofday(&global_stage_start_time, NULL);
   1112 	    pthread_cond_broadcast(&stage_cond);
   1113 	}
   1114 	while (threads_starting != num_threads)
   1115 	    pthread_cond_wait(&stage_cond, &stage_mutex);
   1116         pthread_mutex_unlock(&stage_mutex);
   1117     }
   1118     if (t->active_opers) {
   1119         this_stage = stage_name(t->active_opers->rw);
   1120 	gettimeofday(&stage_time, NULL);
   1121 	t->stage_mb_trans = 0;
   1122     }
   1123 
   1124     cnt = 0;
   1125     /* first we send everything through aio */
   1126     while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
   1127 	if (stonewall && threads_ending) {
   1128 	    oper = t->active_opers;
   1129 	    oper->stonewalled = 1;
   1130 	    oper_list_del(oper, &t->active_opers);
   1131 	    oper_list_add(oper, &t->finished_opers);
   1132 	} else {
   1133 	    run_active_list(t, io_iter,  max_io_submit);
   1134         }
   1135 	cnt++;
   1136     }
   1137     if (latency_stats)
   1138         print_latency(t);
   1139 
   1140     if (completion_latency_stats)
   1141 	print_completion_latency(t);
   1142 
   1143     /* then we wait for all the operations to finish */
   1144     oper = t->finished_opers;
   1145     do {
   1146 	if (!oper)
   1147 		break;
   1148 	io_oper_wait(t, oper);
   1149 	oper = oper->next;
   1150     } while(oper != t->finished_opers);
   1151 
   1152     /* then we do an fsync to get the timing for any future operations
   1153      * right, and check to see if any of these need to get restarted
   1154      */
   1155     oper = t->finished_opers;
   1156     while(oper) {
   1157 	if (fsync_stages)
   1158             fsync(oper->fd);
   1159 	t->stage_mb_trans += oper_mb_trans(oper);
   1160 	if (restart_oper(oper)) {
   1161 	    oper_list_del(oper, &t->finished_opers);
   1162 	    oper_list_add(oper, &t->active_opers);
   1163 	    oper = t->finished_opers;
   1164 	    continue;
   1165 	}
   1166 	oper = oper->next;
   1167 	if (oper == t->finished_opers)
   1168 	    break;
   1169     }
   1170 
   1171     if (t->stage_mb_trans && t->num_files > 0) {
   1172         double seconds = time_since_now(&stage_time);
   1173 	fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
   1174 	        t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
   1175 		t->stage_mb_trans, seconds);
   1176     }
   1177 
   1178     if (num_threads > 1) {
   1179 	pthread_mutex_lock(&stage_mutex);
   1180 	threads_ending++;
   1181 	if (threads_ending == num_threads) {
   1182 	    threads_starting = 0;
   1183 	    pthread_cond_broadcast(&stage_cond);
   1184 	    global_thread_throughput(t, this_stage);
   1185 	}
   1186 	while(threads_ending != num_threads)
   1187 	    pthread_cond_wait(&stage_cond, &stage_mutex);
   1188 	pthread_mutex_unlock(&stage_mutex);
   1189     }
   1190 
   1191     /* someone got restarted, go back to the beginning */
   1192     if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
   1193 	iteration++;
   1194         goto restart;
   1195     }
   1196 
   1197     /* finally, free all the ram */
   1198     while(t->finished_opers) {
   1199 	oper = t->finished_opers;
   1200 	oper_list_del(oper, &t->finished_opers);
   1201 	status = finish_oper(t, oper);
   1202     }
   1203 
   1204     if (t->num_global_pending) {
   1205         fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
   1206     }
   1207     io_queue_release(t->io_ctx);
   1208 
   1209     return status;
   1210 }
   1211 
   1212 typedef void * (*start_routine)(void *);
   1213 int run_workers(struct thread_info *t, int num_threads)
   1214 {
   1215     int ret;
   1216     int thread_ret;
   1217     int i;
   1218 
   1219     for(i = 0 ; i < num_threads ; i++) {
   1220         ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
   1221 	if (ret) {
   1222 	    perror("pthread_create");
   1223 	    exit(1);
   1224 	}
   1225     }
   1226     for(i = 0 ; i < num_threads ; i++) {
   1227         ret = pthread_join(t[i].tid, (void *)&thread_ret);
   1228         if (ret) {
   1229 	    perror("pthread_join");
   1230 	    exit(1);
   1231 	}
   1232     }
   1233     return 0;
   1234 }
   1235 
   1236 off_t parse_size(char *size_arg, off_t mult) {
   1237     char c;
   1238     int num;
   1239     off_t ret;
   1240     c = size_arg[strlen(size_arg) - 1];
   1241     if (c > '9') {
   1242         size_arg[strlen(size_arg) - 1] = '\0';
   1243     }
   1244     num = atoi(size_arg);
   1245     switch(c) {
   1246     case 'g':
   1247     case 'G':
   1248         mult = 1024 * 1024 * 1024;
   1249 	break;
   1250     case 'm':
   1251     case 'M':
   1252         mult = 1024 * 1024;
   1253 	break;
   1254     case 'k':
   1255     case 'K':
   1256         mult = 1024;
   1257 	break;
   1258     case 'b':
   1259     case 'B':
   1260         mult = 1;
   1261 	break;
   1262     }
   1263     ret = mult * num;
   1264     return ret;
   1265 }
   1266 
   1267 void print_usage(void) {
   1268     printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
   1269     printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
   1270     printf("                  file1 [file2 ...]\n");
   1271     printf("\t-a size in KB at which to align buffers\n");
   1272     printf("\t-b max number of iocbs to give io_submit at once\n");
   1273     printf("\t-c number of io contexts per file\n");
   1274     printf("\t-C offset between contexts, default 2MB\n");
   1275     printf("\t-s size in MB of the test file(s), default 1024MB\n");
   1276     printf("\t-r record size in KB used for each io, default 64KB\n");
   1277     printf("\t-d number of pending aio requests for each file, default 64\n");
   1278     printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
   1279     printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
   1280     printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
   1281     printf("\t-S Use O_SYNC for writes\n");
   1282     printf("\t-o add an operation to the list: write=0, read=1,\n");
   1283     printf("\t   random write=2, random read=3.\n");
   1284     printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
   1285     printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
   1286     printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
   1287     printf("\t-n no fsyncs between write stage and read stage\n");
   1288     printf("\t-l print io_submit latencies after each stage\n");
   1289     printf("\t-L print io completion latencies after each stage\n");
   1290     printf("\t-t number of threads to run\n");
   1291     printf("\t-u unlink files after completion\n");
   1292     printf("\t-v verification of bytes written\n");
   1293     printf("\t-x turn off thread stonewalling\n");
   1294     printf("\t-h this message\n");
   1295     printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
   1296     printf("\t   translate to 400KB, 400MB and 400GB\n");
   1297     printf("version %s\n", PROG_VERSION);
   1298 }
   1299 
   1300 int main(int ac, char **av)
   1301 {
   1302     int rwfd;
   1303     int i;
   1304     int j;
   1305     int c;
   1306 
   1307     off_t file_size = 1 * 1024 * 1024 * 1024;
   1308     int first_stage = WRITE;
   1309     struct io_oper *oper;
   1310     int status = 0;
   1311     int num_files = 0;
   1312     int open_fds = 0;
   1313     struct thread_info *t;
   1314 
   1315     page_size_mask = getpagesize() - 1;
   1316 
   1317     while(1) {
   1318 	c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
   1319 	if  (c < 0)
   1320 	    break;
   1321 
   1322         switch(c) {
   1323 	case 'a':
   1324 	    page_size_mask = parse_size(optarg, 1024);
   1325 	    page_size_mask--;
   1326 	    break;
   1327 	case 'c':
   1328 	    num_contexts = atoi(optarg);
   1329 	    break;
   1330 	case 'C':
   1331 	    context_offset = parse_size(optarg, 1024 * 1024);
   1332 	case 'b':
   1333 	    max_io_submit = atoi(optarg);
   1334 	    break;
   1335 	case 's':
   1336 	    file_size = parse_size(optarg, 1024 * 1024);
   1337 	    break;
   1338 	case 'd':
   1339 	    depth = atoi(optarg);
   1340 	    break;
   1341 	case 'r':
   1342 	    rec_len = parse_size(optarg, 1024);
   1343 	    break;
   1344 	case 'i':
   1345 	    io_iter = atoi(optarg);
   1346 	    break;
   1347         case 'I':
   1348           iterations = atoi(optarg);
   1349         break;
   1350 	case 'n':
   1351 	    fsync_stages = 0;
   1352 	    break;
   1353 	case 'l':
   1354 	    latency_stats = 1;
   1355 	    break;
   1356 	case 'L':
   1357 	    completion_latency_stats = 1;
   1358 	    break;
   1359 	case 'm':
   1360 	    if (!strcmp(optarg, "shm")) {
   1361 		fprintf(stderr, "using ipc shm\n");
   1362 	        use_shm = USE_SHM;
   1363 	    } else if (!strcmp(optarg, "shmfs")) {
   1364 	        fprintf(stderr, "using /dev/shm for buffers\n");
   1365 		use_shm = USE_SHMFS;
   1366 	    }
   1367 	    break;
   1368 	case 'o':
   1369 	    i = atoi(optarg);
   1370 	    stages |= 1 << i;
   1371 	    fprintf(stderr, "adding stage %s\n", stage_name(i));
   1372 	    break;
   1373 	case 'O':
   1374 	    o_direct = O_DIRECT;
   1375 	    break;
   1376 	case 'S':
   1377 	    o_sync = O_SYNC;
   1378 	    break;
   1379 	case 't':
   1380 	    num_threads = atoi(optarg);
   1381 	    break;
   1382 	case 'x':
   1383 	    stonewall = 0;
   1384 	    break;
   1385 	case 'u':
   1386 	    unlink_files = 1;
   1387 	    break;
   1388 	case 'v':
   1389 	    verify = 1;
   1390 	    break;
   1391 	case 'h':
   1392 	default:
   1393 	    print_usage();
   1394 	    exit(1);
   1395 	}
   1396     }
   1397 
   1398     /*
   1399      * make sure we don't try to submit more ios than we have allocated
   1400      * memory for
   1401      */
   1402     if (depth < io_iter) {
   1403 	io_iter = depth;
   1404         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
   1405     }
   1406 
   1407     if (optind >= ac) {
   1408 	print_usage();
   1409 	exit(1);
   1410     }
   1411 
   1412     num_files = ac - optind;
   1413 
   1414     if (num_threads > (num_files * num_contexts)) {
   1415         num_threads = num_files * num_contexts;
   1416 	fprintf(stderr, "dropping thread count to the number of contexts %d\n",
   1417 	        num_threads);
   1418     }
   1419 
   1420     t = malloc(num_threads * sizeof(*t));
   1421     if (!t) {
   1422         perror("malloc");
   1423 	exit(1);
   1424     }
   1425     global_thread_info = t;
   1426 
   1427     /* by default, allow a huge number of iocbs to be sent towards
   1428      * io_submit
   1429      */
   1430     if (!max_io_submit)
   1431         max_io_submit = num_files * io_iter * num_contexts;
   1432 
   1433     /*
   1434      * make sure we don't try to submit more ios than max_io_submit allows
   1435      */
   1436     if (max_io_submit < io_iter) {
   1437         io_iter = max_io_submit;
   1438 	fprintf(stderr, "dropping io_iter to %d\n", io_iter);
   1439     }
   1440 
   1441     if (!stages) {
   1442         stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
   1443     } else {
   1444         for (i = 0 ; i < LAST_STAGE; i++) {
   1445 	    if (stages & (1 << i)) {
   1446 	        first_stage = i;
   1447 		fprintf(stderr, "starting with %s\n", stage_name(i));
   1448 		break;
   1449 	    }
   1450 	}
   1451     }
   1452 
   1453     if (file_size < num_contexts * context_offset) {
   1454         fprintf(stderr, "file size %Lu too small for %d contexts\n",
   1455 	        file_size, num_contexts);
   1456 	exit(1);
   1457     }
   1458 
   1459     fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
   1460     fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
   1461             max_io_submit, (page_size_mask + 1)/1024);
   1462     fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
   1463             num_threads, num_files, num_contexts,
   1464 	    context_offset / (1024 * 1024), verify ? "on" : "off");
   1465     /* open all the files and do any required setup for them */
   1466     for (i = optind ; i < ac ; i++) {
   1467 	int thread_index;
   1468 	for (j = 0 ; j < num_contexts ; j++) {
   1469 	    thread_index = open_fds % num_threads;
   1470 	    open_fds++;
   1471 
   1472 	    rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
   1473 	    assert(rwfd != -1);
   1474 
   1475 	    oper = create_oper(rwfd, first_stage, j * context_offset,
   1476 	                       file_size - j * context_offset, rec_len,
   1477 			       depth, io_iter, av[i]);
   1478 	    if (!oper) {
   1479 		fprintf(stderr, "error in create_oper\n");
   1480 		exit(-1);
   1481 	    }
   1482 	    oper_list_add(oper, &t[thread_index].active_opers);
   1483 	    t[thread_index].num_files++;
   1484 	}
   1485     }
   1486     if (setup_shared_mem(num_threads, num_files * num_contexts,
   1487                          depth, rec_len, max_io_submit))
   1488     {
   1489         exit(1);
   1490     }
   1491     for (i = 0 ; i < num_threads ; i++) {
   1492 	if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
   1493 		exit(1);
   1494     }
   1495     if (num_threads > 1){
   1496         printf("Running multi thread version num_threads:%d\n", num_threads);
   1497         run_workers(t, num_threads);
   1498     } else {
   1499         printf("Running single thread version \n");
   1500 	status = worker(t);
   1501     }
   1502     if (unlink_files) {
   1503 	for (i = optind ; i < ac ; i++) {
   1504 	    printf("Cleaning up file %s \n", av[i]);
   1505 	    unlink(av[i]);
   1506 	}
   1507     }
   1508 
   1509     if (status) {
   1510 	exit(1);
   1511     }
   1512     return status;
   1513 }
   1514 
   1515