1 /* 2 * rbd engine 3 * 4 * IO engine using Ceph's librbd to test RADOS Block Devices. 5 * 6 */ 7 8 #include <rbd/librbd.h> 9 10 #include "../fio.h" 11 12 struct fio_rbd_iou { 13 struct io_u *io_u; 14 int io_complete; 15 }; 16 17 struct rbd_data { 18 rados_t cluster; 19 rados_ioctx_t io_ctx; 20 rbd_image_t image; 21 struct io_u **aio_events; 22 }; 23 24 struct rbd_options { 25 struct thread_data *td; 26 char *rbd_name; 27 char *pool_name; 28 char *client_name; 29 }; 30 31 static struct fio_option options[] = { 32 { 33 .name = "rbdname", 34 .lname = "rbd engine rbdname", 35 .type = FIO_OPT_STR_STORE, 36 .help = "RBD name for RBD engine", 37 .off1 = offsetof(struct rbd_options, rbd_name), 38 .category = FIO_OPT_C_ENGINE, 39 .group = FIO_OPT_G_RBD, 40 }, 41 { 42 .name = "pool", 43 .lname = "rbd engine pool", 44 .type = FIO_OPT_STR_STORE, 45 .help = "Name of the pool hosting the RBD for the RBD engine", 46 .off1 = offsetof(struct rbd_options, pool_name), 47 .category = FIO_OPT_C_ENGINE, 48 .group = FIO_OPT_G_RBD, 49 }, 50 { 51 .name = "clientname", 52 .lname = "rbd engine clientname", 53 .type = FIO_OPT_STR_STORE, 54 .help = "Name of the ceph client to access the RBD for the RBD engine", 55 .off1 = offsetof(struct rbd_options, client_name), 56 .category = FIO_OPT_C_ENGINE, 57 .group = FIO_OPT_G_RBD, 58 }, 59 { 60 .name = NULL, 61 }, 62 }; 63 64 static int _fio_setup_rbd_data(struct thread_data *td, 65 struct rbd_data **rbd_data_ptr) 66 { 67 struct rbd_data *rbd_data; 68 69 if (td->io_ops->data) 70 return 0; 71 72 rbd_data = malloc(sizeof(struct rbd_data)); 73 if (!rbd_data) 74 goto failed; 75 76 memset(rbd_data, 0, sizeof(struct rbd_data)); 77 78 rbd_data->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *)); 79 if (!rbd_data->aio_events) 80 goto failed; 81 82 memset(rbd_data->aio_events, 0, td->o.iodepth * sizeof(struct io_u *)); 83 84 *rbd_data_ptr = rbd_data; 85 86 return 0; 87 88 failed: 89 return 1; 90 91 } 92 93 static int _fio_rbd_connect(struct thread_data *td) 94 { 95 struct rbd_data *rbd_data = td->io_ops->data; 96 struct rbd_options *o = td->eo; 97 int r; 98 99 r = rados_create(&(rbd_data->cluster), o->client_name); 100 if (r < 0) { 101 log_err("rados_create failed.\n"); 102 goto failed_early; 103 } 104 105 r = rados_conf_read_file(rbd_data->cluster, NULL); 106 if (r < 0) { 107 log_err("rados_conf_read_file failed.\n"); 108 goto failed_early; 109 } 110 111 r = rados_connect(rbd_data->cluster); 112 if (r < 0) { 113 log_err("rados_connect failed.\n"); 114 goto failed_shutdown; 115 } 116 117 r = rados_ioctx_create(rbd_data->cluster, o->pool_name, 118 &(rbd_data->io_ctx)); 119 if (r < 0) { 120 log_err("rados_ioctx_create failed.\n"); 121 goto failed_shutdown; 122 } 123 124 r = rbd_open(rbd_data->io_ctx, o->rbd_name, &(rbd_data->image), 125 NULL /*snap */ ); 126 if (r < 0) { 127 log_err("rbd_open failed.\n"); 128 goto failed_open; 129 } 130 return 0; 131 132 failed_open: 133 rados_ioctx_destroy(rbd_data->io_ctx); 134 failed_shutdown: 135 rados_shutdown(rbd_data->cluster); 136 failed_early: 137 return 1; 138 } 139 140 static void _fio_rbd_disconnect(struct rbd_data *rbd_data) 141 { 142 if (!rbd_data) 143 return; 144 145 /* shutdown everything */ 146 if (rbd_data->image) { 147 rbd_close(rbd_data->image); 148 rbd_data->image = NULL; 149 } 150 151 if (rbd_data->io_ctx) { 152 rados_ioctx_destroy(rbd_data->io_ctx); 153 rbd_data->io_ctx = NULL; 154 } 155 156 if (rbd_data->cluster) { 157 rados_shutdown(rbd_data->cluster); 158 rbd_data->cluster = NULL; 159 } 160 } 161 162 static void _fio_rbd_finish_write_aiocb(rbd_completion_t comp, void *data) 163 { 164 struct io_u *io_u = (struct io_u *)data; 165 struct fio_rbd_iou *fio_rbd_iou = 166 (struct fio_rbd_iou *)io_u->engine_data; 167 168 fio_rbd_iou->io_complete = 1; 169 170 /* if write needs to be verified - we should not release comp here 171 without fetching the result */ 172 173 rbd_aio_release(comp); 174 /* TODO handle error */ 175 176 return; 177 } 178 179 static void _fio_rbd_finish_read_aiocb(rbd_completion_t comp, void *data) 180 { 181 struct io_u *io_u = (struct io_u *)data; 182 struct fio_rbd_iou *fio_rbd_iou = 183 (struct fio_rbd_iou *)io_u->engine_data; 184 185 fio_rbd_iou->io_complete = 1; 186 187 /* if read needs to be verified - we should not release comp here 188 without fetching the result */ 189 rbd_aio_release(comp); 190 191 /* TODO handle error */ 192 193 return; 194 } 195 196 static struct io_u *fio_rbd_event(struct thread_data *td, int event) 197 { 198 struct rbd_data *rbd_data = td->io_ops->data; 199 200 return rbd_data->aio_events[event]; 201 } 202 203 static int fio_rbd_getevents(struct thread_data *td, unsigned int min, 204 unsigned int max, struct timespec *t) 205 { 206 struct rbd_data *rbd_data = td->io_ops->data; 207 unsigned int events = 0; 208 struct io_u *io_u; 209 int i; 210 struct fio_rbd_iou *fov; 211 212 do { 213 io_u_qiter(&td->io_u_all, io_u, i) { 214 if (!(io_u->flags & IO_U_F_FLIGHT)) 215 continue; 216 217 fov = (struct fio_rbd_iou *)io_u->engine_data; 218 219 if (fov->io_complete) { 220 fov->io_complete = 0; 221 rbd_data->aio_events[events] = io_u; 222 events++; 223 } 224 225 } 226 if (events < min) 227 usleep(100); 228 else 229 break; 230 231 } while (1); 232 233 return events; 234 } 235 236 static int fio_rbd_queue(struct thread_data *td, struct io_u *io_u) 237 { 238 int r = -1; 239 struct rbd_data *rbd_data = td->io_ops->data; 240 rbd_completion_t comp; 241 242 fio_ro_check(td, io_u); 243 244 if (io_u->ddir == DDIR_WRITE) { 245 r = rbd_aio_create_completion(io_u, 246 (rbd_callback_t) 247 _fio_rbd_finish_write_aiocb, 248 &comp); 249 if (r < 0) { 250 log_err 251 ("rbd_aio_create_completion for DDIR_WRITE failed.\n"); 252 goto failed; 253 } 254 255 r = rbd_aio_write(rbd_data->image, io_u->offset, 256 io_u->xfer_buflen, io_u->xfer_buf, comp); 257 if (r < 0) { 258 log_err("rbd_aio_write failed.\n"); 259 goto failed; 260 } 261 262 } else if (io_u->ddir == DDIR_READ) { 263 r = rbd_aio_create_completion(io_u, 264 (rbd_callback_t) 265 _fio_rbd_finish_read_aiocb, 266 &comp); 267 if (r < 0) { 268 log_err 269 ("rbd_aio_create_completion for DDIR_READ failed.\n"); 270 goto failed; 271 } 272 273 r = rbd_aio_read(rbd_data->image, io_u->offset, 274 io_u->xfer_buflen, io_u->xfer_buf, comp); 275 276 if (r < 0) { 277 log_err("rbd_aio_read failed.\n"); 278 goto failed; 279 } 280 281 } else if (io_u->ddir == DDIR_SYNC) { 282 r = rbd_flush(rbd_data->image); 283 if (r < 0) { 284 log_err("rbd_flush failed.\n"); 285 goto failed; 286 } 287 288 return FIO_Q_COMPLETED; 289 } else { 290 dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__, 291 io_u->ddir); 292 return FIO_Q_COMPLETED; 293 } 294 295 return FIO_Q_QUEUED; 296 297 failed: 298 io_u->error = r; 299 td_verror(td, io_u->error, "xfer"); 300 return FIO_Q_COMPLETED; 301 } 302 303 static int fio_rbd_init(struct thread_data *td) 304 { 305 int r; 306 307 r = _fio_rbd_connect(td); 308 if (r) { 309 log_err("fio_rbd_connect failed, return code: %d .\n", r); 310 goto failed; 311 } 312 313 return 0; 314 315 failed: 316 return 1; 317 318 } 319 320 static void fio_rbd_cleanup(struct thread_data *td) 321 { 322 struct rbd_data *rbd_data = td->io_ops->data; 323 324 if (rbd_data) { 325 _fio_rbd_disconnect(rbd_data); 326 free(rbd_data->aio_events); 327 free(rbd_data); 328 } 329 330 } 331 332 static int fio_rbd_setup(struct thread_data *td) 333 { 334 int r = 0; 335 rbd_image_info_t info; 336 struct fio_file *f; 337 struct rbd_data *rbd_data = NULL; 338 int major, minor, extra; 339 340 /* log version of librbd. No cluster connection required. */ 341 rbd_version(&major, &minor, &extra); 342 log_info("rbd engine: RBD version: %d.%d.%d\n", major, minor, extra); 343 344 /* allocate engine specific structure to deal with librbd. */ 345 r = _fio_setup_rbd_data(td, &rbd_data); 346 if (r) { 347 log_err("fio_setup_rbd_data failed.\n"); 348 goto cleanup; 349 } 350 td->io_ops->data = rbd_data; 351 352 /* librbd does not allow us to run first in the main thread and later in a 353 * fork child. It needs to be the same process context all the time. 354 */ 355 td->o.use_thread = 1; 356 357 /* connect in the main thread to determine to determine 358 * the size of the given RADOS block device. And disconnect 359 * later on. 360 */ 361 r = _fio_rbd_connect(td); 362 if (r) { 363 log_err("fio_rbd_connect failed.\n"); 364 goto cleanup; 365 } 366 367 /* get size of the RADOS block device */ 368 r = rbd_stat(rbd_data->image, &info, sizeof(info)); 369 if (r < 0) { 370 log_err("rbd_status failed.\n"); 371 goto disconnect; 372 } 373 dprint(FD_IO, "rbd-engine: image size: %lu\n", info.size); 374 375 /* taken from "net" engine. Pretend we deal with files, 376 * even if we do not have any ideas about files. 377 * The size of the RBD is set instead of a artificial file. 378 */ 379 if (!td->files_index) { 380 add_file(td, td->o.filename ? : "rbd", 0, 0); 381 td->o.nr_files = td->o.nr_files ? : 1; 382 td->o.open_files++; 383 } 384 f = td->files[0]; 385 f->real_file_size = info.size; 386 387 /* disconnect, then we were only connected to determine 388 * the size of the RBD. 389 */ 390 _fio_rbd_disconnect(rbd_data); 391 return 0; 392 393 disconnect: 394 _fio_rbd_disconnect(rbd_data); 395 cleanup: 396 fio_rbd_cleanup(td); 397 return r; 398 } 399 400 static int fio_rbd_open(struct thread_data *td, struct fio_file *f) 401 { 402 return 0; 403 } 404 405 static void fio_rbd_io_u_free(struct thread_data *td, struct io_u *io_u) 406 { 407 struct fio_rbd_iou *o = io_u->engine_data; 408 409 if (o) { 410 io_u->engine_data = NULL; 411 free(o); 412 } 413 } 414 415 static int fio_rbd_io_u_init(struct thread_data *td, struct io_u *io_u) 416 { 417 struct fio_rbd_iou *o; 418 419 o = malloc(sizeof(*o)); 420 o->io_complete = 0; 421 o->io_u = io_u; 422 io_u->engine_data = o; 423 return 0; 424 } 425 426 static struct ioengine_ops ioengine = { 427 .name = "rbd", 428 .version = FIO_IOOPS_VERSION, 429 .setup = fio_rbd_setup, 430 .init = fio_rbd_init, 431 .queue = fio_rbd_queue, 432 .getevents = fio_rbd_getevents, 433 .event = fio_rbd_event, 434 .cleanup = fio_rbd_cleanup, 435 .open_file = fio_rbd_open, 436 .options = options, 437 .io_u_init = fio_rbd_io_u_init, 438 .io_u_free = fio_rbd_io_u_free, 439 .option_struct_size = sizeof(struct rbd_options), 440 }; 441 442 static void fio_init fio_rbd_register(void) 443 { 444 register_ioengine(&ioengine); 445 } 446 447 static void fio_exit fio_rbd_unregister(void) 448 { 449 unregister_ioengine(&ioengine); 450 } 451