/*
 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it would be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * Further, this software is distributed without any warranty that it is
 * free of the rightful claim of any third person regarding infringement
 * or the like.  Any license provided herein, whether implied or
 * otherwise, applies only to this software file.  Patent licenses, if
 * any, provided herein do not apply to combinations of this program with
 * other software, or any other product whatsoever.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
 * Mountain View, CA  94043, or:
 *
 *
 * aio-stress
 *
 * will open or create each file on the command line, and start a series
 * of aio requests to it.
 *
 * aio is done in a rotating loop: first file1 gets 8 requests, then
 * file2, then file3, etc.  As each file finishes writing, it is switched
 * to reads.
 *
 * io buffers are aligned in case you want to do raw io
 *
 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
 *
 * run aio-stress -h to see the options
 *
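 * example (illustrative only; the full option list is in print_usage below):
 *
 *   aio-stress -O -o 0 -o 1 -s 128m -r 64k testfile
 *
 * runs an O_DIRECT sequential write stage and then a sequential read
 * stage against a 128MB file using 64KB records
 *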
 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
 */
#define _FILE_OFFSET_BITS 64
#define PROG_VERSION "0.21"
#define NEW_GETEVENTS

#define _GNU_SOURCE
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>

#include "config.h"
#include "tst_res_flags.h"

#ifdef HAVE_LIBAIO
#include <libaio.h>

#define IO_FREE 0
#define IO_PENDING 1
#define RUN_FOREVER -1

enum {
	WRITE,
	READ,
	RWRITE,
	RREAD,
	LAST_STAGE,
};

#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2

/*
 * various globals; these are effectively read-only by the time the
 * threads are started
 */
long stages = 0;
unsigned long page_size_mask;
int o_direct = 0;
int o_sync = 0;
int latency_stats = 0;
int completion_latency_stats = 0;
int io_iter = 8;
int iterations = RUN_FOREVER;
int max_io_submit = 0;
long rec_len = 64 * 1024;
int depth = 64;
int num_threads = 1;
int num_contexts = 1;
off_t context_offset = 2 * 1024 * 1024;
int fsync_stages = 1;
int use_shm = 0;
int shm_id;
char *unaligned_buffer = NULL;
char *aligned_buffer = NULL;
int padded_reclen = 0;
int stonewall = 1;
int verify = 0;
char *verify_buf = NULL;
int unlink_files = 0;

struct io_unit;
struct thread_info;

/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
int threads_ending = 0;
int threads_starting = 0;
struct timeval global_stage_start_time;
struct thread_info *global_thread_info;

/*
 * latencies during io_submit are measured; these are the
 * granularities for deviations
 */
#define DEVIATIONS 6
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };

struct io_latency {
	double max;
	double min;
	double total_io;
	double total_lat;
	double deviations[DEVIATIONS];
};

/* container for a series of operations to a file */
struct io_oper {
	/* already open file descriptor, valid for whatever operation you want */
	int fd;

	/* starting byte of the operation */
	off_t start;

	/* ending byte of the operation */
	off_t end;

	/* size of the read/write buffer */
	int reclen;

	/* max number of pending requests before a wait is triggered */
	int depth;

	/* current number of pending requests */
	int num_pending;

	/* last error, zero if there were none */
	int last_err;

	/* total number of errors hit. */
	int num_err;

	/* read, write, random, etc */
	int rw;

	/* number of I/Os that will get sent to aio */
	int total_ios;

	/* number of I/Os we've already sent */
	int started_ios;

	/* last offset used in an io operation */
	off_t last_offset;

	/* stonewalled = 1 when we got cut off before submitting all our I/O */
	int stonewalled;

	/* list management */
	struct io_oper *next;
	struct io_oper *prev;

	struct timeval start_time;

	char *file_name;
};

/* a single io, and all the tracking needed for it */
struct io_unit {
	/* note, iocb must go first! */
	struct iocb iocb;

	/* pointer to parent io operation struct */
	struct io_oper *io_oper;

	/* aligned buffer */
	char *buf;

	/* size of the aligned buffer (record size) */
	int buf_size;

	/* state of this io unit (free, pending, done) */
	int busy;

	/* result of last operation */
	long res;

	struct io_unit *next;

	struct timeval io_start_time;	/* time of io_submit */
};

struct thread_info {
	io_context_t io_ctx;
	pthread_t tid;

	/* allocated array of io_unit structs */
	struct io_unit *ios;

	/* list of io units available for io */
	struct io_unit *free_ious;

	/* number of io units in the I/O array */
	int num_global_ios;

	/* number of io units in flight */
	int num_global_pending;

	/* preallocated array of iocb pointers, only used in run_active */
	struct iocb **iocbs;

	/* preallocated array of events */
	struct io_event *events;

	/* size of the events array */
	int num_global_events;

	/* latency stats for io_submit */
	struct io_latency io_submit_latency;

	/* list of operations still in progress, and of those finished */
	struct io_oper *active_opers;
	struct io_oper *finished_opers;

	/* number of files this thread is doing io on */
	int num_files;

	/* how much io this thread did in the last stage */
	double stage_mb_trans;

	/* completion latency stats: i/o time from io_submit until io_getevents */
	struct io_latency io_completion_latency;
};

/*
 * return seconds between start_tv and stop_tv in double precision
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
	double sec, usec;
	double ret;
	sec = stop_tv->tv_sec - start_tv->tv_sec;
	usec = stop_tv->tv_usec - start_tv->tv_usec;
	if (sec > 0 && usec < 0) {
		sec--;
		usec += 1000000;
	}
	ret = sec + usec / (double)1000000;
	if (ret < 0)
		ret = 0;
	return ret;
}

/*
 * return seconds between start_tv and now in double precision
 */
static double time_since_now(struct timeval *start_tv)
{
	struct timeval stop_time;
	gettimeofday(&stop_time, NULL);
	return time_since(start_tv, &stop_time);
}

/*
 * Add latency info to latency struct
 */
static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
			 struct io_latency *lat)
{
	double delta;
	int i;
	delta = time_since(start_tv, stop_tv);
	delta = delta * 1000;

	if (delta > lat->max)
		lat->max = delta;
	if (!lat->min || delta < lat->min)
		lat->min = delta;
	lat->total_io++;
	lat->total_lat += delta;
	for (i = 0; i < DEVIATIONS; i++) {
		if (delta < deviations[i]) {
			lat->deviations[i]++;
			break;
		}
	}
}

static void oper_list_add(struct io_oper *oper, struct io_oper **list)
{
	if (!*list) {
		*list = oper;
		oper->prev = oper->next = oper;
		return;
	}
	oper->prev = (*list)->prev;
	oper->next = *list;
	(*list)->prev->next = oper;
	(*list)->prev = oper;
	return;
}

static void oper_list_del(struct io_oper *oper, struct io_oper **list)
{
	if ((*list)->next == (*list)->prev && *list == (*list)->next) {
		*list = NULL;
		return;
	}
	oper->prev->next = oper->next;
	oper->next->prev = oper->prev;
	if (*list == oper)
		*list = oper->next;
}

/* worker func to check error fields in the io unit */
static int check_finished_io(struct io_unit *io)
{
	int i;
	if (io->res != io->buf_size) {

		struct stat s;
		fstat(io->io_oper->fd, &s);

		/*
		 * If file size is large enough for the read, then this short
		 * read is an error.
		 */
		if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
		    s.st_size > (io->iocb.u.c.offset + io->res)) {

			fprintf(stderr,
				"io err %ld (%s) op %d, off %Lu size %d\n",
				io->res, strerror(-io->res),
				io->iocb.aio_lio_opcode, io->iocb.u.c.offset,
				io->buf_size);
			io->io_oper->last_err = io->res;
			io->io_oper->num_err++;
			return -1;
		}
	}
	if (verify && io->io_oper->rw == READ) {
		if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
			fprintf(stderr,
				"verify error, file %s offset %Lu contents (offset:bad:good):\n",
				io->io_oper->file_name, io->iocb.u.c.offset);

			for (i = 0; i < io->io_oper->reclen; i++) {
				if (io->buf[i] != verify_buf[i]) {
					fprintf(stderr, "%d:%c:%c ", i,
						io->buf[i], verify_buf[i]);
				}
			}
			fprintf(stderr, "\n");
		}

	}
	return 0;
}

/* worker func to check the busy bits and get an io unit ready for use */
static int grab_iou(struct io_unit *io, struct io_oper *oper)
{
	if (io->busy == IO_PENDING)
		return -1;

	io->busy = IO_PENDING;
	io->res = 0;
	io->io_oper = oper;
	return 0;
}

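/* translate an rw stage number into a human readable name */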
char *stage_name(int rw)
{
	switch (rw) {
	case WRITE:
		return "write";
	case READ:
		return "read";
	case RWRITE:
		return "random write";
	case RREAD:
		return "random read";
	}
	return "unknown";
}

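/* number of megabytes this operation has transferred so far */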
static inline double oper_mb_trans(struct io_oper *oper)
{
	return ((double)oper->started_ios * (double)oper->reclen) /
	    (double)(1024 * 1024);
}

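/* print the throughput of one operation, measured from its start_time */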
static void print_time(struct io_oper *oper)
{
	double runtime;
	double tput;
	double mb;

	runtime = time_since_now(&oper->start_time);
	mb = oper_mb_trans(oper);
	tput = mb / runtime;
	fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
		stage_name(oper->rw), oper->file_name, tput, mb, runtime);
}

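/*
 * print min/avg/max latencies plus the deviation histogram for one
 * stage, then zero the struct so it can be reused for the next stage
 */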
static void print_lat(char *str, struct io_latency *lat)
{
	double avg = lat->total_lat / lat->total_io;
	int i;
	double total_counted = 0;
	fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
		str, lat->min, avg, lat->max);

	for (i = 0; i < DEVIATIONS; i++) {
		fprintf(stderr, " %.0f < %d", lat->deviations[i],
			deviations[i]);
		total_counted += lat->deviations[i];
	}
	if (total_counted && lat->total_io - total_counted)
		fprintf(stderr, " < %.0f", lat->total_io - total_counted);
	fprintf(stderr, "\n");
	memset(lat, 0, sizeof(*lat));
}

static void print_latency(struct thread_info *t)
{
	struct io_latency *lat = &t->io_submit_latency;
	print_lat("latency", lat);
}

static void print_completion_latency(struct thread_info *t)
{
	struct io_latency *lat = &t->io_completion_latency;
	print_lat("completion latency", lat);
}

/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and makes the io unit reusable again
 */
void finish_io(struct thread_info *t, struct io_unit *io, long result,
	       struct timeval *tv_now)
{
	struct io_oper *oper = io->io_oper;

	calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
	io->res = result;
	io->busy = IO_FREE;
	io->next = t->free_ious;
	t->free_ious = io;
	oper->num_pending--;
	t->num_global_pending--;
	check_finished_io(io);
	if (oper->num_pending == 0 &&
	    (oper->started_ios == oper->total_ios || oper->stonewalled)) {
		print_time(oper);
	}
}

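/*
 * reap whatever completion events are available, waiting for at least
 * min_nr of them.  Returns the number of events processed, or the
 * (negative) error from io_getevents
 */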
int read_some_events(struct thread_info *t)
{
	struct io_unit *event_io;
	struct io_event *event;
	int nr;
	int i;
	int min_nr = io_iter;
	struct timeval stop_time;

	if (t->num_global_pending < io_iter)
		min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
	nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,
			  NULL);
#else
	nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
	if (nr <= 0)
		return nr;

	gettimeofday(&stop_time, NULL);
	for (i = 0; i < nr; i++) {
		event = t->events + i;
		event_io = (struct io_unit *)((unsigned long)event->obj);
		finish_io(t, event_io, event->res, &stop_time);
	}
	return nr;
}

/*
 * finds a free io unit, waiting for pending requests if required.  returns
 * null if none could be found
 */
static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *event_io;
	int nr;

retry:
	if (t->free_ious) {
		event_io = t->free_ious;
		t->free_ious = t->free_ious->next;
		if (grab_iou(event_io, oper)) {
			fprintf(stderr, "io unit on free list but not free\n");
			abort();
		}
		return event_io;
	}
	nr = read_some_events(t);
	if (nr > 0)
		goto retry;
	else
		fprintf(stderr, "no free ious after read_some_events\n");
	return NULL;
}

/*
 * wait for all pending requests for this io operation to finish
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper)
{
	struct io_event event;
	struct io_unit *event_io;

	if (oper == NULL) {
		return 0;
	}

	if (oper->num_pending == 0)
		goto done;

	/* this func is not speed sensitive, no need to go wild reading
	 * more than one event at a time
	 */
#ifdef NEW_GETEVENTS
	while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
	while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
		struct timeval tv_now;
		event_io = (struct io_unit *)((unsigned long)event.obj);

		gettimeofday(&tv_now, NULL);
		finish_io(t, event_io, event.res, &tv_now);

		if (oper->num_pending == 0)
			break;
	}
done:
	if (oper->num_err) {
		fprintf(stderr, "%u errors on oper, last %u\n",
			oper->num_err, oper->last_err);
	}
	return 0;
}

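/*
 * pick a random, page aligned byte offset inside an operation's
 * [start, end) range, leaving room for one full record
 */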
off_t random_byte_offset(struct io_oper * oper)
{
	off_t num;
	off_t rand_byte = oper->start;
	off_t range;
	off_t offset = 1;

	range = (oper->end - oper->start) / (1024 * 1024);
	if ((page_size_mask + 1) > (1024 * 1024))
		offset = (page_size_mask + 1) / (1024 * 1024);
	if (range < offset)
		range = 0;
	else
		range -= offset;

	/* find a random mb offset */
	num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0));
	rand_byte += num * 1024 * 1024;

	/* find a random byte offset */
	num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

	/* page align */
	num = (num + page_size_mask) & ~page_size_mask;
	rand_byte += num;

	if (rand_byte + oper->reclen > oper->end) {
		rand_byte -= oper->reclen;
	}
	return rand_byte;
}

/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used.  This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *io;
	off_t rand_byte;

	io = find_iou(t, oper);
	if (!io) {
		fprintf(stderr, "unable to find io unit\n");
		return NULL;
	}

	switch (oper->rw) {
	case WRITE:
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
			       oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case READ:
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
			      oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case RREAD:
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
			      rand_byte);
		break;
	case RWRITE:
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
			       rand_byte);

		break;
	}

	return io;
}

/*
 * wait for any pending requests, and then free all ram associated with
 * an operation.  returns the last error the operation hit (zero means none)
 */
static int finish_oper(struct thread_info *t, struct io_oper *oper)
{
	unsigned long last_err;

	io_oper_wait(t, oper);
	last_err = oper->last_err;
	if (oper->num_pending > 0) {
		fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
	}
	close(oper->fd);
	free(oper);
	return last_err;
}

/*
 * allocates an io operation and fills in all the fields.  returns
 * null on error
 */
static struct io_oper *create_oper(int fd, int rw, off_t start, off_t end,
				   int reclen, int depth, int iter,
				   char *file_name)
{
	struct io_oper *oper;

	oper = malloc(sizeof(*oper));
	if (!oper) {
		fprintf(stderr, "unable to allocate io oper\n");
		return NULL;
	}
	memset(oper, 0, sizeof(*oper));

	oper->depth = depth;
	oper->start = start;
	oper->end = end;
	oper->last_offset = oper->start;
	oper->fd = fd;
	oper->reclen = reclen;
	oper->rw = rw;
	oper->total_ios = (oper->end - oper->start) / oper->reclen;
	oper->file_name = file_name;

	return oper;
}

/*
 * does setup on num_ios worth of iocbs, but does not actually
 * start any io
 */
int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
	       struct iocb **my_iocbs)
{
	int i;
	struct io_unit *io;

	if (oper->started_ios == 0)
		gettimeofday(&oper->start_time, NULL);

	if (num_ios == 0)
		num_ios = oper->total_ios;

	if ((oper->started_ios + num_ios) > oper->total_ios)
		num_ios = oper->total_ios - oper->started_ios;

	for (i = 0; i < num_ios; i++) {
		io = build_iocb(t, oper);
		if (!io) {
			return -1;
		}
		my_iocbs[i] = &io->iocb;
	}
	return num_ios;
}

/*
 * runs through the iocbs in the array provided and updates
 * counters in the associated oper struct
 */
static void update_iou_counters(struct iocb **my_iocbs, int nr,
				struct timeval *tv_now)
{
	struct io_unit *io;
	int i;
	for (i = 0; i < nr; i++) {
		io = (struct io_unit *)(my_iocbs[i]);
		io->io_oper->num_pending++;
		io->io_oper->started_ios++;
		io->io_start_time = *tv_now;	/* set time of io_submit */
	}
}

/* starts some io for a given file, returns zero if all went well */
int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
{
	int ret;
	struct timeval start_time;
	struct timeval stop_time;

resubmit:
	gettimeofday(&start_time, NULL);
	ret = io_submit(t->io_ctx, num_ios, my_iocbs);
	gettimeofday(&stop_time, NULL);
	calc_latency(&start_time, &stop_time, &t->io_submit_latency);

	if (ret != num_ios) {
		/* some I/O got through */
		if (ret > 0) {
			update_iou_counters(my_iocbs, ret, &stop_time);
			my_iocbs += ret;
			t->num_global_pending += ret;
			num_ios -= ret;
		}
		/*
		 * we've used all the requests allocated in aio_init, wait and
		 * retry
		 */
		if (ret > 0 || ret == -EAGAIN) {
			int old_ret = ret;
			if ((ret = read_some_events(t)) > 0) {
				goto resubmit;
			} else {
				fprintf(stderr, "ret was %d and now is %d\n",
					old_ret, ret);
				abort();
			}
		}

		fprintf(stderr, "ret %d (%s) on io_submit\n", ret,
			strerror(-ret));
		return -1;
	}
	update_iou_counters(my_iocbs, ret, &stop_time);
	t->num_global_pending += ret;
	return 0;
}

/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper)
{
	int new_rw = 0;
	if (oper->last_err)
		return 0;

	/* this switch falls through */
	switch (oper->rw) {
	case WRITE:
		if (stages & (1 << READ))
			new_rw = READ;
	case READ:
		if (!new_rw && stages & (1 << RWRITE))
			new_rw = RWRITE;
	case RWRITE:
		if (!new_rw && stages & (1 << RREAD))
			new_rw = RREAD;
	}

	if (new_rw) {
		oper->started_ios = 0;
		oper->last_offset = oper->start;
		oper->stonewalled = 0;

		/*
		 * we're restarting an operation with pending requests, so the
		 * timing info won't be printed by finish_io.  Print it here
		 * instead.
		 */
		if (oper->num_pending)
			print_time(oper);

		oper->rw = new_rw;
		return 1;
	}
	return 0;
}

static int oper_runnable(struct io_oper *oper)
{
	struct stat buf;
	int ret;

	/* the first context is always runnable; if started_ios > 0, there is
	 * no need to redo the calculations
	 */
	if (oper->started_ios || oper->start == 0)
		return 1;
	/* only the sequential phases force delays in starting */
	if (oper->rw >= RWRITE)
		return 1;
	ret = fstat(oper->fd, &buf);
	if (ret < 0) {
		perror("fstat");
		exit(1);
	}
	if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
		return 0;
	return 1;
}

/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
			   int io_iter, int max_io_submit)
{
	struct io_oper *oper;
	struct io_oper *built_opers = NULL;
	struct iocb **my_iocbs = t->iocbs;
	int ret = 0;
	int num_built = 0;

	oper = t->active_opers;
	while (oper) {
		if (!oper_runnable(oper)) {
			oper = oper->next;
			if (oper == t->active_opers)
				break;
			continue;
		}
		ret = build_oper(t, oper, io_iter, my_iocbs);
		if (ret >= 0) {
			my_iocbs += ret;
			num_built += ret;
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &built_opers);
			oper = t->active_opers;
			if (num_built + io_iter > max_io_submit)
				break;
		} else
			break;
	}
	if (num_built) {
		ret = run_built(t, num_built, t->iocbs);
		if (ret < 0) {
			fprintf(stderr, "error %d on run_built\n", ret);
			exit(1);
		}
		while (built_opers) {
			oper = built_opers;
			oper_list_del(oper, &built_opers);
			oper_list_add(oper, &t->active_opers);
			if (oper->started_ios == oper->total_ios) {
				oper_list_del(oper, &t->active_opers);
				oper_list_add(oper, &t->finished_opers);
			}
		}
	}
	return 0;
}

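/* mark the SysV shm segment for removal once every attachment is gone */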
void drop_shm()
{
	int ret;
	struct shmid_ds ds;
	if (use_shm != USE_SHM)
		return;

	ret = shmctl(shm_id, IPC_RMID, &ds);
	if (ret) {
		perror("shmctl IPC_RMID");
	}
}

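/* create an aio context able to hold n requests, exiting on failure */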
void aio_setup(io_context_t * io_ctx, int n)
{
	int res = io_queue_init(n, io_ctx);
	if (res != 0) {
		fprintf(stderr, "io_queue_init(%d) returned %d (%s)\n",
			n, res, strerror(-res));
		exit(3);
	}
}

/*
 * allocate io operation and event arrays for a given thread
 */
int setup_ious(struct thread_info *t,
	       int num_files, int depth, int reclen, int max_io_submit)
{
	int i;
	size_t bytes = num_files * depth * sizeof(*t->ios);

	t->ios = malloc(bytes);
	if (!t->ios) {
		fprintf(stderr, "unable to allocate io units\n");
		return -1;
	}
	memset(t->ios, 0, bytes);

	for (i = 0; i < depth * num_files; i++) {
		t->ios[i].buf = aligned_buffer;
		aligned_buffer += padded_reclen;
		t->ios[i].buf_size = reclen;
		if (verify)
			memset(t->ios[i].buf, 'b', reclen);
		else
			memset(t->ios[i].buf, 0, reclen);
		t->ios[i].next = t->free_ious;
		t->free_ious = t->ios + i;
	}
	if (verify) {
		verify_buf = aligned_buffer;
		memset(verify_buf, 'b', reclen);
	}

	t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
	if (!t->iocbs) {
		fprintf(stderr, "unable to allocate iocbs\n");
		goto free_buffers;
	}

	memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

	t->events = malloc(sizeof(struct io_event) * depth * num_files);
	if (!t->events) {
		fprintf(stderr, "unable to allocate ram for events\n");
		goto free_buffers;
	}
	memset(t->events, 0, num_files * sizeof(struct io_event) * depth);

	t->num_global_ios = num_files * depth;
	t->num_global_events = t->num_global_ios;
	return 0;

free_buffers:
	free(t->ios);
	free(t->iocbs);
	free(t->events);
	return -1;
}

/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data.  This lets us do a large shm or bigpages alloc
 * without trying to find a special place in each thread to map the
 * buffers to
 */
int setup_shared_mem(int num_threads, int num_files, int depth,
		     int reclen, int max_io_submit)
{
	char *p = NULL;
	size_t total_ram;

	padded_reclen = (reclen + page_size_mask) / (page_size_mask + 1);
	padded_reclen = padded_reclen * (page_size_mask + 1);
	total_ram = num_files * depth * padded_reclen + num_threads;
	if (verify)
		total_ram += padded_reclen;

	/* for aligning buffer after the allocation */
	total_ram += page_size_mask;

	if (use_shm == USE_MALLOC) {
		p = malloc(total_ram);
	} else if (use_shm == USE_SHM) {
		shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
		if (shm_id < 0) {
			perror("shmget");
			drop_shm();
			goto free_buffers;
		}
		p = shmat(shm_id, (char *)0x50000000, 0);
		if ((long)p == -1) {
			perror("shmat");
			goto free_buffers;
		}
		/* won't really be dropped until we shmdt */
		drop_shm();
	} else if (use_shm == USE_SHMFS) {
		char mmap_name[16];	/* /dev/shm/ + null + XXXXXX */
		int fd;

		strcpy(mmap_name, "/dev/shm/XXXXXX");
		fd = mkstemp(mmap_name);
		if (fd < 0) {
			perror("mkstemp");
			goto free_buffers;
		}
		unlink(mmap_name);
		ftruncate(fd, total_ram);
		shm_id = fd;
		p = mmap((char *)0x50000000, total_ram,
			 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

		if (p == MAP_FAILED) {
			perror("mmap");
			goto free_buffers;
		}
	}
	if (!p) {
		fprintf(stderr, "unable to allocate buffers\n");
		goto free_buffers;
	}
	unaligned_buffer = p;
	p = (char *)((intptr_t) (p + page_size_mask) & ~page_size_mask);
	aligned_buffer = p;
	return 0;

free_buffers:
	drop_shm();
	if (unaligned_buffer)
		free(unaligned_buffer);
	return -1;
}

/*
 * runs through all the thread_info structs and calculates a combined
 * throughput
 */
void global_thread_throughput(struct thread_info *t, char *this_stage)
{
	int i;
	double runtime = time_since_now(&global_stage_start_time);
	double total_mb = 0;
	double min_trans = 0;

	for (i = 0; i < num_threads; i++) {
		total_mb += global_thread_info[i].stage_mb_trans;
		if (!min_trans || t->stage_mb_trans < min_trans)
			min_trans = t->stage_mb_trans;
	}
	if (total_mb) {
		fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
			total_mb / runtime);
		fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
		if (stonewall)
			fprintf(stderr, " min transfer %.2fMB", min_trans);
		fprintf(stderr, "\n");
	}
}

/* this is the meat of the state machine.  There is a list of
 * active operations structs, and as each one finishes the required
 * io it is moved to a list of finished operations.  Once they have
 * all finished whatever stage they were in, they are given the chance
 * to restart and pick a different stage (read/write/random read etc)
 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there is more than one thread.
 */
int worker(struct thread_info *t)
{
	struct io_oper *oper;
	char *this_stage = NULL;
	struct timeval stage_time;
	int status = 0;
	int iteration = 0;
	int cnt;

	aio_setup(&t->io_ctx, 512);

restart:
	if (num_threads > 1) {
		pthread_mutex_lock(&stage_mutex);
		threads_starting++;
		if (threads_starting == num_threads) {
			threads_ending = 0;
			gettimeofday(&global_stage_start_time, NULL);
			pthread_cond_broadcast(&stage_cond);
		}
		while (threads_starting != num_threads)
			pthread_cond_wait(&stage_cond, &stage_mutex);
		pthread_mutex_unlock(&stage_mutex);
	}
	if (t->active_opers) {
		this_stage = stage_name(t->active_opers->rw);
		gettimeofday(&stage_time, NULL);
		t->stage_mb_trans = 0;
	}

	cnt = 0;
	/* first we send everything through aio */
	while (t->active_opers
	       && (cnt < iterations || iterations == RUN_FOREVER)) {
		if (stonewall && threads_ending) {
			oper = t->active_opers;
			oper->stonewalled = 1;
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &t->finished_opers);
		} else {
			run_active_list(t, io_iter, max_io_submit);
		}
		cnt++;
	}
	if (latency_stats)
		print_latency(t);

	if (completion_latency_stats)
		print_completion_latency(t);

	/* then we wait for all the operations to finish */
	oper = t->finished_opers;
	do {
		if (!oper)
			break;
		io_oper_wait(t, oper);
		oper = oper->next;
	} while (oper != t->finished_opers);

	/* then we do an fsync to get the timing for any future operations
	 * right, and check to see if any of these need to get restarted
	 */
	oper = t->finished_opers;
	while (oper) {
		if (fsync_stages)
			fsync(oper->fd);
		t->stage_mb_trans += oper_mb_trans(oper);
		if (restart_oper(oper)) {
			oper_list_del(oper, &t->finished_opers);
			oper_list_add(oper, &t->active_opers);
			oper = t->finished_opers;
			continue;
		}
		oper = oper->next;
		if (oper == t->finished_opers)
			break;
	}

	if (t->stage_mb_trans && t->num_files > 0) {
		double seconds = time_since_now(&stage_time);
		fprintf(stderr,
			"thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
			t - global_thread_info, this_stage,
			t->stage_mb_trans / seconds, t->stage_mb_trans,
			seconds);
	}

	if (num_threads > 1) {
		pthread_mutex_lock(&stage_mutex);
		threads_ending++;
		if (threads_ending == num_threads) {
			threads_starting = 0;
			pthread_cond_broadcast(&stage_cond);
			global_thread_throughput(t, this_stage);
		}
		while (threads_ending != num_threads)
			pthread_cond_wait(&stage_cond, &stage_mutex);
		pthread_mutex_unlock(&stage_mutex);
	}

	/* someone got restarted, go back to the beginning */
	if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
		iteration++;
		goto restart;
	}

	/* finally, free all the ram */
	while (t->finished_opers) {
		oper = t->finished_opers;
		oper_list_del(oper, &t->finished_opers);
		status = finish_oper(t, oper);
	}

	if (t->num_global_pending) {
		fprintf(stderr, "global num pending is %d\n",
			t->num_global_pending);
	}
	io_queue_release(t->io_ctx);

	return status;
}

typedef void *(*start_routine) (void *);
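/* start one worker pthread per thread_info struct and wait for them all */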
int run_workers(struct thread_info *t, int num_threads)
{
	int ret;
	int i;

	for (i = 0; i < num_threads; i++) {
		ret =
		    pthread_create(&t[i].tid, NULL, (start_routine) worker,
				   t + i);
		if (ret) {
			perror("pthread_create");
			exit(1);
		}
	}
	for (i = 0; i < num_threads; i++) {
		ret = pthread_join(t[i].tid, NULL);
		if (ret) {
			perror("pthread_join");
			exit(1);
		}
	}
	return 0;
}

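/*
 * parse a size argument with an optional k/m/g/b suffix (either case);
 * bare numbers are scaled by the mult parameter
 */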
off_t parse_size(char *size_arg, off_t mult)
{
	char c;
	int num;
	off_t ret;
	c = size_arg[strlen(size_arg) - 1];
	if (c > '9') {
		size_arg[strlen(size_arg) - 1] = '\0';
	}
	num = atoi(size_arg);
	switch (c) {
	case 'g':
	case 'G':
		mult = 1024 * 1024 * 1024;
		break;
	case 'm':
	case 'M':
		mult = 1024 * 1024;
		break;
	case 'k':
	case 'K':
		mult = 1024;
		break;
	case 'b':
	case 'B':
		mult = 1;
		break;
	}
	ret = mult * num;
	return ret;
}

void print_usage(void)
{
	printf
	    ("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
	printf
	    ("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
	printf("                  file1 [file2 ...]\n");
	printf("\t-a size in KB at which to align buffers\n");
	printf("\t-b max number of iocbs to give io_submit at once\n");
	printf("\t-c number of io contexts per file\n");
	printf("\t-C offset between contexts, default 2MB\n");
	printf("\t-s size in MB of the test file(s), default 1024MB\n");
	printf("\t-r record size in KB used for each io, default 64KB\n");
	printf
	    ("\t-d number of pending aio requests for each file, default 64\n");
	printf("\t-i number of I/Os per file sent before switching\n"
	       "\t   to the next file, default 8\n");
	printf("\t-I total number of async I/Os the program will run, "
	       "default is run until Ctrl-C\n");
	printf("\t-O Use O_DIRECT (not available in 2.4 kernels)\n");
	printf("\t-S Use O_SYNC for writes\n");
	printf("\t-o add an operation to the list: write=0, read=1,\n");
	printf("\t   random write=2, random read=3.\n");
	printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
	printf
	    ("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
	printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
	printf("\t-n no fsyncs between write stage and read stage\n");
	printf("\t-l print io_submit latencies after each stage\n");
	printf("\t-L print io completion latencies after each stage\n");
	printf("\t-t number of threads to run\n");
	printf("\t-u unlink files after completion\n");
	printf("\t-v verification of bytes written\n");
	printf("\t-x turn off thread stonewalling\n");
	printf("\t-h this message\n");
	printf
	    ("\n\t   the size options (-a, -s and -r) allow modifiers: -s 400{k,m,g}\n");
	printf("\t   translate to 400KB, 400MB and 400GB\n");
	printf("version %s\n", PROG_VERSION);
}

int main(int ac, char **av)
{
	int rwfd;
	int i;
	int j;
	int c;

	off_t file_size = 1 * 1024 * 1024 * 1024;
	int first_stage = WRITE;
	struct io_oper *oper;
	int status = 0;
	int num_files = 0;
	int open_fds = 0;
	struct thread_info *t;

	page_size_mask = getpagesize() - 1;

	while (1) {
		c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
		if (c < 0)
			break;

		switch (c) {
		case 'a':
			page_size_mask = parse_size(optarg, 1024);
			page_size_mask--;
			break;
		case 'c':
			num_contexts = atoi(optarg);
			break;
		case 'C':
			context_offset = parse_size(optarg, 1024 * 1024);
			break;
		case 'b':
			max_io_submit = atoi(optarg);
			break;
		case 's':
			file_size = parse_size(optarg, 1024 * 1024);
			break;
		case 'd':
			depth = atoi(optarg);
			break;
		case 'r':
			rec_len = parse_size(optarg, 1024);
			break;
		case 'i':
			io_iter = atoi(optarg);
			break;
		case 'I':
			iterations = atoi(optarg);
			break;
		case 'n':
			fsync_stages = 0;
			break;
		case 'l':
			latency_stats = 1;
			break;
		case 'L':
			completion_latency_stats = 1;
			break;
		case 'm':
			if (!strcmp(optarg, "shm")) {
				fprintf(stderr, "using ipc shm\n");
				use_shm = USE_SHM;
			} else if (!strcmp(optarg, "shmfs")) {
				fprintf(stderr, "using /dev/shm for buffers\n");
				use_shm = USE_SHMFS;
			}
			break;
		case 'o':
			i = atoi(optarg);
			stages |= 1 << i;
			fprintf(stderr, "adding stage %s\n", stage_name(i));
			break;
		case 'O':
			o_direct = O_DIRECT;
			break;
		case 'S':
			o_sync = O_SYNC;
			break;
		case 't':
			num_threads = atoi(optarg);
			break;
		case 'x':
			stonewall = 0;
			break;
		case 'u':
			unlink_files = 1;
			break;
		case 'v':
			verify = 1;
			break;
		case 'h':
		default:
			print_usage();
			exit(1);
		}
	}

	/*
	 * make sure we don't try to submit more I/O than we have allocated
	 * memory for
	 */
	if (depth < io_iter) {
		io_iter = depth;
		fprintf(stderr, "dropping io_iter to %d\n", io_iter);
	}

	if (optind >= ac) {
		print_usage();
		exit(1);
	}

	num_files = ac - optind;

	if (num_threads > (num_files * num_contexts)) {
		num_threads = num_files * num_contexts;
		fprintf(stderr,
			"dropping thread count to the number of contexts %d\n",
			num_threads);
	}

	t = malloc(num_threads * sizeof(*t));
	if (!t) {
		perror("malloc");
		exit(1);
	}
	memset(t, 0, num_threads * sizeof(*t));
	global_thread_info = t;

	/* by default, allow a huge number of iocbs to be sent towards
	 * io_submit
	 */
	if (!max_io_submit)
		max_io_submit = num_files * io_iter * num_contexts;

	/*
	 * make sure we don't try to submit more I/O than max_io_submit allows
	 */
	if (max_io_submit < io_iter) {
		io_iter = max_io_submit;
		fprintf(stderr, "dropping io_iter to %d\n", io_iter);
	}

	if (!stages) {
		stages =
		    (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
	} else {
		for (i = 0; i < LAST_STAGE; i++) {
			if (stages & (1 << i)) {
				first_stage = i;
				fprintf(stderr, "starting with %s\n",
					stage_name(i));
				break;
			}
		}
	}

	if (file_size < num_contexts * context_offset) {
		fprintf(stderr, "file size %ld too small for %d contexts\n",
			(long)file_size, num_contexts);
		exit(1);
	}

	fprintf(stderr, "file size %ldMB, record size %ldKB, depth %d, "
		"I/O per iteration %d\n",
		(long)(file_size / (1024 * 1024)),
		rec_len / 1024, depth, io_iter);
	fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
		max_io_submit, (page_size_mask + 1) / 1024);
	fprintf(stderr, "threads %d files %d contexts %d context offset %ldMB "
		"verification %s\n", num_threads, num_files, num_contexts,
		(long)(context_offset / (1024 * 1024)), verify ? "on" : "off");
	/* open all the files and do any required setup for them */
	for (i = optind; i < ac; i++) {
		int thread_index;
		for (j = 0; j < num_contexts; j++) {
			thread_index = open_fds % num_threads;
			open_fds++;

			rwfd =
			    open(av[i], O_CREAT | O_RDWR | o_direct | o_sync,
				 0600);
			if (rwfd == -1) {
				fprintf(stderr,
					"error while creating file %s: %s",
					av[i], strerror(errno));
				exit(1);
			}

			oper =
			    create_oper(rwfd, first_stage, j * context_offset,
					file_size - j * context_offset, rec_len,
					depth, io_iter, av[i]);
			if (!oper) {
				fprintf(stderr, "error in create_oper\n");
				exit(-1);
			}
			oper_list_add(oper, &t[thread_index].active_opers);
			t[thread_index].num_files++;
		}
	}
	if (setup_shared_mem(num_threads, num_files * num_contexts,
			     depth, rec_len, max_io_submit)) {
		exit(1);
	}
	for (i = 0; i < num_threads; i++) {
		if (setup_ious
		    (&t[i], t[i].num_files, depth, rec_len, max_io_submit))
			exit(1);
	}
	if (num_threads > 1) {
		printf("Running multi thread version num_threads:%d\n",
		       num_threads);
		run_workers(t, num_threads);
	} else {
		printf("Running single thread version \n");
		status = worker(t);
	}
	if (unlink_files) {
		for (i = optind; i < ac; i++) {
			printf("Cleaning up file %s \n", av[i]);
			unlink(av[i]);
		}
	}

	if (status) {
		exit(1);
	}
	return status;
}
#else
int main(void)
{
	fprintf(stderr, "test requires libaio and its development packages\n");
	return TCONF;
}
#endif