Home | History | Annotate | Download | only in engines
/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modifying data once a file is created.
 *
 * To work around that, the data set is spread over many small files
 * (e.g. 256k each) named ".f<N>" in the "/.perftest" directory, and the
 * engine selects a file based on the offset generated by fio
 * (file id = offset / file size).
 *
 * This way, random reads and writes can also be emulated.
 *
 * The job filename must be of the form "<host>,<port>".
 *
 * NOTE: set the environment variables FIO_HDFS_BS (size of each file in
 * bytes) and FIO_HDFS_FCOUNT (number of files) to appropriate values for
 * this engine to work properly.
 */
     16 
     17 #include <stdio.h>
     18 #include <stdlib.h>
     19 #include <unistd.h>
     20 #include <sys/uio.h>
     21 #include <errno.h>
     22 #include <assert.h>
     23 
     24 #include "../fio.h"
     25 
     26 #include "hdfs.h"
     27 
/* Engine state attached to a fio job through td->io_ops->data. */
struct hdfsio_data {
	char host[256];		/* HDFS host: job filename text before ',' */
	int port;		/* HDFS port: job filename text after ',' */
	hdfsFS fs;		/* connection from hdfsConnect() in fio_hdfsio_open_file() */
	hdfsFile fp;		/* open ".f<id>" data file; meaningful while curr_file_id != -1 */
	unsigned long fsbs;	/* bytes per data file (FIO_HDFS_BS or ".fbs"); offset / fsbs picks the file */
	unsigned long fscount;	/* number of data files (FIO_HDFS_FCOUNT or ".fcount") */
	unsigned long curr_file_id;	/* file id computed for the open file; -1 (ULONG_MAX) when none is open */
	unsigned int numjobs;	/* largest numjobs seen by fio_hdfsio_setup() so far (at least 1) */
	unsigned int fid_correction;	/* getpid() % numjobs, added to every computed file id */
};
     38 };
     39 
     40 static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
     41 {
     42 	/* make sure that hdfsConnect is invoked before executing this function */
     43 	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
     44 	hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
     45 	if (hd->fp) {
     46 		hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
     47 		hdfsCloseFile(hd->fs, hd->fp);
     48 	}
     49 	hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
     50 	if (hd->fp) {
     51 		hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
     52 		hdfsCloseFile(hd->fs, hd->fp);
     53 	}
     54 
     55 	return 0;
     56 }
     57 
     58 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
     59 {
     60 	struct hdfsio_data *hd;
     61 	hdfsFileInfo *fi;
     62 	unsigned long f_id;
     63 	char fname[80];
     64 	int open_flags = 0;
     65 
     66 	hd = td->io_ops->data;
     67 
     68 	if (hd->curr_file_id == -1) {
     69 		/* see comment in fio_hdfsio_setup() function */
     70 		fio_hdfsio_setup_fs_params(hd);
     71 	}
     72 
     73 	/* find out file id based on the offset generated by fio */
     74 	f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
     75 
     76 	if (f_id == hd->curr_file_id) {
     77 		/* file is already open */
     78 		return 0;
     79 	}
     80 
     81 	if (hd->curr_file_id != -1) {
     82 		hdfsCloseFile(hd->fs, hd->fp);
     83 	}
     84 
     85 	if (io_u->ddir == DDIR_READ) {
     86 		open_flags = O_RDONLY;
     87 	} else if (io_u->ddir == DDIR_WRITE) {
     88 		open_flags = O_WRONLY;
     89 	} else {
     90 		log_err("hdfs: Invalid I/O Operation\n");
     91 	}
     92 
     93 	hd->curr_file_id = f_id;
     94 	do {
     95 		sprintf(fname, ".f%lu", f_id);
     96 		fi = hdfsGetPathInfo(hd->fs, fname);
     97 		if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
     98 			/* file has enough data to read OR file is opened in write mode */
     99 			hd->fp =
    100 			    hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
    101 					 hd->fsbs);
    102 			if (hd->fp) {
    103 				break;
    104 			}
    105 		}
    106 		/* file is empty, so try next file for reading */
    107 		f_id = (f_id + 1) % hd->fscount;
    108 	} while (1);
    109 
    110 	return 0;
    111 }
    112 
    113 static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
    114 {
    115 	if (ret != (int)io_u->xfer_buflen) {
    116 		if (ret >= 0) {
    117 			io_u->resid = io_u->xfer_buflen - ret;
    118 			io_u->error = 0;
    119 			return FIO_Q_COMPLETED;
    120 		} else
    121 			io_u->error = errno;
    122 	}
    123 
    124 	if (io_u->error)
    125 		td_verror(td, io_u->error, "xfer");
    126 
    127 	return FIO_Q_COMPLETED;
    128 }
    129 
    130 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
    131 {
    132 	struct hdfsio_data *hd;
    133 	int ret = 0;
    134 
    135 	hd = td->io_ops->data;
    136 
    137 	if (io_u->ddir == DDIR_READ) {
    138 		ret =
    139 		    hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
    140 	} else if (io_u->ddir == DDIR_WRITE) {
    141 		ret =
    142 		    hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
    143 			      io_u->xfer_buflen);
    144 	} else {
    145 		log_err("hdfs: Invalid I/O Operation\n");
    146 	}
    147 
    148 	return fio_io_end(td, io_u, ret);
    149 }
    150 
    151 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
    152 {
    153 	struct hdfsio_data *hd;
    154 
    155 	hd = td->io_ops->data;
    156 	hd->fs = hdfsConnect(hd->host, hd->port);
    157 	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
    158 	hd->fid_correction = (getpid() % hd->numjobs);
    159 
    160 	return 0;
    161 }
    162 
    163 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
    164 {
    165 	struct hdfsio_data *hd;
    166 
    167 	hd = td->io_ops->data;
    168 	hdfsDisconnect(hd->fs);
    169 
    170 	return 0;
    171 }
    172 
    173 static int fio_hdfsio_setup(struct thread_data *td)
    174 {
    175 	struct hdfsio_data *hd;
    176 	struct fio_file *f;
    177 	static unsigned int numjobs = 1;	/* atleast one job has to be there! */
    178 	numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
    179 
    180 	if (!td->io_ops->data) {
    181 		hd = malloc(sizeof(*hd));;
    182 
    183 		memset(hd, 0, sizeof(*hd));
    184 		td->io_ops->data = hd;
    185 
    186 		/* separate host and port from filename */
    187 		*(strchr(td->o.filename, ',')) = ' ';
    188 		sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
    189 
    190 		/* read fbs and fcount and based on that set f->real_file_size */
    191 		f = td->files[0];
    192 #if 0
    193 		/* IMHO, this should be done here instead of fio_hdfsio_prep()
    194 		 * but somehow calling it here doesn't seem to work,
    195 		 * some problem with libhdfs that needs to be debugged */
    196 		hd->fs = hdfsConnect(hd->host, hd->port);
    197 		fio_hdfsio_setup_fs_params(hd);
    198 		hdfsDisconnect(hd->fs);
    199 #else
    200 		/* so, as an alternate, using environment variables */
    201 		if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
    202 			hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
    203 			hd->fsbs = atol(getenv("FIO_HDFS_BS"));
    204 		} else {
    205 			log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
    206 			return 1;
    207 		}
    208 #endif
    209 		f->real_file_size = hd->fscount * hd->fsbs;
    210 
    211 		td->o.nr_files = 1;
    212 		hd->curr_file_id = -1;
    213 		hd->numjobs = numjobs;
    214 		fio_file_set_size_known(f);
    215 	}
    216 
    217 	return 0;
    218 }
    219 
    220 static struct ioengine_ops ioengine_hdfs = {
    221 	.name = "libhdfs",
    222 	.version = FIO_IOOPS_VERSION,
    223 	.setup = fio_hdfsio_setup,
    224 	.prep = fio_hdfsio_prep,
    225 	.queue = fio_hdfsio_queue,
    226 	.open_file = fio_hdfsio_open_file,
    227 	.close_file = fio_hdfsio_close_file,
    228 	.flags = FIO_SYNCIO,
    229 };
    230 
    231 static void fio_init fio_hdfsio_register(void)
    232 {
    233 	register_ioengine(&ioengine_hdfs);
    234 }
    235 
    236 static void fio_exit fio_hdfsio_unregister(void)
    237 {
    238 	unregister_ioengine(&ioengine_hdfs);
    239 }
    240