Home | History | Annotate | Download | only in engines
/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modifying data once a file is created.
 *
 * To work around that, the engine creates many small files (chunks, e.g.
 * 256k each) and selects the chunk to use based on the offset generated
 * by fio.
 *
 * Thus, random reads and writes can also be achieved with this logic.
 */
     13 
#include <errno.h>
#include <inttypes.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
#include <hdfs.h>

#include "../fio.h"
#include "../optgroup.h"
     19 
     20 #define CHUNCK_NAME_LENGTH_MAX 80
     21 #define CHUNCK_CREATION_BUFFER_SIZE 65536
     22 
     23 struct hdfsio_data {
     24 	hdfsFS fs;
     25 	hdfsFile fp;
     26 	uint64_t curr_file_id;
     27 };
     28 
     29 struct hdfsio_options {
     30 	void *pad;			/* needed because offset can't be 0 for a option defined used offsetof */
     31 	char *host;
     32 	char *directory;
     33 	unsigned int port;
     34 	unsigned int chunck_size;
     35 	unsigned int single_instance;
     36 	unsigned int use_direct;
     37 };
     38 
     39 static struct fio_option options[] = {
     40 	{
     41 		.name	= "namenode",
     42 		.lname	= "hfds namenode",
     43 		.type	= FIO_OPT_STR_STORE,
     44 		.off1   = offsetof(struct hdfsio_options, host),
     45 		.def    = "localhost",
     46 		.help	= "Namenode of the HDFS cluster",
     47 		.category = FIO_OPT_C_ENGINE,
     48 		.group	= FIO_OPT_G_HDFS,
     49 	},
     50 	{
     51 		.name	= "hostname",
     52 		.lname	= "hfds namenode",
     53 		.type	= FIO_OPT_STR_STORE,
     54 		.off1   = offsetof(struct hdfsio_options, host),
     55 		.def    = "localhost",
     56 		.help	= "Namenode of the HDFS cluster",
     57 		.category = FIO_OPT_C_ENGINE,
     58 		.group	= FIO_OPT_G_HDFS,
     59 	},
     60 	{
     61 		.name	= "port",
     62 		.lname	= "hdfs namenode port",
     63 		.type	= FIO_OPT_INT,
     64 		.off1	= offsetof(struct hdfsio_options, port),
     65 		.def    = "9000",
     66 		.minval	= 1,
     67 		.maxval	= 65535,
     68 		.help	= "Port used by the HDFS cluster namenode",
     69 		.category = FIO_OPT_C_ENGINE,
     70 		.group	= FIO_OPT_G_HDFS,
     71 	},
     72 	{
     73 		.name	= "hdfsdirectory",
     74 		.lname	= "hfds directory",
     75 		.type	= FIO_OPT_STR_STORE,
     76 		.off1   = offsetof(struct hdfsio_options, directory),
     77 		.def    = "/",
     78 		.help	= "The HDFS directory where fio will create chuncks",
     79 		.category = FIO_OPT_C_ENGINE,
     80 		.group	= FIO_OPT_G_HDFS,
     81 	},
     82 	{
     83 		.name	= "chunk_size",
     84 		.alias	= "chunck_size",
     85 		.lname	= "Chunk size",
     86 		.type	= FIO_OPT_INT,
     87 		.off1	= offsetof(struct hdfsio_options, chunck_size),
     88 		.def    = "1048576",
     89 		.help	= "Size of individual chunck",
     90 		.category = FIO_OPT_C_ENGINE,
     91 		.group	= FIO_OPT_G_HDFS,
     92 	},
     93 	{
     94 		.name	= "single_instance",
     95 		.lname	= "Single Instance",
     96 		.type	= FIO_OPT_BOOL,
     97 		.off1	= offsetof(struct hdfsio_options, single_instance),
     98 		.def    = "1",
     99 		.help	= "Use a single instance",
    100 		.category = FIO_OPT_C_ENGINE,
    101 		.group	= FIO_OPT_G_HDFS,
    102 	},
    103 	{
    104 		.name	= "hdfs_use_direct",
    105 		.lname	= "HDFS Use Direct",
    106 		.type	= FIO_OPT_BOOL,
    107 		.off1	= offsetof(struct hdfsio_options, use_direct),
    108 		.def    = "0",
    109 		.help	= "Use readDirect instead of hdfsRead",
    110 		.category = FIO_OPT_C_ENGINE,
    111 		.group	= FIO_OPT_G_HDFS,
    112 	},
    113 	{
    114 		.name	= NULL,
    115 	},
    116 };
    117 
    118 
    119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
    120 	return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
    121 }
    122 
    123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
    124 {
    125 	struct hdfsio_options *options = td->eo;
    126 	struct hdfsio_data *hd = td->io_ops_data;
    127 	unsigned long f_id;
    128 	char fname[CHUNCK_NAME_LENGTH_MAX];
    129 	int open_flags;
    130 
    131 	/* find out file id based on the offset generated by fio */
    132 	f_id = floor(io_u->offset / options-> chunck_size);
    133 
    134 	if (f_id == hd->curr_file_id) {
    135 		/* file is already open */
    136 		return 0;
    137 	}
    138 
    139 	if (hd->curr_file_id != -1) {
    140 		if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
    141 			log_err("hdfs: unable to close file: %s\n", strerror(errno));
    142 			return errno;
    143 		}
    144 		hd->curr_file_id = -1;
    145 	}
    146 
    147 	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
    148 		open_flags = O_RDONLY;
    149 	} else if (io_u->ddir == DDIR_WRITE) {
    150 		open_flags = O_WRONLY;
    151 	} else {
    152 		log_err("hdfs: Invalid I/O Operation\n");
    153 		return 0;
    154 	}
    155 
    156 	get_chunck_name(fname, io_u->file->file_name, f_id);
    157 	hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
    158 			      options->chunck_size);
    159 	if(hd->fp == NULL) {
    160 		log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
    161 		return errno;
    162 	}
    163 	hd->curr_file_id = f_id;
    164 
    165 	return 0;
    166 }
    167 
    168 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
    169 {
    170 	struct hdfsio_data *hd = td->io_ops_data;
    171 	struct hdfsio_options *options = td->eo;
    172 	int ret;
    173 	unsigned long offset;
    174 
    175 	offset = io_u->offset % options->chunck_size;
    176 
    177 	if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
    178 	     hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
    179 		log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
    180 		io_u->error = errno;
    181 		return FIO_Q_COMPLETED;
    182 	};
    183 
    184 	// do the IO
    185 	if (io_u->ddir == DDIR_READ) {
    186 		if (options->use_direct) {
    187 			ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
    188 		} else {
    189 			ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
    190 		}
    191 	} else if (io_u->ddir == DDIR_WRITE) {
    192 		ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
    193 				io_u->xfer_buflen);
    194 	} else if (io_u->ddir == DDIR_SYNC) {
    195 		ret = hdfsFlush(hd->fs, hd->fp);
    196 	} else {
    197 		log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
    198 		ret = EINVAL;
    199 	}
    200 
    201 	// Check if the IO went fine, or is incomplete
    202 	if (ret != (int)io_u->xfer_buflen) {
    203 		if (ret >= 0) {
    204 			io_u->resid = io_u->xfer_buflen - ret;
    205 			io_u->error = 0;
    206 			return FIO_Q_COMPLETED;
    207 		} else {
    208 			io_u->error = errno;
    209 		}
    210 	}
    211 
    212 	if (io_u->error)
    213 		td_verror(td, io_u->error, "xfer");
    214 
    215 	return FIO_Q_COMPLETED;
    216 }
    217 
    218 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
    219 {
    220 	if (td->o.odirect) {
    221 		td->error = EINVAL;
    222 		return 0;
    223 	}
    224 
    225 	return 0;
    226 }
    227 
    228 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
    229 {
    230 	struct hdfsio_data *hd = td->io_ops_data;
    231 
    232 	if (hd->curr_file_id != -1) {
    233 		if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
    234 			log_err("hdfs: unable to close file: %s\n", strerror(errno));
    235 			return errno;
    236 		}
    237 		hd->curr_file_id = -1;
    238 	}
    239 	return 0;
    240 }
    241 
    242 static int fio_hdfsio_init(struct thread_data *td)
    243 {
    244 	struct hdfsio_options *options = td->eo;
    245 	struct hdfsio_data *hd = td->io_ops_data;
    246 	struct fio_file *f;
    247 	uint64_t j,k;
    248 	int i, failure = 0;
    249 	uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
    250 	uint64_t bytes_left;
    251 	char fname[CHUNCK_NAME_LENGTH_MAX];
    252 	hdfsFile fp;
    253 	hdfsFileInfo *fi;
    254 	tOffset fi_size;
    255 
    256 	for_each_file(td, f, i) {
    257 		k = 0;
    258 		for(j=0; j < f->real_file_size; j += options->chunck_size) {
    259 			get_chunck_name(fname, f->file_name, k++);
    260 			fi = hdfsGetPathInfo(hd->fs, fname);
    261 			fi_size = fi ? fi->mSize : 0;
    262 			// fill exist and is big enough, nothing to do
    263 			if( fi && fi_size >= options->chunck_size) {
    264 				continue;
    265 			}
    266 			fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
    267 					  options->chunck_size);
    268 			if(fp == NULL) {
    269 				failure = errno;
    270 				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
    271 				break;
    272 			}
    273 			bytes_left = options->chunck_size;
    274 			memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
    275 			while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
    276 				if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
    277 				    != CHUNCK_CREATION_BUFFER_SIZE) {
    278     					failure = errno;
    279 	    				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
    280 					break;
    281 				};
    282 				bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
    283 			}
    284 			if(bytes_left > 0) {
    285 				if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
    286 				    != bytes_left) {
    287 					failure = errno;
    288 					break;
    289 				};
    290 			}
    291 			if( hdfsCloseFile(hd->fs, fp) != 0) {
    292 				failure = errno;
    293 				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
    294 				break;
    295 			}
    296 		}
    297 		if(failure) {
    298 			break;
    299 		}
    300 	}
    301 
    302 	if( !failure ) {
    303 		fio_file_set_size_known(f);
    304 	}
    305 
    306 	return failure;
    307 }
    308 
    309 static int fio_hdfsio_setup(struct thread_data *td)
    310 {
    311 	struct hdfsio_data *hd;
    312 	struct fio_file *f;
    313 	int i;
    314 	uint64_t file_size, total_file_size;
    315 
    316 	if (!td->io_ops_data) {
    317 		hd = malloc(sizeof(*hd));
    318 		memset(hd, 0, sizeof(*hd));
    319 
    320 		hd->curr_file_id = -1;
    321 
    322 		td->io_ops_data = hd;
    323 	}
    324 
    325 	total_file_size = 0;
    326 	file_size = 0;
    327 
    328 	for_each_file(td, f, i) {
    329 		if(!td->o.file_size_low) {
    330 			file_size = floor(td->o.size / td->o.nr_files);
    331 			total_file_size += file_size;
    332 		}
    333 		else if (td->o.file_size_low == td->o.file_size_high)
    334 			file_size = td->o.file_size_low;
    335 		else {
    336 			file_size = get_rand_file_size(td);
    337 		}
    338 		f->real_file_size = file_size;
    339 	}
    340 	/* If the size doesn't divide nicely with the chunck size,
    341 	 * make the last files bigger.
    342 	 * Used only if filesize was not explicitely given
    343 	 */
    344 	if (!td->o.file_size_low && total_file_size < td->o.size) {
    345 		f->real_file_size += (td->o.size - total_file_size);
    346 	}
    347 
    348 	return 0;
    349 }
    350 
    351 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
    352 {
    353 	struct hdfsio_data *hd = td->io_ops_data;
    354 	struct hdfsio_options *options = td->eo;
    355 	int failure;
    356 	struct hdfsBuilder *bld;
    357 
    358 	if (options->host == NULL || options->port == 0) {
    359 		log_err("hdfs: server not defined\n");
    360 		return EINVAL;
    361 	}
    362 
    363 	bld = hdfsNewBuilder();
    364 	if (!bld) {
    365 		failure = errno;
    366 		log_err("hdfs: unable to allocate connect builder\n");
    367 		return failure;
    368 	}
    369 	hdfsBuilderSetNameNode(bld, options->host);
    370 	hdfsBuilderSetNameNodePort(bld, options->port);
    371 	if(! options->single_instance) {
    372 		hdfsBuilderSetForceNewInstance(bld);
    373 	}
    374 	hd->fs = hdfsBuilderConnect(bld);
    375 
    376 	/* hdfsSetWorkingDirectory succeed on non existend directory */
    377 	if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
    378 		failure = errno;
    379 		log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
    380 		return failure;
    381 	}
    382 
    383 	return 0;
    384 }
    385 
    386 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
    387 {
    388 	struct hdfsio_data *hd = td->io_ops_data;
    389 
    390 	if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
    391 		log_err("hdfs: disconnect failed: %d\n", errno);
    392 	}
    393 }
    394 
    395 static struct ioengine_ops ioengine_hdfs = {
    396 	.name = "libhdfs",
    397 	.version = FIO_IOOPS_VERSION,
    398 	.flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
    399 	.setup = fio_hdfsio_setup,
    400 	.init = fio_hdfsio_init,
    401 	.prep = fio_hdfsio_prep,
    402 	.queue = fio_hdfsio_queue,
    403 	.open_file = fio_hdfsio_open_file,
    404 	.close_file = fio_hdfsio_close_file,
    405 	.io_u_init = fio_hdfsio_io_u_init,
    406 	.io_u_free = fio_hdfsio_io_u_free,
    407 	.option_struct_size	= sizeof(struct hdfsio_options),
    408 	.options		= options,
    409 };
    410 
    411 
    412 static void fio_init fio_hdfsio_register(void)
    413 {
    414 	register_ioengine(&ioengine_hdfs);
    415 }
    416 
    417 static void fio_exit fio_hdfsio_unregister(void)
    418 {
    419 	unregister_ioengine(&ioengine_hdfs);
    420 }
    421