1 /* 2 * libhdfs engine 3 * 4 * this engine helps perform read/write operations on hdfs cluster using 5 * libhdfs. hdfs doesnot support modification of data once file is created. 6 * 7 * so to mimic that create many files of small size (e.g 256k), and this 8 * engine select a file based on the offset generated by fio. 9 * 10 * thus, random reads and writes can also be achieved with this logic. 11 * 12 */ 13 14 #include <math.h> 15 #include <hdfs.h> 16 17 #include "../fio.h" 18 #include "../optgroup.h" 19 20 #define CHUNCK_NAME_LENGTH_MAX 80 21 #define CHUNCK_CREATION_BUFFER_SIZE 65536 22 23 struct hdfsio_data { 24 hdfsFS fs; 25 hdfsFile fp; 26 uint64_t curr_file_id; 27 }; 28 29 struct hdfsio_options { 30 void *pad; /* needed because offset can't be 0 for a option defined used offsetof */ 31 char *host; 32 char *directory; 33 unsigned int port; 34 unsigned int chunck_size; 35 unsigned int single_instance; 36 unsigned int use_direct; 37 }; 38 39 static struct fio_option options[] = { 40 { 41 .name = "namenode", 42 .lname = "hfds namenode", 43 .type = FIO_OPT_STR_STORE, 44 .off1 = offsetof(struct hdfsio_options, host), 45 .def = "localhost", 46 .help = "Namenode of the HDFS cluster", 47 .category = FIO_OPT_C_ENGINE, 48 .group = FIO_OPT_G_HDFS, 49 }, 50 { 51 .name = "hostname", 52 .lname = "hfds namenode", 53 .type = FIO_OPT_STR_STORE, 54 .off1 = offsetof(struct hdfsio_options, host), 55 .def = "localhost", 56 .help = "Namenode of the HDFS cluster", 57 .category = FIO_OPT_C_ENGINE, 58 .group = FIO_OPT_G_HDFS, 59 }, 60 { 61 .name = "port", 62 .lname = "hdfs namenode port", 63 .type = FIO_OPT_INT, 64 .off1 = offsetof(struct hdfsio_options, port), 65 .def = "9000", 66 .minval = 1, 67 .maxval = 65535, 68 .help = "Port used by the HDFS cluster namenode", 69 .category = FIO_OPT_C_ENGINE, 70 .group = FIO_OPT_G_HDFS, 71 }, 72 { 73 .name = "hdfsdirectory", 74 .lname = "hfds directory", 75 .type = FIO_OPT_STR_STORE, 76 .off1 = offsetof(struct hdfsio_options, directory), 77 .def = "/", 78 .help = "The HDFS directory where fio will create chuncks", 79 .category = FIO_OPT_C_ENGINE, 80 .group = FIO_OPT_G_HDFS, 81 }, 82 { 83 .name = "chunk_size", 84 .alias = "chunck_size", 85 .lname = "Chunk size", 86 .type = FIO_OPT_INT, 87 .off1 = offsetof(struct hdfsio_options, chunck_size), 88 .def = "1048576", 89 .help = "Size of individual chunck", 90 .category = FIO_OPT_C_ENGINE, 91 .group = FIO_OPT_G_HDFS, 92 }, 93 { 94 .name = "single_instance", 95 .lname = "Single Instance", 96 .type = FIO_OPT_BOOL, 97 .off1 = offsetof(struct hdfsio_options, single_instance), 98 .def = "1", 99 .help = "Use a single instance", 100 .category = FIO_OPT_C_ENGINE, 101 .group = FIO_OPT_G_HDFS, 102 }, 103 { 104 .name = "hdfs_use_direct", 105 .lname = "HDFS Use Direct", 106 .type = FIO_OPT_BOOL, 107 .off1 = offsetof(struct hdfsio_options, use_direct), 108 .def = "0", 109 .help = "Use readDirect instead of hdfsRead", 110 .category = FIO_OPT_C_ENGINE, 111 .group = FIO_OPT_G_HDFS, 112 }, 113 { 114 .name = NULL, 115 }, 116 }; 117 118 119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) { 120 return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id); 121 } 122 123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) 124 { 125 struct hdfsio_options *options = td->eo; 126 struct hdfsio_data *hd = td->io_ops_data; 127 unsigned long f_id; 128 char fname[CHUNCK_NAME_LENGTH_MAX]; 129 int open_flags; 130 131 /* find out file id based on the offset generated by fio */ 132 f_id = floor(io_u->offset / options-> chunck_size); 133 134 if (f_id == hd->curr_file_id) { 135 /* file is already open */ 136 return 0; 137 } 138 139 if (hd->curr_file_id != -1) { 140 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { 141 log_err("hdfs: unable to close file: %s\n", strerror(errno)); 142 return errno; 143 } 144 hd->curr_file_id = -1; 145 } 146 147 if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) { 148 open_flags = O_RDONLY; 149 } else if (io_u->ddir == DDIR_WRITE) { 150 open_flags = O_WRONLY; 151 } else { 152 log_err("hdfs: Invalid I/O Operation\n"); 153 return 0; 154 } 155 156 get_chunck_name(fname, io_u->file->file_name, f_id); 157 hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, 158 options->chunck_size); 159 if(hd->fp == NULL) { 160 log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno)); 161 return errno; 162 } 163 hd->curr_file_id = f_id; 164 165 return 0; 166 } 167 168 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) 169 { 170 struct hdfsio_data *hd = td->io_ops_data; 171 struct hdfsio_options *options = td->eo; 172 int ret; 173 unsigned long offset; 174 175 offset = io_u->offset % options->chunck_size; 176 177 if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && 178 hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) { 179 log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno)); 180 io_u->error = errno; 181 return FIO_Q_COMPLETED; 182 }; 183 184 // do the IO 185 if (io_u->ddir == DDIR_READ) { 186 if (options->use_direct) { 187 ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); 188 } else { 189 ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); 190 } 191 } else if (io_u->ddir == DDIR_WRITE) { 192 ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, 193 io_u->xfer_buflen); 194 } else if (io_u->ddir == DDIR_SYNC) { 195 ret = hdfsFlush(hd->fs, hd->fp); 196 } else { 197 log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir); 198 ret = EINVAL; 199 } 200 201 // Check if the IO went fine, or is incomplete 202 if (ret != (int)io_u->xfer_buflen) { 203 if (ret >= 0) { 204 io_u->resid = io_u->xfer_buflen - ret; 205 io_u->error = 0; 206 return FIO_Q_COMPLETED; 207 } else { 208 io_u->error = errno; 209 } 210 } 211 212 if (io_u->error) 213 td_verror(td, io_u->error, "xfer"); 214 215 return FIO_Q_COMPLETED; 216 } 217 218 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) 219 { 220 if (td->o.odirect) { 221 td->error = EINVAL; 222 return 0; 223 } 224 225 return 0; 226 } 227 228 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) 229 { 230 struct hdfsio_data *hd = td->io_ops_data; 231 232 if (hd->curr_file_id != -1) { 233 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { 234 log_err("hdfs: unable to close file: %s\n", strerror(errno)); 235 return errno; 236 } 237 hd->curr_file_id = -1; 238 } 239 return 0; 240 } 241 242 static int fio_hdfsio_init(struct thread_data *td) 243 { 244 struct hdfsio_options *options = td->eo; 245 struct hdfsio_data *hd = td->io_ops_data; 246 struct fio_file *f; 247 uint64_t j,k; 248 int i, failure = 0; 249 uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE]; 250 uint64_t bytes_left; 251 char fname[CHUNCK_NAME_LENGTH_MAX]; 252 hdfsFile fp; 253 hdfsFileInfo *fi; 254 tOffset fi_size; 255 256 for_each_file(td, f, i) { 257 k = 0; 258 for(j=0; j < f->real_file_size; j += options->chunck_size) { 259 get_chunck_name(fname, f->file_name, k++); 260 fi = hdfsGetPathInfo(hd->fs, fname); 261 fi_size = fi ? fi->mSize : 0; 262 // fill exist and is big enough, nothing to do 263 if( fi && fi_size >= options->chunck_size) { 264 continue; 265 } 266 fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0, 267 options->chunck_size); 268 if(fp == NULL) { 269 failure = errno; 270 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); 271 break; 272 } 273 bytes_left = options->chunck_size; 274 memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE); 275 while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) { 276 if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE) 277 != CHUNCK_CREATION_BUFFER_SIZE) { 278 failure = errno; 279 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); 280 break; 281 }; 282 bytes_left -= CHUNCK_CREATION_BUFFER_SIZE; 283 } 284 if(bytes_left > 0) { 285 if( hdfsWrite(hd->fs, fp, buffer, bytes_left) 286 != bytes_left) { 287 failure = errno; 288 break; 289 }; 290 } 291 if( hdfsCloseFile(hd->fs, fp) != 0) { 292 failure = errno; 293 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); 294 break; 295 } 296 } 297 if(failure) { 298 break; 299 } 300 } 301 302 if( !failure ) { 303 fio_file_set_size_known(f); 304 } 305 306 return failure; 307 } 308 309 static int fio_hdfsio_setup(struct thread_data *td) 310 { 311 struct hdfsio_data *hd; 312 struct fio_file *f; 313 int i; 314 uint64_t file_size, total_file_size; 315 316 if (!td->io_ops_data) { 317 hd = malloc(sizeof(*hd)); 318 memset(hd, 0, sizeof(*hd)); 319 320 hd->curr_file_id = -1; 321 322 td->io_ops_data = hd; 323 } 324 325 total_file_size = 0; 326 file_size = 0; 327 328 for_each_file(td, f, i) { 329 if(!td->o.file_size_low) { 330 file_size = floor(td->o.size / td->o.nr_files); 331 total_file_size += file_size; 332 } 333 else if (td->o.file_size_low == td->o.file_size_high) 334 file_size = td->o.file_size_low; 335 else { 336 file_size = get_rand_file_size(td); 337 } 338 f->real_file_size = file_size; 339 } 340 /* If the size doesn't divide nicely with the chunck size, 341 * make the last files bigger. 342 * Used only if filesize was not explicitely given 343 */ 344 if (!td->o.file_size_low && total_file_size < td->o.size) { 345 f->real_file_size += (td->o.size - total_file_size); 346 } 347 348 return 0; 349 } 350 351 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u) 352 { 353 struct hdfsio_data *hd = td->io_ops_data; 354 struct hdfsio_options *options = td->eo; 355 int failure; 356 struct hdfsBuilder *bld; 357 358 if (options->host == NULL || options->port == 0) { 359 log_err("hdfs: server not defined\n"); 360 return EINVAL; 361 } 362 363 bld = hdfsNewBuilder(); 364 if (!bld) { 365 failure = errno; 366 log_err("hdfs: unable to allocate connect builder\n"); 367 return failure; 368 } 369 hdfsBuilderSetNameNode(bld, options->host); 370 hdfsBuilderSetNameNodePort(bld, options->port); 371 if(! options->single_instance) { 372 hdfsBuilderSetForceNewInstance(bld); 373 } 374 hd->fs = hdfsBuilderConnect(bld); 375 376 /* hdfsSetWorkingDirectory succeed on non existend directory */ 377 if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) { 378 failure = errno; 379 log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno)); 380 return failure; 381 } 382 383 return 0; 384 } 385 386 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u) 387 { 388 struct hdfsio_data *hd = td->io_ops_data; 389 390 if (hd->fs && hdfsDisconnect(hd->fs) < 0) { 391 log_err("hdfs: disconnect failed: %d\n", errno); 392 } 393 } 394 395 static struct ioengine_ops ioengine_hdfs = { 396 .name = "libhdfs", 397 .version = FIO_IOOPS_VERSION, 398 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL, 399 .setup = fio_hdfsio_setup, 400 .init = fio_hdfsio_init, 401 .prep = fio_hdfsio_prep, 402 .queue = fio_hdfsio_queue, 403 .open_file = fio_hdfsio_open_file, 404 .close_file = fio_hdfsio_close_file, 405 .io_u_init = fio_hdfsio_io_u_init, 406 .io_u_free = fio_hdfsio_io_u_free, 407 .option_struct_size = sizeof(struct hdfsio_options), 408 .options = options, 409 }; 410 411 412 static void fio_init fio_hdfsio_register(void) 413 { 414 register_ioengine(&ioengine_hdfs); 415 } 416 417 static void fio_exit fio_hdfsio_unregister(void) 418 { 419 unregister_ioengine(&ioengine_hdfs); 420 } 421