1 /* 2 * rbd engine 3 * 4 * IO engine using Ceph's librbd to test RADOS Block Devices. 5 * 6 */ 7 8 #include <rbd/librbd.h> 9 10 #include "../fio.h" 11 12 struct fio_rbd_iou { 13 struct io_u *io_u; 14 rbd_completion_t completion; 15 int io_seen; 16 int io_complete; 17 }; 18 19 struct rbd_data { 20 rados_t cluster; 21 rados_ioctx_t io_ctx; 22 rbd_image_t image; 23 struct io_u **aio_events; 24 struct io_u **sort_events; 25 }; 26 27 struct rbd_options { 28 void *pad; 29 char *rbd_name; 30 char *pool_name; 31 char *client_name; 32 int busy_poll; 33 }; 34 35 static struct fio_option options[] = { 36 { 37 .name = "rbdname", 38 .lname = "rbd engine rbdname", 39 .type = FIO_OPT_STR_STORE, 40 .help = "RBD name for RBD engine", 41 .off1 = offsetof(struct rbd_options, rbd_name), 42 .category = FIO_OPT_C_ENGINE, 43 .group = FIO_OPT_G_RBD, 44 }, 45 { 46 .name = "pool", 47 .lname = "rbd engine pool", 48 .type = FIO_OPT_STR_STORE, 49 .help = "Name of the pool hosting the RBD for the RBD engine", 50 .off1 = offsetof(struct rbd_options, pool_name), 51 .category = FIO_OPT_C_ENGINE, 52 .group = FIO_OPT_G_RBD, 53 }, 54 { 55 .name = "clientname", 56 .lname = "rbd engine clientname", 57 .type = FIO_OPT_STR_STORE, 58 .help = "Name of the ceph client to access the RBD for the RBD engine", 59 .off1 = offsetof(struct rbd_options, client_name), 60 .category = FIO_OPT_C_ENGINE, 61 .group = FIO_OPT_G_RBD, 62 }, 63 { 64 .name = "busy_poll", 65 .lname = "Busy poll", 66 .type = FIO_OPT_BOOL, 67 .help = "Busy poll for completions instead of sleeping", 68 .off1 = offsetof(struct rbd_options, busy_poll), 69 .def = "0", 70 .category = FIO_OPT_C_ENGINE, 71 .group = FIO_OPT_G_RBD, 72 }, 73 { 74 .name = NULL, 75 }, 76 }; 77 78 static int _fio_setup_rbd_data(struct thread_data *td, 79 struct rbd_data **rbd_data_ptr) 80 { 81 struct rbd_data *rbd; 82 83 if (td->io_ops->data) 84 return 0; 85 86 rbd = calloc(1, sizeof(struct rbd_data)); 87 if (!rbd) 88 goto failed; 89 90 rbd->aio_events = calloc(td->o.iodepth, sizeof(struct io_u *)); 91 if (!rbd->aio_events) 92 goto failed; 93 94 rbd->sort_events = calloc(td->o.iodepth, sizeof(struct io_u *)); 95 if (!rbd->sort_events) 96 goto failed; 97 98 *rbd_data_ptr = rbd; 99 return 0; 100 101 failed: 102 if (rbd) 103 free(rbd); 104 return 1; 105 106 } 107 108 static int _fio_rbd_connect(struct thread_data *td) 109 { 110 struct rbd_data *rbd = td->io_ops->data; 111 struct rbd_options *o = td->eo; 112 int r; 113 114 r = rados_create(&rbd->cluster, o->client_name); 115 if (r < 0) { 116 log_err("rados_create failed.\n"); 117 goto failed_early; 118 } 119 120 r = rados_conf_read_file(rbd->cluster, NULL); 121 if (r < 0) { 122 log_err("rados_conf_read_file failed.\n"); 123 goto failed_early; 124 } 125 126 r = rados_connect(rbd->cluster); 127 if (r < 0) { 128 log_err("rados_connect failed.\n"); 129 goto failed_shutdown; 130 } 131 132 r = rados_ioctx_create(rbd->cluster, o->pool_name, &rbd->io_ctx); 133 if (r < 0) { 134 log_err("rados_ioctx_create failed.\n"); 135 goto failed_shutdown; 136 } 137 138 r = rbd_open(rbd->io_ctx, o->rbd_name, &rbd->image, NULL /*snap */ ); 139 if (r < 0) { 140 log_err("rbd_open failed.\n"); 141 goto failed_open; 142 } 143 return 0; 144 145 failed_open: 146 rados_ioctx_destroy(rbd->io_ctx); 147 rbd->io_ctx = NULL; 148 failed_shutdown: 149 rados_shutdown(rbd->cluster); 150 rbd->cluster = NULL; 151 failed_early: 152 return 1; 153 } 154 155 static void _fio_rbd_disconnect(struct rbd_data *rbd) 156 { 157 if (!rbd) 158 return; 159 160 /* shutdown everything */ 161 if (rbd->image) { 162 rbd_close(rbd->image); 163 rbd->image = NULL; 164 } 165 166 if (rbd->io_ctx) { 167 rados_ioctx_destroy(rbd->io_ctx); 168 rbd->io_ctx = NULL; 169 } 170 171 if (rbd->cluster) { 172 rados_shutdown(rbd->cluster); 173 rbd->cluster = NULL; 174 } 175 } 176 177 static void _fio_rbd_finish_aiocb(rbd_completion_t comp, void *data) 178 { 179 struct fio_rbd_iou *fri = data; 180 struct io_u *io_u = fri->io_u; 181 ssize_t ret; 182 183 /* 184 * Looks like return value is 0 for success, or < 0 for 185 * a specific error. So we have to assume that it can't do 186 * partial completions. 187 */ 188 fri->io_complete = 1; 189 190 ret = rbd_aio_get_return_value(fri->completion); 191 if (ret < 0) { 192 io_u->error = ret; 193 io_u->resid = io_u->xfer_buflen; 194 } else 195 io_u->error = 0; 196 } 197 198 static struct io_u *fio_rbd_event(struct thread_data *td, int event) 199 { 200 struct rbd_data *rbd = td->io_ops->data; 201 202 return rbd->aio_events[event]; 203 } 204 205 static inline int fri_check_complete(struct rbd_data *rbd, struct io_u *io_u, 206 unsigned int *events) 207 { 208 struct fio_rbd_iou *fri = io_u->engine_data; 209 210 if (fri->io_complete) { 211 fri->io_seen = 1; 212 rbd->aio_events[*events] = io_u; 213 (*events)++; 214 215 rbd_aio_release(fri->completion); 216 return 1; 217 } 218 219 return 0; 220 } 221 222 static inline int rbd_io_u_seen(struct io_u *io_u) 223 { 224 struct fio_rbd_iou *fri = io_u->engine_data; 225 226 return fri->io_seen; 227 } 228 229 static void rbd_io_u_wait_complete(struct io_u *io_u) 230 { 231 struct fio_rbd_iou *fri = io_u->engine_data; 232 233 rbd_aio_wait_for_complete(fri->completion); 234 } 235 236 static int rbd_io_u_cmp(const void *p1, const void *p2) 237 { 238 const struct io_u **a = (const struct io_u **) p1; 239 const struct io_u **b = (const struct io_u **) p2; 240 uint64_t at, bt; 241 242 at = utime_since_now(&(*a)->start_time); 243 bt = utime_since_now(&(*b)->start_time); 244 245 if (at < bt) 246 return -1; 247 else if (at == bt) 248 return 0; 249 else 250 return 1; 251 } 252 253 static int rbd_iter_events(struct thread_data *td, unsigned int *events, 254 unsigned int min_evts, int wait) 255 { 256 struct rbd_data *rbd = td->io_ops->data; 257 unsigned int this_events = 0; 258 struct io_u *io_u; 259 int i, sidx; 260 261 sidx = 0; 262 io_u_qiter(&td->io_u_all, io_u, i) { 263 if (!(io_u->flags & IO_U_F_FLIGHT)) 264 continue; 265 if (rbd_io_u_seen(io_u)) 266 continue; 267 268 if (fri_check_complete(rbd, io_u, events)) 269 this_events++; 270 else if (wait) 271 rbd->sort_events[sidx++] = io_u; 272 } 273 274 if (!wait || !sidx) 275 return this_events; 276 277 /* 278 * Sort events, oldest issue first, then wait on as many as we 279 * need in order of age. If we have enough events, stop waiting, 280 * and just check if any of the older ones are done. 281 */ 282 if (sidx > 1) 283 qsort(rbd->sort_events, sidx, sizeof(struct io_u *), rbd_io_u_cmp); 284 285 for (i = 0; i < sidx; i++) { 286 io_u = rbd->sort_events[i]; 287 288 if (fri_check_complete(rbd, io_u, events)) { 289 this_events++; 290 continue; 291 } 292 293 /* 294 * Stop waiting when we have enough, but continue checking 295 * all pending IOs if they are complete. 296 */ 297 if (*events >= min_evts) 298 continue; 299 300 rbd_io_u_wait_complete(io_u); 301 302 if (fri_check_complete(rbd, io_u, events)) 303 this_events++; 304 } 305 306 return this_events; 307 } 308 309 static int fio_rbd_getevents(struct thread_data *td, unsigned int min, 310 unsigned int max, const struct timespec *t) 311 { 312 unsigned int this_events, events = 0; 313 struct rbd_options *o = td->eo; 314 int wait = 0; 315 316 do { 317 this_events = rbd_iter_events(td, &events, min, wait); 318 319 if (events >= min) 320 break; 321 if (this_events) 322 continue; 323 324 if (!o->busy_poll) 325 wait = 1; 326 else 327 nop; 328 } while (1); 329 330 return events; 331 } 332 333 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u) 334 { 335 struct rbd_data *rbd = td->io_ops->data; 336 struct fio_rbd_iou *fri = io_u->engine_data; 337 int r = -1; 338 339 fio_ro_check(td, io_u); 340 341 fri->io_seen = 0; 342 fri->io_complete = 0; 343 344 r = rbd_aio_create_completion(fri, _fio_rbd_finish_aiocb, 345 &fri->completion); 346 if (r < 0) { 347 log_err("rbd_aio_create_completion failed.\n"); 348 goto failed; 349 } 350 351 if (io_u->ddir == DDIR_WRITE) { 352 r = rbd_aio_write(rbd->image, io_u->offset, io_u->xfer_buflen, 353 io_u->xfer_buf, fri->completion); 354 if (r < 0) { 355 log_err("rbd_aio_write failed.\n"); 356 goto failed_comp; 357 } 358 359 } else if (io_u->ddir == DDIR_READ) { 360 r = rbd_aio_read(rbd->image, io_u->offset, io_u->xfer_buflen, 361 io_u->xfer_buf, fri->completion); 362 363 if (r < 0) { 364 log_err("rbd_aio_read failed.\n"); 365 goto failed_comp; 366 } 367 } else if (io_u->ddir == DDIR_TRIM) { 368 r = rbd_aio_discard(rbd->image, io_u->offset, 369 io_u->xfer_buflen, fri->completion); 370 if (r < 0) { 371 log_err("rbd_aio_discard failed.\n"); 372 goto failed_comp; 373 } 374 } else if (io_u->ddir == DDIR_SYNC) { 375 r = rbd_aio_flush(rbd->image, fri->completion); 376 if (r < 0) { 377 log_err("rbd_flush failed.\n"); 378 goto failed_comp; 379 } 380 } else { 381 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__, 382 io_u->ddir); 383 goto failed_comp; 384 } 385 386 return FIO_Q_QUEUED; 387 failed_comp: 388 rbd_aio_release(fri->completion); 389 failed: 390 io_u->error = r; 391 td_verror(td, io_u->error, "xfer"); 392 return FIO_Q_COMPLETED; 393 } 394 395 static int fio_rbd_init(struct thread_data *td) 396 { 397 int r; 398 399 r = _fio_rbd_connect(td); 400 if (r) { 401 log_err("fio_rbd_connect failed, return code: %d .\n", r); 402 goto failed; 403 } 404 405 return 0; 406 407 failed: 408 return 1; 409 } 410 411 static void fio_rbd_cleanup(struct thread_data *td) 412 { 413 struct rbd_data *rbd = td->io_ops->data; 414 415 if (rbd) { 416 _fio_rbd_disconnect(rbd); 417 free(rbd->aio_events); 418 free(rbd->sort_events); 419 free(rbd); 420 } 421 } 422 423 static int fio_rbd_setup(struct thread_data *td) 424 { 425 rbd_image_info_t info; 426 struct fio_file *f; 427 struct rbd_data *rbd = NULL; 428 int major, minor, extra; 429 int r; 430 431 /* log version of librbd. No cluster connection required. */ 432 rbd_version(&major, &minor, &extra); 433 log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra); 434 435 /* allocate engine specific structure to deal with librbd. */ 436 r = _fio_setup_rbd_data(td, &rbd); 437 if (r) { 438 log_err("fio_setup_rbd_data failed.\n"); 439 goto cleanup; 440 } 441 td->io_ops->data = rbd; 442 443 /* librbd does not allow us to run first in the main thread and later 444 * in a fork child. It needs to be the same process context all the 445 * time. 446 */ 447 td->o.use_thread = 1; 448 449 /* connect in the main thread to determine to determine 450 * the size of the given RADOS block device. And disconnect 451 * later on. 452 */ 453 r = _fio_rbd_connect(td); 454 if (r) { 455 log_err("fio_rbd_connect failed.\n"); 456 goto cleanup; 457 } 458 459 /* get size of the RADOS block device */ 460 r = rbd_stat(rbd->image, &info, sizeof(info)); 461 if (r < 0) { 462 log_err("rbd_status failed.\n"); 463 goto disconnect; 464 } 465 dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size); 466 467 /* taken from "net" engine. Pretend we deal with files, 468 * even if we do not have any ideas about files. 469 * The size of the RBD is set instead of a artificial file. 470 */ 471 if (!td->files_index) { 472 add_file(td, td->o.filename ? : "rbd", 0, 0); 473 td->o.nr_files = td->o.nr_files ? : 1; 474 td->o.open_files++; 475 } 476 f = td->files[0]; 477 f->real_file_size = info.size; 478 479 /* disconnect, then we were only connected to determine 480 * the size of the RBD. 481 */ 482 _fio_rbd_disconnect(rbd); 483 return 0; 484 485 disconnect: 486 _fio_rbd_disconnect(rbd); 487 cleanup: 488 fio_rbd_cleanup(td); 489 return r; 490 } 491 492 static int fio_rbd_open(struct thread_data *td, struct fio_file *f) 493 { 494 return 0; 495 } 496 497 static int fio_rbd_invalidate(struct thread_data *td, struct fio_file *f) 498 { 499 #if defined(CONFIG_RBD_INVAL) 500 struct rbd_data *rbd = td->io_ops->data; 501 502 return rbd_invalidate_cache(rbd->image); 503 #else 504 return 0; 505 #endif 506 } 507 508 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u) 509 { 510 struct fio_rbd_iou *fri = io_u->engine_data; 511 512 if (fri) { 513 io_u->engine_data = NULL; 514 free(fri); 515 } 516 } 517 518 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u) 519 { 520 struct fio_rbd_iou *fri; 521 522 fri = calloc(1, sizeof(*fri)); 523 fri->io_u = io_u; 524 io_u->engine_data = fri; 525 return 0; 526 } 527 528 static struct ioengine_ops ioengine = { 529 .name = "rbd", 530 .version = FIO_IOOPS_VERSION, 531 .setup = fio_rbd_setup, 532 .init = fio_rbd_init, 533 .queue = fio_rbd_queue, 534 .getevents = fio_rbd_getevents, 535 .event = fio_rbd_event, 536 .cleanup = fio_rbd_cleanup, 537 .open_file = fio_rbd_open, 538 .invalidate = fio_rbd_invalidate, 539 .options = options, 540 .io_u_init = fio_rbd_io_u_init, 541 .io_u_free = fio_rbd_io_u_free, 542 .option_struct_size = sizeof(struct rbd_options), 543 }; 544 545 static void fio_init fio_rbd_register(void) 546 { 547 register_ioengine(&ioengine); 548 } 549 550 static void fio_exit fio_rbd_unregister(void) 551 { 552 unregister_ioengine(&ioengine); 553 } 554