/*
 * RDMA I/O engine
 *
 * RDMA I/O engine based on the IB verbs and RDMA/CM user space libraries.
 * Supports both RDMA memory semantics and channel semantics
 *   for the InfiniBand, RoCE and iWARP protocols.
 *
 * You will need the Linux RDMA software installed, either
 * from your Linux distributor or directly from openfabrics.org:
 *
 * http://www.openfabrics.org/downloads/OFED/
 *
 * Exchange of RDMA ioengine control messages:
 *	1. The client sends the test mode (RDMA_WRITE/RDMA_READ/SEND)
 *	   to the server.
 *	2. The server parses the test mode and sends a confirmation
 *	   back to the client. In the RDMA WRITE/READ tests, this
 *	   confirmation includes memory information such as rkey and
 *	   address.
 *	3. The client initiates the test loop.
 *	4. In the RDMA WRITE/READ tests, the client sends a completion
 *	   notification to the server, which then sets its td->done
 *	   to true.
 */
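/*
 * A minimal usage sketch, derived from the filename parsing in
 * fio_rdmaio_init() below; host and port are placeholders, and the
 * exact option set depends on your job file:
 *
 *   server (the reading side):
 *     fio --name=srv --ioengine=rdma --rw=read \
 *         --filename=<host>/<port>/rdma_write ...
 *   client (the writing side):
 *     fio --name=cli --ioengine=rdma --rw=write \
 *         --filename=<host>/<port>/rdma_write ...
 *
 * The filename is parsed as host/port[/protocol], where protocol is
 * one of rdma_write, rdma_read or send; it defaults to rdma_write.
 */
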
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>

#include <pthread.h>
#include <inttypes.h>

#include "../fio.h"
#include "../hash.h"

#include <rdma/rdma_cma.h>
#include <infiniband/arch.h>

#define FIO_RDMA_MAX_IO_DEPTH    512

enum rdma_io_mode {
	FIO_RDMA_UNKNOWN = 0,
	FIO_RDMA_MEM_WRITE,
	FIO_RDMA_MEM_READ,
	FIO_RDMA_CHA_SEND,
	FIO_RDMA_CHA_RECV
};

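/*
 * Remote buffer descriptor advertised by the peer for RDMA READ/WRITE:
 * remote virtual address, rkey and length. The fields travel inside
 * struct rdma_info_blk in network byte order.
 */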
struct remote_u {
	uint64_t buf;
	uint32_t rkey;
	uint32_t size;
};

struct rdma_info_blk {
	uint32_t mode;		/* channel semantic or memory semantic */
	uint32_t nr;		/* client: io depth
				   server: number of records for memory semantic
				 */
	struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
};

struct rdma_io_u_data {
	uint64_t wr_id;
	struct ibv_send_wr sq_wr;
	struct ibv_recv_wr rq_wr;
	struct ibv_sge rdma_sgl;
};

struct rdmaio_data {
	int is_client;
	enum rdma_io_mode rdma_protocol;
	char host[64];
	struct sockaddr_in addr;

	struct ibv_recv_wr rq_wr;
	struct ibv_sge recv_sgl;
	struct rdma_info_blk recv_buf;
	struct ibv_mr *recv_mr;

	struct ibv_send_wr sq_wr;
	struct ibv_sge send_sgl;
	struct rdma_info_blk send_buf;
	struct ibv_mr *send_mr;

	struct ibv_comp_channel *channel;
	struct ibv_cq *cq;
	struct ibv_pd *pd;
	struct ibv_qp *qp;

	pthread_t cmthread;
	struct rdma_event_channel *cm_channel;
	struct rdma_cm_id *cm_id;
	struct rdma_cm_id *child_cm_id;

	int cq_event_num;

	struct remote_u *rmt_us;
	int rmt_nr;
	struct io_u **io_us_queued;
	int io_u_queued_nr;
	struct io_u **io_us_flight;
	int io_u_flight_nr;
	struct io_u **io_us_completed;
	int io_u_completed_nr;

	struct frand_state rand_state;
};

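/*
 * Client: handle the server's confirmation. For the memory semantics
 * (RDMA_WRITE/RDMA_READ), cache the advertised remote buffer table,
 * converting each entry from network to host byte order.
 */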
static int client_recv(struct thread_data *td, struct ibv_wc *wc)
{
	struct rdmaio_data *rd = td->io_ops->data;

	if (wc->byte_len != sizeof(rd->recv_buf)) {
		log_err("Received bogus data, size %u\n", wc->byte_len);
		return 1;
	}

	/* store mr info for MEMORY semantic */
	if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
	    (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
		int i;

		rd->rmt_nr = ntohl(rd->recv_buf.nr);

		for (i = 0; i < rd->rmt_nr; i++) {
			rd->rmt_us[i].buf = ntohll(rd->recv_buf.rmt_us[i].buf);
			rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
			rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);

			dprint(FD_IO,
			       "fio: Received rkey %x addr %" PRIx64
			       " len %d from peer\n", rd->rmt_us[i].rkey,
			       rd->rmt_us[i].buf, rd->rmt_us[i].size);
		}
	}

	return 0;
}

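/*
 * Server: parse the client's test-mode request; wr_id
 * FIO_RDMA_MAX_IO_DEPTH identifies the control-path receive.
 */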
static int server_recv(struct thread_data *td, struct ibv_wc *wc)
{
	struct rdmaio_data *rd = td->io_ops->data;

	if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
		rd->rdma_protocol = ntohl(rd->recv_buf.mode);

		/* for the CHANNEL semantic, the server is the RECV side */
		if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
			rd->rdma_protocol = FIO_RDMA_CHA_RECV;
	}

	return 0;
}

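/*
 * Drain the completion queue. Control-path completions (wr_id ==
 * FIO_RDMA_MAX_IO_DEPTH) are consumed silently; data completions are
 * matched by wr_id against the in-flight list and moved to the
 * completed list. Returns the number of completions reaped, or -1 on
 * a completion error or unknown opcode.
 */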
static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_wc wc;
	struct rdma_io_u_data *r_io_u_d;
	int ret;
	int compevnum = 0;
	int i;

	while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
		ret = 0;
		compevnum++;

		if (wc.status) {
			log_err("fio: cq completion status %d(%s)\n",
				wc.status, ibv_wc_status_str(wc.status));
			return -1;
		}

		switch (wc.opcode) {

		case IBV_WC_RECV:
			if (rd->is_client == 1)
				client_recv(td, &wc);
			else
				server_recv(td, &wc);

			if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
				break;

			for (i = 0; i < rd->io_u_flight_nr; i++) {
				r_io_u_d = rd->io_us_flight[i]->engine_data;

				if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
					rd->io_us_flight[i]->resid =
					    rd->io_us_flight[i]->buflen
					    - wc.byte_len;

					rd->io_us_flight[i]->error = 0;

					rd->io_us_completed[rd->io_u_completed_nr]
					    = rd->io_us_flight[i];
					rd->io_u_completed_nr++;
					break;
				}
			}
			if (i == rd->io_u_flight_nr)
				log_err("fio: recv wr %" PRIu64 " not found\n",
					wc.wr_id);
			else {
				/* move the last entry into the vacated slot */
				rd->io_us_flight[i] =
				    rd->io_us_flight[rd->io_u_flight_nr - 1];
				rd->io_u_flight_nr--;
			}

			break;

		case IBV_WC_SEND:
		case IBV_WC_RDMA_WRITE:
		case IBV_WC_RDMA_READ:
			if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
				break;

			for (i = 0; i < rd->io_u_flight_nr; i++) {
				r_io_u_d = rd->io_us_flight[i]->engine_data;

				if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
					rd->io_us_completed[rd->io_u_completed_nr]
					    = rd->io_us_flight[i];
					rd->io_u_completed_nr++;
					break;
				}
			}
			if (i == rd->io_u_flight_nr)
				log_err("fio: send wr %" PRIu64 " not found\n",
					wc.wr_id);
			else {
				/* move the last entry into the vacated slot */
				rd->io_us_flight[i] =
				    rd->io_us_flight[rd->io_u_flight_nr - 1];
				rd->io_u_flight_nr--;
			}

			break;

		default:
			log_info("fio: unknown completion event %d\n",
				 wc.opcode);
			return -1;
		}
		rd->cq_event_num++;
	}
	if (ret) {
		log_err("fio: poll error %d\n", ret);
		return 1;
	}

	return compevnum;
}

/*
 * Return -1 on error, or the number of events reaped.
 */
static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_cq *ev_cq;
	void *ev_ctx;
	int ret;

	if (rd->cq_event_num > 0) {	/* previous left */
		rd->cq_event_num--;
		return 0;
	}

again:
	if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
		log_err("fio: Failed to get cq event!\n");
		return -1;
	}
	if (ev_cq != rd->cq) {
		log_err("fio: Unknown CQ!\n");
		return -1;
	}
	if (ibv_req_notify_cq(rd->cq, 0) != 0) {
		log_err("fio: Failed to set notify!\n");
		return -1;
	}

	ret = cq_event_handler(td, opcode);
	if (ret < 1)
		goto again;

	ibv_ack_cq_events(rd->cq, ret);

	rd->cq_event_num--;

	return ret;
}

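/*
 * Create the protection domain, completion channel, CQ and an RC queue
 * pair on the connection's verbs context. The CQ/QP depth is twice the
 * configured iodepth (minimum 16) so that send and receive work
 * requests can be outstanding at the same time.
 */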
static int fio_rdmaio_setup_qp(struct thread_data *td)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_qp_init_attr init_attr;
	int qp_depth = td->o.iodepth * 2;	/* twice the io depth */

	if (rd->is_client == 0)
		rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
	else
		rd->pd = ibv_alloc_pd(rd->cm_id->verbs);

	if (rd->pd == NULL) {
		log_err("fio: ibv_alloc_pd fail\n");
		return 1;
	}

	if (rd->is_client == 0)
		rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
	else
		rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
	if (rd->channel == NULL) {
		log_err("fio: ibv_create_comp_channel fail\n");
		goto err1;
	}

	if (qp_depth < 16)
		qp_depth = 16;

	if (rd->is_client == 0)
		rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
				       qp_depth, rd, rd->channel, 0);
	else
		rd->cq = ibv_create_cq(rd->cm_id->verbs,
				       qp_depth, rd, rd->channel, 0);
	if (rd->cq == NULL) {
		log_err("fio: ibv_create_cq failed\n");
		goto err2;
	}

	if (ibv_req_notify_cq(rd->cq, 0) != 0) {
		log_err("fio: ibv_req_notify_cq failed\n");
		goto err3;
	}

	/* create queue pair */
	memset(&init_attr, 0, sizeof(init_attr));
	init_attr.cap.max_send_wr = qp_depth;
	init_attr.cap.max_recv_wr = qp_depth;
	init_attr.cap.max_recv_sge = 1;
	init_attr.cap.max_send_sge = 1;
	init_attr.qp_type = IBV_QPT_RC;
	init_attr.send_cq = rd->cq;
	init_attr.recv_cq = rd->cq;

	if (rd->is_client == 0) {
		if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
			log_err("fio: rdma_create_qp failed\n");
			goto err3;
		}
		rd->qp = rd->child_cm_id->qp;
	} else {
		if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
			log_err("fio: rdma_create_qp failed\n");
			goto err3;
		}
		rd->qp = rd->cm_id->qp;
	}

	return 0;

err3:
	ibv_destroy_cq(rd->cq);
err2:
	ibv_destroy_comp_channel(rd->channel);
err1:
	ibv_dealloc_pd(rd->pd);

	return 1;
}

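/*
 * Register the fixed send/recv control buffers and pre-build the
 * corresponding work requests. wr_id FIO_RDMA_MAX_IO_DEPTH marks these
 * control-path WRs so cq_event_handler() never matches them against
 * data io_us.
 */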
static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
{
	struct rdmaio_data *rd = td->io_ops->data;

	rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
				 IBV_ACCESS_LOCAL_WRITE);
	if (rd->recv_mr == NULL) {
		log_err("fio: recv_buf reg_mr failed\n");
		return 1;
	}

	rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
				 0);
	if (rd->send_mr == NULL) {
		log_err("fio: send_buf reg_mr failed\n");
		ibv_dereg_mr(rd->recv_mr);
		return 1;
	}

	/* setup work requests */
	/* recv wr */
	rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
	rd->recv_sgl.length = sizeof(rd->recv_buf);
	rd->recv_sgl.lkey = rd->recv_mr->lkey;
	rd->rq_wr.sg_list = &rd->recv_sgl;
	rd->rq_wr.num_sge = 1;
	rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;

	/* send wr */
	rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
	rd->send_sgl.length = sizeof(rd->send_buf);
	rd->send_sgl.lkey = rd->send_mr->lkey;

	rd->sq_wr.opcode = IBV_WR_SEND;
	rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
	rd->sq_wr.sg_list = &rd->send_sgl;
	rd->sq_wr.num_sge = 1;
	rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;

	return 0;
}

static int get_next_channel_event(struct thread_data *td,
				  struct rdma_event_channel *channel,
				  enum rdma_cm_event_type wait_event)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct rdma_cm_event *event;
	int ret;

	ret = rdma_get_cm_event(channel, &event);
	if (ret) {
		log_err("fio: rdma_get_cm_event: %d\n", ret);
		return 1;
	}

	if (event->event != wait_event) {
		log_err("fio: event is %s instead of %s\n",
			rdma_event_str(event->event),
			rdma_event_str(wait_event));
		/* ack the unexpected event before bailing out */
		rdma_ack_cm_event(event);
		return 1;
	}

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		rd->child_cm_id = event->id;
		break;
	default:
		break;
	}

	rdma_ack_cm_event(event);

	return 0;
}

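/*
 * Build the per-io_u work request for the negotiated protocol: a send
 * WR for the memory semantics and SEND, a receive WR for RECV. For the
 * memory semantics the SGE length is filled in later, at submit time,
 * in fio_rdmaio_send().
 */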
static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct rdma_io_u_data *r_io_u_d;

	r_io_u_d = io_u->engine_data;

	switch (rd->rdma_protocol) {
	case FIO_RDMA_MEM_WRITE:
	case FIO_RDMA_MEM_READ:
		r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
		r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
		r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
		r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
		r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
		r_io_u_d->sq_wr.num_sge = 1;
		break;
	case FIO_RDMA_CHA_SEND:
		r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
		r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
		r_io_u_d->rdma_sgl.length = io_u->buflen;
		r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
		r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
		r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
		r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
		r_io_u_d->sq_wr.num_sge = 1;
		break;
	case FIO_RDMA_CHA_RECV:
		r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
		r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
		r_io_u_d->rdma_sgl.length = io_u->buflen;
		r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
		r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
		r_io_u_d->rq_wr.num_sge = 1;
		break;
	default:
		log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
		break;
	}

	return 0;
}

static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct io_u *io_u;
	int i;

	io_u = rd->io_us_completed[0];
	for (i = 0; i < rd->io_u_completed_nr - 1; i++)
		rd->io_us_completed[i] = rd->io_us_completed[i + 1];

	rd->io_u_completed_nr--;

	dprint_io_u(io_u, "fio_rdmaio_event");

	return io_u;
}

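/*
 * Block on the completion channel until cq_event_handler() has reaped
 * at least "min" completions.
 */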
static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
				unsigned int max, const struct timespec *t)
{
	struct rdmaio_data *rd = td->io_ops->data;
	enum ibv_wc_opcode comp_opcode;
	struct ibv_cq *ev_cq;
	void *ev_ctx;
	int ret, r = 0;
	comp_opcode = IBV_WC_RDMA_WRITE;

	switch (rd->rdma_protocol) {
	case FIO_RDMA_MEM_WRITE:
		comp_opcode = IBV_WC_RDMA_WRITE;
		break;
	case FIO_RDMA_MEM_READ:
		comp_opcode = IBV_WC_RDMA_READ;
		break;
	case FIO_RDMA_CHA_SEND:
		comp_opcode = IBV_WC_SEND;
		break;
	case FIO_RDMA_CHA_RECV:
		comp_opcode = IBV_WC_RECV;
		break;
	default:
		log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
		break;
	}

	if (rd->cq_event_num > 0) {	/* previous left */
		rd->cq_event_num--;
		return 0;
	}

again:
	if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
		log_err("fio: Failed to get cq event!\n");
		return -1;
	}
	if (ev_cq != rd->cq) {
		log_err("fio: Unknown CQ!\n");
		return -1;
	}
	if (ibv_req_notify_cq(rd->cq, 0) != 0) {
		log_err("fio: Failed to set notify!\n");
		return -1;
	}

	ret = cq_event_handler(td, comp_opcode);
	if (ret < 1)
		goto again;

	ibv_ack_cq_events(rd->cq, ret);

	r += ret;
	if (r < min)
		goto again;

	rd->cq_event_num -= r;

	return r;
}

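/*
 * Post one send-queue work request per io_u. For the memory semantics,
 * a remote buffer is chosen at random from the table the server
 * advertised, and the WR is aimed at its rkey/remote address.
 */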
static int fio_rdmaio_send(struct thread_data *td, struct io_u **io_us,
			   unsigned int nr)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_send_wr *bad_wr;
#if 0
	enum ibv_wc_opcode comp_opcode;
	comp_opcode = IBV_WC_RDMA_WRITE;
#endif
	int i;
	long index;
	struct rdma_io_u_data *r_io_u_d;

	r_io_u_d = NULL;

	for (i = 0; i < nr; i++) {
		/* RDMA_WRITE or RDMA_READ */
		switch (rd->rdma_protocol) {
		case FIO_RDMA_MEM_WRITE:
			/* compose work request */
			r_io_u_d = io_us[i]->engine_data;
			index = __rand(&rd->rand_state) % rd->rmt_nr;
			r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_WRITE;
			r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
			r_io_u_d->sq_wr.wr.rdma.remote_addr = \
				rd->rmt_us[index].buf;
			r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
			break;
		case FIO_RDMA_MEM_READ:
			/* compose work request */
			r_io_u_d = io_us[i]->engine_data;
			index = __rand(&rd->rand_state) % rd->rmt_nr;
			r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_READ;
			r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
			r_io_u_d->sq_wr.wr.rdma.remote_addr = \
				rd->rmt_us[index].buf;
			r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
			break;
		case FIO_RDMA_CHA_SEND:
			r_io_u_d = io_us[i]->engine_data;
			r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
			r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
			break;
		default:
			log_err("fio: unknown rdma protocol - %d\n",
				rd->rdma_protocol);
			break;
		}

		if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
			log_err("fio: ibv_post_send fail\n");
			return -1;
		}

		dprint_io_u(io_us[i], "fio_rdmaio_send");
	}

	/* wait for completion
	   rdma_poll_wait(td, comp_opcode); */

	return i;
}

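/*
 * For the RECV protocol, post one receive WR per io_u. For the memory
 * semantics the server instead re-posts its control receive and blocks
 * until the client's FINISH notification arrives, then marks the job
 * done.
 */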
static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us,
			   unsigned int nr)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_recv_wr *bad_wr;
	struct rdma_io_u_data *r_io_u_d;
	int i;

	i = 0;
	if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) {
		/* post io_u into recv queue */
		for (i = 0; i < nr; i++) {
			r_io_u_d = io_us[i]->engine_data;
			if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
			    0) {
				log_err("fio: ibv_post_recv fail\n");
				/* return -1 so the caller does not mistake
				   the error for one received io_u */
				return -1;
			}
		}
	} else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ)
		   || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
		/* re-post the rq_wr */
		if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
			log_err("fio: ibv_post_recv fail\n");
			return -1;
		}

		rdma_poll_wait(td, IBV_WC_RECV);

		dprint(FD_IO, "fio: recv FINISH message\n");
		td->done = 1;
		return 0;
	}

	return i;
}

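/*
 * Queueing only stages the io_u locally; the work requests are posted
 * in batches from fio_rdmaio_commit().
 */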
static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct rdmaio_data *rd = td->io_ops->data;

	fio_ro_check(td, io_u);

	if (rd->io_u_queued_nr == (int)td->o.iodepth)
		return FIO_Q_BUSY;

	rd->io_us_queued[rd->io_u_queued_nr] = io_u;
	rd->io_u_queued_nr++;

	dprint_io_u(io_u, "fio_rdmaio_queue");

	return FIO_Q_QUEUED;
}

static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us,
			      unsigned int nr)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct timeval now;
	unsigned int i;

	if (!fio_fill_issue_time(td))
		return;

	fio_gettime(&now, NULL);

	for (i = 0; i < nr; i++) {
		struct io_u *io_u = io_us[i];

		/* queued -> flight */
		rd->io_us_flight[rd->io_u_flight_nr] = io_u;
		rd->io_u_flight_nr++;

		memcpy(&io_u->issue_time, &now, sizeof(now));
		io_u_queued(td, io_u);
	}
}

static int fio_rdmaio_commit(struct thread_data *td)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct io_u **io_us;
	int ret;

	if (!rd->io_us_queued)
		return 0;

	io_us = rd->io_us_queued;
	do {
		/* RDMA_WRITE or RDMA_READ */
		if (rd->is_client)
			ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr);
		else
			ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr);

		if (ret > 0) {
			fio_rdmaio_queued(td, io_us, ret);
			io_u_mark_submit(td, ret);
			rd->io_u_queued_nr -= ret;
			io_us += ret;
			ret = 0;
		} else
			break;
	} while (rd->io_u_queued_nr);

	return ret;
}

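/*
 * Client side of the handshake: connect, wait for ESTABLISHED, send
 * the test-mode request, then wait for the server's confirmation
 * (which carries the remote buffer table for the memory semantics).
 */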
static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct rdma_conn_param conn_param;
	struct ibv_send_wr *bad_wr;

	memset(&conn_param, 0, sizeof(conn_param));
	conn_param.responder_resources = 1;
	conn_param.initiator_depth = 1;
	conn_param.retry_count = 10;

	if (rdma_connect(rd->cm_id, &conn_param) != 0) {
		log_err("fio: rdma_connect fail\n");
		return 1;
	}

	if (get_next_channel_event
	    (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
		log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
		return 1;
	}

	/* send task request */
	rd->send_buf.mode = htonl(rd->rdma_protocol);
	rd->send_buf.nr = htonl(td->o.iodepth);

	if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
		log_err("fio: ibv_post_send fail\n");
		return 1;
	}

	rdma_poll_wait(td, IBV_WC_SEND);

	/* wait for remote MR info from server side */
	rdma_poll_wait(td, IBV_WC_RECV);

	/* In a SEND/RECV test, it is good practice to set up the iodepth
	 * on the RECV side deeper than on the SEND side to avoid RNR
	 * (receiver not ready) errors. The SEND side may issue so many
	 * unsolicited messages before the RECV side has posted enough
	 * receive buffers that an RNR error results. Here the SEND side
	 * pauses briefly so the RECV side can post sufficient buffers.
	 */
	usleep(500000);

	return 0;
}

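/*
 * Server side of the handshake: accept on the child cm_id, wait for
 * ESTABLISHED, receive the client's test-mode request and answer with
 * the confirmation prepared in fio_rdmaio_init().
 */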
static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct rdma_conn_param conn_param;
	struct ibv_send_wr *bad_wr;

	/* rdma_accept() - then wait for accept success */
	memset(&conn_param, 0, sizeof(conn_param));
	conn_param.responder_resources = 1;
	conn_param.initiator_depth = 1;

	if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
		log_err("fio: rdma_accept fail\n");
		return 1;
	}

	if (get_next_channel_event
	    (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
		log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
		return 1;
	}

	/* wait for request */
	rdma_poll_wait(td, IBV_WC_RECV);

	if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
		log_err("fio: ibv_post_send fail\n");
		return 1;
	}

	rdma_poll_wait(td, IBV_WC_SEND);

	return 0;
}

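/* The reading side acts as the server (accept); the writing side is
 * the client (connect). */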
static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
{
	if (td_read(td))
		return fio_rdmaio_accept(td, f);
	else
		return fio_rdmaio_connect(td, f);
}

static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_send_wr *bad_wr;

	/* unregister rdma buffer */

	/*
	 * The client sends a close notification to the server side.
	 */
	/* refer to: http://linux.die.net/man/7/rdma_cm */
	if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
				     || (rd->rdma_protocol ==
					 FIO_RDMA_MEM_READ))) {
		if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
			log_err("fio: ibv_post_send fail\n");
			return 1;
		}

		dprint(FD_IO, "fio: close notification sent successfully\n");
		rdma_poll_wait(td, IBV_WC_SEND);
	}

	if (rd->is_client == 1)
		rdma_disconnect(rd->cm_id);
	else {
		rdma_disconnect(rd->child_cm_id);
#if 0
		rdma_disconnect(rd->cm_id);
#endif
	}

#if 0
	if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0) {
		log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
		return 1;
	}
#endif

	ibv_destroy_cq(rd->cq);
	ibv_destroy_qp(rd->qp);

	if (rd->is_client == 1)
		rdma_destroy_id(rd->cm_id);
	else {
		rdma_destroy_id(rd->child_cm_id);
		rdma_destroy_id(rd->cm_id);
	}

	ibv_destroy_comp_channel(rd->channel);
	ibv_dealloc_pd(rd->pd);

	return 0;
}

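/*
 * Client: resolve the server address and route via the CM event
 * channel, create the QP and control buffers, and pre-post the control
 * receive so the server's confirmation cannot be missed.
 */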
static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
				    unsigned short port)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_recv_wr *bad_wr;
	int err;

	rd->addr.sin_family = AF_INET;
	rd->addr.sin_port = htons(port);

	if (inet_aton(host, &rd->addr.sin_addr) != 1) {
		struct hostent *hent;

		hent = gethostbyname(host);
		if (!hent) {
			td_verror(td, errno, "gethostbyname");
			return 1;
		}

		memcpy(&rd->addr.sin_addr, hent->h_addr, 4);
	}

	/* resolve the server address */
	err = rdma_resolve_addr(rd->cm_id, NULL, (struct sockaddr *)&rd->addr, 2000);
	if (err != 0) {
		log_err("fio: rdma_resolve_addr: %d\n", err);
		return 1;
	}

	err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED);
	if (err != 0) {
		log_err("fio: get_next_channel_event: %d\n", err);
		return 1;
	}

	/* resolve the route */
	err = rdma_resolve_route(rd->cm_id, 2000);
	if (err != 0) {
		log_err("fio: rdma_resolve_route: %d\n", err);
		return 1;
	}

	err = get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED);
	if (err != 0) {
		log_err("fio: get_next_channel_event: %d\n", err);
		return 1;
	}

	/* create qp and buffer */
	if (fio_rdmaio_setup_qp(td) != 0)
		return 1;

	if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
		return 1;

	/* post recv buf */
	err = ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr);
	if (err != 0) {
		log_err("fio: ibv_post_recv fail: %d\n", err);
		return 1;
	}

	return 0;
}

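/*
 * Server: bind to INADDR_ANY on the given port, listen, wait for the
 * client's CONNECT_REQUEST, then create the QP and control buffers and
 * pre-post the control receive.
 */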
static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
{
	struct rdmaio_data *rd = td->io_ops->data;
	struct ibv_recv_wr *bad_wr;

	rd->addr.sin_family = AF_INET;
	rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
	rd->addr.sin_port = htons(port);

	/* rdma_listen */
	if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
		log_err("fio: rdma_bind_addr fail\n");
		return 1;
	}

	if (rdma_listen(rd->cm_id, 3) != 0) {
		log_err("fio: rdma_listen fail\n");
		return 1;
	}

	/* wait for CONNECT_REQUEST */
	if (get_next_channel_event
	    (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
		log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
		return 1;
	}

	if (fio_rdmaio_setup_qp(td) != 0)
		return 1;

	if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
		return 1;

	/* post recv buf */
	if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
		log_err("fio: ibv_post_recv fail\n");
		return 1;
	}

	return 0;
}

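/*
 * Registered memory is pinned, so the locked-memory rlimit must cover
 * the whole buffer pool; try raising the soft limit to the hard limit
 * if it is too small.
 */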
static int check_set_rlimits(struct thread_data *td)
{
#ifdef CONFIG_RLIMIT_MEMLOCK
	struct rlimit rl;

	/* check RLIMIT_MEMLOCK */
	if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
		log_err("fio: getrlimit fail: %d(%s)\n",
			errno, strerror(errno));
		return 1;
	}

	/* soft limit */
	if ((rl.rlim_cur != RLIM_INFINITY)
	    && (rl.rlim_cur < td->orig_buffer_size)) {
		log_err("fio: soft RLIMIT_MEMLOCK is: %" PRId64 "\n",
			(int64_t) rl.rlim_cur);
		log_err("fio: total block size is:    %zd\n",
			td->orig_buffer_size);
		/* try to set larger RLIMIT_MEMLOCK */
		rl.rlim_cur = rl.rlim_max;
		if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
			log_err("fio: setrlimit fail: %d(%s)\n",
				errno, strerror(errno));
			log_err("fio: you may try to enlarge MEMLOCK as root:\n");
			log_err("# ulimit -l unlimited\n");
			return 1;
		}
	}
#endif

	return 0;
}

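/*
 * Parse "host/port[/protocol]" from the filename, create the CM id,
 * allocate the bookkeeping arrays, establish the connection or start
 * listening, and register every io_u buffer as an RDMA memory region.
 */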
static int fio_rdmaio_init(struct thread_data *td)
{
	struct rdmaio_data *rd = td->io_ops->data;
	unsigned int max_bs;
	unsigned int port;
	char host[64], buf[128];
	char *sep, *portp, *modep;
	int ret, i;

	if (td_rw(td)) {
		log_err("fio: rdma connections must be read OR write\n");
		return 1;
	}
	if (td_random(td)) {
		log_err("fio: RDMA network IO can't be random\n");
		return 1;
	}

	if (check_set_rlimits(td))
		return 1;

	strcpy(buf, td->o.filename);

	sep = strchr(buf, '/');
	if (!sep)
		goto bad_host;

	*sep = '\0';
	sep++;
	strcpy(host, buf);
	if (!strlen(host))
		goto bad_host;

	modep = NULL;
	portp = sep;
	sep = strchr(portp, '/');
	if (sep) {
		*sep = '\0';
		modep = sep + 1;
	}

	port = strtol(portp, NULL, 10);
	if (!port || port > 65535)
		goto bad_host;

	if (modep) {
		if (!strncmp("rdma_write", modep, strlen(modep)) ||
		    !strncmp("RDMA_WRITE", modep, strlen(modep)))
			rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
		else if (!strncmp("rdma_read", modep, strlen(modep)) ||
			 !strncmp("RDMA_READ", modep, strlen(modep)))
			rd->rdma_protocol = FIO_RDMA_MEM_READ;
		else if (!strncmp("send", modep, strlen(modep)) ||
			 !strncmp("SEND", modep, strlen(modep)))
			rd->rdma_protocol = FIO_RDMA_CHA_SEND;
		else
			goto bad_host;
	} else
		rd->rdma_protocol = FIO_RDMA_MEM_WRITE;

	rd->cq_event_num = 0;

	rd->cm_channel = rdma_create_event_channel();
	if (!rd->cm_channel) {
		log_err("fio: rdma_create_event_channel fail\n");
		return 1;
	}

	ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
	if (ret) {
		log_err("fio: rdma_create_id fail\n");
		return 1;
	}

	if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
	    (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
		rd->rmt_us =
			malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
		memset(rd->rmt_us, 0,
			FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
		rd->rmt_nr = 0;
	}

	rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
	memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
	rd->io_u_queued_nr = 0;

	rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
	memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
	rd->io_u_flight_nr = 0;

	rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
	memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
	rd->io_u_completed_nr = 0;

	if (td_read(td)) {	/* READ as the server */
		rd->is_client = 0;
		/* the server finishes its setup once the client's request arrives */
		ret = fio_rdmaio_setup_listen(td, port);
	} else {		/* WRITE as the client */
		rd->is_client = 1;
		ret = fio_rdmaio_setup_connect(td, host, port);
	}

	max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
	/* register each io_u in the free list */
	for (i = 0; i < td->io_u_freelist.nr; i++) {
		struct io_u *io_u = td->io_u_freelist.io_us[i];

		io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
		memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
		((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;

		io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
				      IBV_ACCESS_LOCAL_WRITE |
				      IBV_ACCESS_REMOTE_READ |
				      IBV_ACCESS_REMOTE_WRITE);
		if (io_u->mr == NULL) {
			log_err("fio: ibv_reg_mr io_u failed\n");
			return 1;
		}

		rd->send_buf.rmt_us[i].buf =
		    htonll((uint64_t) (unsigned long)io_u->buf);
		rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
		rd->send_buf.rmt_us[i].size = htonl(max_bs);

#if 0
		log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n", io_u->mr->rkey, io_u->buf, max_bs);
#endif
	}

	rd->send_buf.nr = htonl(i);

	return ret;
bad_host:
	log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
	return 1;
}

static void fio_rdmaio_cleanup(struct thread_data *td)
{
	struct rdmaio_data *rd = td->io_ops->data;

	if (rd)
		free(rd);
}

static int fio_rdmaio_setup(struct thread_data *td)
{
	struct rdmaio_data *rd;

	if (!td->io_ops->data) {
		rd = malloc(sizeof(*rd));

		memset(rd, 0, sizeof(*rd));
		init_rand_seed(&rd->rand_state, (unsigned int) GOLDEN_RATIO_PRIME);
		td->io_ops->data = rd;
	}

	return 0;
}

static struct ioengine_ops ioengine_rw = {
	.name		= "rdma",
	.version	= FIO_IOOPS_VERSION,
	.setup		= fio_rdmaio_setup,
	.init		= fio_rdmaio_init,
	.prep		= fio_rdmaio_prep,
	.queue		= fio_rdmaio_queue,
	.commit		= fio_rdmaio_commit,
	.getevents	= fio_rdmaio_getevents,
	.event		= fio_rdmaio_event,
	.cleanup	= fio_rdmaio_cleanup,
	.open_file	= fio_rdmaio_open_file,
	.close_file	= fio_rdmaio_close_file,
	.flags		= FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
};

static void fio_init fio_rdmaio_register(void)
{
	register_ioengine(&ioengine_rw);
}

static void fio_exit fio_rdmaio_unregister(void)
{
	unregister_ioengine(&ioengine_rw);
}
   1237