1 /* 2 * Copyright (c) 2000-2007 Niels Provos <provos (at) citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "event2/event-config.h" 28 29 #ifdef WIN32 30 #define WIN32_LEAN_AND_MEAN 31 #include <winsock2.h> 32 #include <windows.h> 33 #undef WIN32_LEAN_AND_MEAN 34 #endif 35 36 #include <sys/types.h> 37 #ifndef WIN32 38 #include <sys/socket.h> 39 #endif 40 #ifdef _EVENT_HAVE_SYS_TIME_H 41 #include <sys/time.h> 42 #endif 43 #include <sys/queue.h> 44 #include <stdio.h> 45 #include <stdlib.h> 46 #ifndef WIN32 47 #include <unistd.h> 48 #endif 49 #include <errno.h> 50 #include <signal.h> 51 #include <string.h> 52 53 #include <sys/queue.h> 54 55 #include "event2/event.h" 56 #include "event2/event_struct.h" 57 #include "event2/rpc.h" 58 #include "event2/rpc_struct.h" 59 #include "evrpc-internal.h" 60 #include "event2/http.h" 61 #include "event2/buffer.h" 62 #include "event2/tag.h" 63 #include "event2/http_struct.h" 64 #include "event2/http_compat.h" 65 #include "event2/util.h" 66 #include "util-internal.h" 67 #include "log-internal.h" 68 #include "mm-internal.h" 69 70 struct evrpc_base * 71 evrpc_init(struct evhttp *http_server) 72 { 73 struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 74 if (base == NULL) 75 return (NULL); 76 77 /* we rely on the tagging sub system */ 78 evtag_init(); 79 80 TAILQ_INIT(&base->registered_rpcs); 81 TAILQ_INIT(&base->input_hooks); 82 TAILQ_INIT(&base->output_hooks); 83 84 TAILQ_INIT(&base->paused_requests); 85 86 base->http_server = http_server; 87 88 return (base); 89 } 90 91 void 92 evrpc_free(struct evrpc_base *base) 93 { 94 struct evrpc *rpc; 95 struct evrpc_hook *hook; 96 struct evrpc_hook_ctx *pause; 97 int r; 98 99 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 100 r = evrpc_unregister_rpc(base, rpc->uri); 101 EVUTIL_ASSERT(r == 0); 102 } 103 while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 104 TAILQ_REMOVE(&base->paused_requests, pause, next); 105 mm_free(pause); 106 } 107 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 108 r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 109 EVUTIL_ASSERT(r); 110 } 111 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 112 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 113 EVUTIL_ASSERT(r); 114 } 115 mm_free(base); 116 } 117 118 void * 119 evrpc_add_hook(void *vbase, 120 enum EVRPC_HOOK_TYPE hook_type, 121 int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 122 void *cb_arg) 123 { 124 struct _evrpc_hooks *base = vbase; 125 struct evrpc_hook_list *head = NULL; 126 struct evrpc_hook *hook = NULL; 127 switch (hook_type) { 128 case EVRPC_INPUT: 129 head = &base->in_hooks; 130 break; 131 case EVRPC_OUTPUT: 132 head = &base->out_hooks; 133 break; 134 default: 135 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 136 } 137 138 hook = mm_calloc(1, sizeof(struct evrpc_hook)); 139 EVUTIL_ASSERT(hook != NULL); 140 141 hook->process = cb; 142 hook->process_arg = cb_arg; 143 TAILQ_INSERT_TAIL(head, hook, next); 144 145 return (hook); 146 } 147 148 static int 149 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 150 { 151 struct evrpc_hook *hook = NULL; 152 TAILQ_FOREACH(hook, head, next) { 153 if (hook == handle) { 154 TAILQ_REMOVE(head, hook, next); 155 mm_free(hook); 156 return (1); 157 } 158 } 159 160 return (0); 161 } 162 163 /* 164 * remove the hook specified by the handle 165 */ 166 167 int 168 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 169 { 170 struct _evrpc_hooks *base = vbase; 171 struct evrpc_hook_list *head = NULL; 172 switch (hook_type) { 173 case EVRPC_INPUT: 174 head = &base->in_hooks; 175 break; 176 case EVRPC_OUTPUT: 177 head = &base->out_hooks; 178 break; 179 default: 180 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 181 } 182 183 return (evrpc_remove_hook_internal(head, handle)); 184 } 185 186 static int 187 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 188 struct evhttp_request *req, struct evbuffer *evbuf) 189 { 190 struct evrpc_hook *hook; 191 TAILQ_FOREACH(hook, head, next) { 192 int res = hook->process(ctx, req, evbuf, hook->process_arg); 193 if (res != EVRPC_CONTINUE) 194 return (res); 195 } 196 197 return (EVRPC_CONTINUE); 198 } 199 200 static void evrpc_pool_schedule(struct evrpc_pool *pool); 201 static void evrpc_request_cb(struct evhttp_request *, void *); 202 203 /* 204 * Registers a new RPC with the HTTP server. The evrpc object is expected 205 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 206 * calls this function. 207 */ 208 209 static char * 210 evrpc_construct_uri(const char *uri) 211 { 212 char *constructed_uri; 213 size_t constructed_uri_len; 214 215 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 216 if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 217 event_err(1, "%s: failed to register rpc at %s", 218 __func__, uri); 219 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 220 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 221 constructed_uri[constructed_uri_len - 1] = '\0'; 222 223 return (constructed_uri); 224 } 225 226 int 227 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 228 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 229 { 230 char *constructed_uri = evrpc_construct_uri(rpc->uri); 231 232 rpc->base = base; 233 rpc->cb = cb; 234 rpc->cb_arg = cb_arg; 235 236 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 237 238 evhttp_set_cb(base->http_server, 239 constructed_uri, 240 evrpc_request_cb, 241 rpc); 242 243 mm_free(constructed_uri); 244 245 return (0); 246 } 247 248 int 249 evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 250 { 251 char *registered_uri = NULL; 252 struct evrpc *rpc; 253 int r; 254 255 /* find the right rpc; linear search might be slow */ 256 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 257 if (strcmp(rpc->uri, name) == 0) 258 break; 259 } 260 if (rpc == NULL) { 261 /* We did not find an RPC with this name */ 262 return (-1); 263 } 264 TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 265 266 registered_uri = evrpc_construct_uri(name); 267 268 /* remove the http server callback */ 269 r = evhttp_del_cb(base->http_server, registered_uri); 270 EVUTIL_ASSERT(r == 0); 271 272 mm_free(registered_uri); 273 274 mm_free((char *)rpc->uri); 275 mm_free(rpc); 276 return (0); 277 } 278 279 static int evrpc_pause_request(void *vbase, void *ctx, 280 void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 281 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 282 283 static void 284 evrpc_request_cb(struct evhttp_request *req, void *arg) 285 { 286 struct evrpc *rpc = arg; 287 struct evrpc_req_generic *rpc_state = NULL; 288 289 /* let's verify the outside parameters */ 290 if (req->type != EVHTTP_REQ_POST || 291 evbuffer_get_length(req->input_buffer) <= 0) 292 goto error; 293 294 rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 295 if (rpc_state == NULL) 296 goto error; 297 rpc_state->rpc = rpc; 298 rpc_state->http_req = req; 299 rpc_state->rpc_data = NULL; 300 301 if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 302 int hook_res; 303 304 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); 305 306 /* 307 * allow hooks to modify the outgoing request 308 */ 309 hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 310 rpc_state, req, req->input_buffer); 311 switch (hook_res) { 312 case EVRPC_TERMINATE: 313 goto error; 314 case EVRPC_PAUSE: 315 evrpc_pause_request(rpc->base, rpc_state, 316 evrpc_request_cb_closure); 317 return; 318 case EVRPC_CONTINUE: 319 break; 320 default: 321 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 322 hook_res == EVRPC_CONTINUE || 323 hook_res == EVRPC_PAUSE); 324 } 325 } 326 327 evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 328 return; 329 330 error: 331 if (rpc_state != NULL) 332 evrpc_reqstate_free(rpc_state); 333 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 334 return; 335 } 336 337 static void 338 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 339 { 340 struct evrpc_req_generic *rpc_state = arg; 341 struct evrpc *rpc; 342 struct evhttp_request *req; 343 344 EVUTIL_ASSERT(rpc_state); 345 rpc = rpc_state->rpc; 346 req = rpc_state->http_req; 347 348 if (hook_res == EVRPC_TERMINATE) 349 goto error; 350 351 /* let's check that we can parse the request */ 352 rpc_state->request = rpc->request_new(rpc->request_new_arg); 353 if (rpc_state->request == NULL) 354 goto error; 355 356 if (rpc->request_unmarshal( 357 rpc_state->request, req->input_buffer) == -1) { 358 /* we failed to parse the request; that's a bummer */ 359 goto error; 360 } 361 362 /* at this point, we have a well formed request, prepare the reply */ 363 364 rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 365 if (rpc_state->reply == NULL) 366 goto error; 367 368 /* give the rpc to the user; they can deal with it */ 369 rpc->cb(rpc_state, rpc->cb_arg); 370 371 return; 372 373 error: 374 if (rpc_state != NULL) 375 evrpc_reqstate_free(rpc_state); 376 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 377 return; 378 } 379 380 381 void 382 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state) 383 { 384 struct evrpc *rpc; 385 EVUTIL_ASSERT(rpc_state != NULL); 386 rpc = rpc_state->rpc; 387 388 /* clean up all memory */ 389 if (rpc_state->hook_meta != NULL) 390 evrpc_hook_context_free(rpc_state->hook_meta); 391 if (rpc_state->request != NULL) 392 rpc->request_free(rpc_state->request); 393 if (rpc_state->reply != NULL) 394 rpc->reply_free(rpc_state->reply); 395 if (rpc_state->rpc_data != NULL) 396 evbuffer_free(rpc_state->rpc_data); 397 mm_free(rpc_state); 398 } 399 400 static void 401 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 402 403 void 404 evrpc_request_done(struct evrpc_req_generic *rpc_state) 405 { 406 struct evhttp_request *req; 407 struct evrpc *rpc; 408 409 EVUTIL_ASSERT(rpc_state); 410 411 req = rpc_state->http_req; 412 rpc = rpc_state->rpc; 413 414 if (rpc->reply_complete(rpc_state->reply) == -1) { 415 /* the reply was not completely filled in. error out */ 416 goto error; 417 } 418 419 if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 420 /* out of memory */ 421 goto error; 422 } 423 424 /* serialize the reply */ 425 rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 426 427 if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 428 int hook_res; 429 430 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); 431 432 /* do hook based tweaks to the request */ 433 hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 434 rpc_state, req, rpc_state->rpc_data); 435 switch (hook_res) { 436 case EVRPC_TERMINATE: 437 goto error; 438 case EVRPC_PAUSE: 439 if (evrpc_pause_request(rpc->base, rpc_state, 440 evrpc_request_done_closure) == -1) 441 goto error; 442 return; 443 case EVRPC_CONTINUE: 444 break; 445 default: 446 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 447 hook_res == EVRPC_CONTINUE || 448 hook_res == EVRPC_PAUSE); 449 } 450 } 451 452 evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 453 return; 454 455 error: 456 if (rpc_state != NULL) 457 evrpc_reqstate_free(rpc_state); 458 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 459 return; 460 } 461 462 void * 463 evrpc_get_request(struct evrpc_req_generic *req) 464 { 465 return req->request; 466 } 467 468 void * 469 evrpc_get_reply(struct evrpc_req_generic *req) 470 { 471 return req->reply; 472 } 473 474 static void 475 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 476 { 477 struct evrpc_req_generic *rpc_state = arg; 478 struct evhttp_request *req; 479 EVUTIL_ASSERT(rpc_state); 480 req = rpc_state->http_req; 481 482 if (hook_res == EVRPC_TERMINATE) 483 goto error; 484 485 /* on success, we are going to transmit marshaled binary data */ 486 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 487 evhttp_add_header(req->output_headers, 488 "Content-Type", "application/octet-stream"); 489 } 490 evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 491 492 evrpc_reqstate_free(rpc_state); 493 494 return; 495 496 error: 497 if (rpc_state != NULL) 498 evrpc_reqstate_free(rpc_state); 499 evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 500 return; 501 } 502 503 504 /* Client implementation of RPC site */ 505 506 static int evrpc_schedule_request(struct evhttp_connection *connection, 507 struct evrpc_request_wrapper *ctx); 508 509 struct evrpc_pool * 510 evrpc_pool_new(struct event_base *base) 511 { 512 struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 513 if (pool == NULL) 514 return (NULL); 515 516 TAILQ_INIT(&pool->connections); 517 TAILQ_INIT(&pool->requests); 518 519 TAILQ_INIT(&pool->paused_requests); 520 521 TAILQ_INIT(&pool->input_hooks); 522 TAILQ_INIT(&pool->output_hooks); 523 524 pool->base = base; 525 pool->timeout = -1; 526 527 return (pool); 528 } 529 530 static void 531 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 532 { 533 if (request->hook_meta != NULL) 534 evrpc_hook_context_free(request->hook_meta); 535 mm_free(request->name); 536 mm_free(request); 537 } 538 539 void 540 evrpc_pool_free(struct evrpc_pool *pool) 541 { 542 struct evhttp_connection *connection; 543 struct evrpc_request_wrapper *request; 544 struct evrpc_hook_ctx *pause; 545 struct evrpc_hook *hook; 546 int r; 547 548 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 549 TAILQ_REMOVE(&pool->requests, request, next); 550 evrpc_request_wrapper_free(request); 551 } 552 553 while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 554 TAILQ_REMOVE(&pool->paused_requests, pause, next); 555 mm_free(pause); 556 } 557 558 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 559 TAILQ_REMOVE(&pool->connections, connection, next); 560 evhttp_connection_free(connection); 561 } 562 563 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 564 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 565 EVUTIL_ASSERT(r); 566 } 567 568 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 569 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 570 EVUTIL_ASSERT(r); 571 } 572 573 mm_free(pool); 574 } 575 576 /* 577 * Add a connection to the RPC pool. A request scheduled on the pool 578 * may use any available connection. 579 */ 580 581 void 582 evrpc_pool_add_connection(struct evrpc_pool *pool, 583 struct evhttp_connection *connection) 584 { 585 EVUTIL_ASSERT(connection->http_server == NULL); 586 TAILQ_INSERT_TAIL(&pool->connections, connection, next); 587 588 /* 589 * associate an event base with this connection 590 */ 591 if (pool->base != NULL) 592 evhttp_connection_set_base(connection, pool->base); 593 594 /* 595 * unless a timeout was specifically set for a connection, 596 * the connection inherits the timeout from the pool. 597 */ 598 if (connection->timeout == -1) 599 connection->timeout = pool->timeout; 600 601 /* 602 * if we have any requests pending, schedule them with the new 603 * connections. 604 */ 605 606 if (TAILQ_FIRST(&pool->requests) != NULL) { 607 struct evrpc_request_wrapper *request = 608 TAILQ_FIRST(&pool->requests); 609 TAILQ_REMOVE(&pool->requests, request, next); 610 evrpc_schedule_request(connection, request); 611 } 612 } 613 614 void 615 evrpc_pool_remove_connection(struct evrpc_pool *pool, 616 struct evhttp_connection *connection) 617 { 618 TAILQ_REMOVE(&pool->connections, connection, next); 619 } 620 621 void 622 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 623 { 624 struct evhttp_connection *evcon; 625 TAILQ_FOREACH(evcon, &pool->connections, next) { 626 evcon->timeout = timeout_in_secs; 627 } 628 pool->timeout = timeout_in_secs; 629 } 630 631 632 static void evrpc_reply_done(struct evhttp_request *, void *); 633 static void evrpc_request_timeout(evutil_socket_t, short, void *); 634 635 /* 636 * Finds a connection object associated with the pool that is currently 637 * idle and can be used to make a request. 638 */ 639 static struct evhttp_connection * 640 evrpc_pool_find_connection(struct evrpc_pool *pool) 641 { 642 struct evhttp_connection *connection; 643 TAILQ_FOREACH(connection, &pool->connections, next) { 644 if (TAILQ_FIRST(&connection->requests) == NULL) 645 return (connection); 646 } 647 648 return (NULL); 649 } 650 651 /* 652 * Prototypes responsible for evrpc scheduling and hooking 653 */ 654 655 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 656 657 /* 658 * We assume that the ctx is no longer queued on the pool. 659 */ 660 static int 661 evrpc_schedule_request(struct evhttp_connection *connection, 662 struct evrpc_request_wrapper *ctx) 663 { 664 struct evhttp_request *req = NULL; 665 struct evrpc_pool *pool = ctx->pool; 666 struct evrpc_status status; 667 668 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 669 goto error; 670 671 /* serialize the request data into the output buffer */ 672 ctx->request_marshal(req->output_buffer, ctx->request); 673 674 /* we need to know the connection that we might have to abort */ 675 ctx->evcon = connection; 676 677 /* if we get paused we also need to know the request */ 678 ctx->req = req; 679 680 if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 681 int hook_res; 682 683 evrpc_hook_associate_meta(&ctx->hook_meta, connection); 684 685 /* apply hooks to the outgoing request */ 686 hook_res = evrpc_process_hooks(&pool->output_hooks, 687 ctx, req, req->output_buffer); 688 689 switch (hook_res) { 690 case EVRPC_TERMINATE: 691 goto error; 692 case EVRPC_PAUSE: 693 /* we need to be explicitly resumed */ 694 if (evrpc_pause_request(pool, ctx, 695 evrpc_schedule_request_closure) == -1) 696 goto error; 697 return (0); 698 case EVRPC_CONTINUE: 699 /* we can just continue */ 700 break; 701 default: 702 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 703 hook_res == EVRPC_CONTINUE || 704 hook_res == EVRPC_PAUSE); 705 } 706 } 707 708 evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 709 return (0); 710 711 error: 712 memset(&status, 0, sizeof(status)); 713 status.error = EVRPC_STATUS_ERR_UNSTARTED; 714 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 715 evrpc_request_wrapper_free(ctx); 716 return (-1); 717 } 718 719 static void 720 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 721 { 722 struct evrpc_request_wrapper *ctx = arg; 723 struct evhttp_connection *connection = ctx->evcon; 724 struct evhttp_request *req = ctx->req; 725 struct evrpc_pool *pool = ctx->pool; 726 struct evrpc_status status; 727 char *uri = NULL; 728 int res = 0; 729 730 if (hook_res == EVRPC_TERMINATE) 731 goto error; 732 733 uri = evrpc_construct_uri(ctx->name); 734 if (uri == NULL) 735 goto error; 736 737 if (pool->timeout > 0) { 738 /* 739 * a timeout after which the whole rpc is going to be aborted. 740 */ 741 struct timeval tv; 742 evutil_timerclear(&tv); 743 tv.tv_sec = pool->timeout; 744 evtimer_add(&ctx->ev_timeout, &tv); 745 } 746 747 /* start the request over the connection */ 748 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 749 mm_free(uri); 750 751 if (res == -1) 752 goto error; 753 754 return; 755 756 error: 757 memset(&status, 0, sizeof(status)); 758 status.error = EVRPC_STATUS_ERR_UNSTARTED; 759 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 760 evrpc_request_wrapper_free(ctx); 761 } 762 763 /* we just queue the paused request on the pool under the req object */ 764 static int 765 evrpc_pause_request(void *vbase, void *ctx, 766 void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 767 { 768 struct _evrpc_hooks *base = vbase; 769 struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 770 if (pause == NULL) 771 return (-1); 772 773 pause->ctx = ctx; 774 pause->cb = cb; 775 776 TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 777 return (0); 778 } 779 780 int 781 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 782 { 783 struct _evrpc_hooks *base = vbase; 784 struct evrpc_pause_list *head = &base->pause_requests; 785 struct evrpc_hook_ctx *pause; 786 787 TAILQ_FOREACH(pause, head, next) { 788 if (pause->ctx == ctx) 789 break; 790 } 791 792 if (pause == NULL) 793 return (-1); 794 795 (*pause->cb)(pause->ctx, res); 796 TAILQ_REMOVE(head, pause, next); 797 mm_free(pause); 798 return (0); 799 } 800 801 int 802 evrpc_make_request(struct evrpc_request_wrapper *ctx) 803 { 804 struct evrpc_pool *pool = ctx->pool; 805 806 /* initialize the event structure for this rpc */ 807 evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 808 809 /* we better have some available connections on the pool */ 810 EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 811 812 /* 813 * if no connection is available, we queue the request on the pool, 814 * the next time a connection is empty, the rpc will be send on that. 815 */ 816 TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 817 818 evrpc_pool_schedule(pool); 819 820 return (0); 821 } 822 823 824 struct evrpc_request_wrapper * 825 evrpc_make_request_ctx( 826 struct evrpc_pool *pool, void *request, void *reply, 827 const char *rpcname, 828 void (*req_marshal)(struct evbuffer*, void *), 829 void (*rpl_clear)(void *), 830 int (*rpl_unmarshal)(void *, struct evbuffer *), 831 void (*cb)(struct evrpc_status *, void *, void *, void *), 832 void *cbarg) 833 { 834 struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 835 mm_malloc(sizeof(struct evrpc_request_wrapper)); 836 if (ctx == NULL) 837 return (NULL); 838 839 ctx->pool = pool; 840 ctx->hook_meta = NULL; 841 ctx->evcon = NULL; 842 ctx->name = mm_strdup(rpcname); 843 if (ctx->name == NULL) { 844 mm_free(ctx); 845 return (NULL); 846 } 847 ctx->cb = cb; 848 ctx->cb_arg = cbarg; 849 ctx->request = request; 850 ctx->reply = reply; 851 ctx->request_marshal = req_marshal; 852 ctx->reply_clear = rpl_clear; 853 ctx->reply_unmarshal = rpl_unmarshal; 854 855 return (ctx); 856 } 857 858 static void 859 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 860 861 static void 862 evrpc_reply_done(struct evhttp_request *req, void *arg) 863 { 864 struct evrpc_request_wrapper *ctx = arg; 865 struct evrpc_pool *pool = ctx->pool; 866 int hook_res = EVRPC_CONTINUE; 867 868 /* cancel any timeout we might have scheduled */ 869 event_del(&ctx->ev_timeout); 870 871 ctx->req = req; 872 873 /* we need to get the reply now */ 874 if (req == NULL) { 875 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 876 return; 877 } 878 879 if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 880 evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon); 881 882 /* apply hooks to the incoming request */ 883 hook_res = evrpc_process_hooks(&pool->input_hooks, 884 ctx, req, req->input_buffer); 885 886 switch (hook_res) { 887 case EVRPC_TERMINATE: 888 case EVRPC_CONTINUE: 889 break; 890 case EVRPC_PAUSE: 891 /* 892 * if we get paused we also need to know the 893 * request. unfortunately, the underlying 894 * layer is going to free it. we need to 895 * request ownership explicitly 896 */ 897 if (req != NULL) 898 evhttp_request_own(req); 899 900 evrpc_pause_request(pool, ctx, 901 evrpc_reply_done_closure); 902 return; 903 default: 904 EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 905 hook_res == EVRPC_CONTINUE || 906 hook_res == EVRPC_PAUSE); 907 } 908 } 909 910 evrpc_reply_done_closure(ctx, hook_res); 911 912 /* http request is being freed by underlying layer */ 913 } 914 915 static void 916 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 917 { 918 struct evrpc_request_wrapper *ctx = arg; 919 struct evhttp_request *req = ctx->req; 920 struct evrpc_pool *pool = ctx->pool; 921 struct evrpc_status status; 922 int res = -1; 923 924 memset(&status, 0, sizeof(status)); 925 status.http_req = req; 926 927 /* we need to get the reply now */ 928 if (req == NULL) { 929 status.error = EVRPC_STATUS_ERR_TIMEOUT; 930 } else if (hook_res == EVRPC_TERMINATE) { 931 status.error = EVRPC_STATUS_ERR_HOOKABORTED; 932 } else { 933 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 934 if (res == -1) 935 status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 936 } 937 938 if (res == -1) { 939 /* clear everything that we might have written previously */ 940 ctx->reply_clear(ctx->reply); 941 } 942 943 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 944 945 evrpc_request_wrapper_free(ctx); 946 947 /* the http layer owned the original request structure, but if we 948 * got paused, we asked for ownership and need to free it here. */ 949 if (req != NULL && evhttp_request_is_owned(req)) 950 evhttp_request_free(req); 951 952 /* see if we can schedule another request */ 953 evrpc_pool_schedule(pool); 954 } 955 956 static void 957 evrpc_pool_schedule(struct evrpc_pool *pool) 958 { 959 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 960 struct evhttp_connection *evcon; 961 962 /* if no requests are pending, we have no work */ 963 if (ctx == NULL) 964 return; 965 966 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 967 TAILQ_REMOVE(&pool->requests, ctx, next); 968 evrpc_schedule_request(evcon, ctx); 969 } 970 } 971 972 static void 973 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 974 { 975 struct evrpc_request_wrapper *ctx = arg; 976 struct evhttp_connection *evcon = ctx->evcon; 977 EVUTIL_ASSERT(evcon != NULL); 978 979 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); 980 } 981 982 /* 983 * frees potential meta data associated with a request. 984 */ 985 986 static void 987 evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 988 { 989 struct evrpc_meta *entry; 990 EVUTIL_ASSERT(meta_data != NULL); 991 992 while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 993 TAILQ_REMOVE(meta_data, entry, next); 994 mm_free(entry->key); 995 mm_free(entry->data); 996 mm_free(entry); 997 } 998 } 999 1000 static struct evrpc_hook_meta * 1001 evrpc_hook_meta_new(void) 1002 { 1003 struct evrpc_hook_meta *ctx; 1004 ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1005 EVUTIL_ASSERT(ctx != NULL); 1006 1007 TAILQ_INIT(&ctx->meta_data); 1008 ctx->evcon = NULL; 1009 1010 return (ctx); 1011 } 1012 1013 static void 1014 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx, 1015 struct evhttp_connection *evcon) 1016 { 1017 struct evrpc_hook_meta *ctx = *pctx; 1018 if (ctx == NULL) 1019 *pctx = ctx = evrpc_hook_meta_new(); 1020 ctx->evcon = evcon; 1021 } 1022 1023 static void 1024 evrpc_hook_context_free(struct evrpc_hook_meta *ctx) 1025 { 1026 evrpc_meta_data_free(&ctx->meta_data); 1027 mm_free(ctx); 1028 } 1029 1030 /* Adds meta data */ 1031 void 1032 evrpc_hook_add_meta(void *ctx, const char *key, 1033 const void *data, size_t data_size) 1034 { 1035 struct evrpc_request_wrapper *req = ctx; 1036 struct evrpc_hook_meta *store = NULL; 1037 struct evrpc_meta *meta = NULL; 1038 1039 if ((store = req->hook_meta) == NULL) 1040 store = req->hook_meta = evrpc_hook_meta_new(); 1041 1042 meta = mm_malloc(sizeof(struct evrpc_meta)); 1043 EVUTIL_ASSERT(meta != NULL); 1044 meta->key = mm_strdup(key); 1045 EVUTIL_ASSERT(meta->key != NULL); 1046 meta->data_size = data_size; 1047 meta->data = mm_malloc(data_size); 1048 EVUTIL_ASSERT(meta->data != NULL); 1049 memcpy(meta->data, data, data_size); 1050 1051 TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1052 } 1053 1054 int 1055 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1056 { 1057 struct evrpc_request_wrapper *req = ctx; 1058 struct evrpc_meta *meta = NULL; 1059 1060 if (req->hook_meta == NULL) 1061 return (-1); 1062 1063 TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1064 if (strcmp(meta->key, key) == 0) { 1065 *data = meta->data; 1066 *data_size = meta->data_size; 1067 return (0); 1068 } 1069 } 1070 1071 return (-1); 1072 } 1073 1074 struct evhttp_connection * 1075 evrpc_hook_get_connection(void *ctx) 1076 { 1077 struct evrpc_request_wrapper *req = ctx; 1078 return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1079 } 1080 1081 int 1082 evrpc_send_request_generic(struct evrpc_pool *pool, 1083 void *request, void *reply, 1084 void (*cb)(struct evrpc_status *, void *, void *, void *), 1085 void *cb_arg, 1086 const char *rpcname, 1087 void (*req_marshal)(struct evbuffer *, void *), 1088 void (*rpl_clear)(void *), 1089 int (*rpl_unmarshal)(void *, struct evbuffer *)) 1090 { 1091 struct evrpc_status status; 1092 struct evrpc_request_wrapper *ctx; 1093 ctx = evrpc_make_request_ctx(pool, request, reply, 1094 rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1095 if (ctx == NULL) 1096 goto error; 1097 return (evrpc_make_request(ctx)); 1098 error: 1099 memset(&status, 0, sizeof(status)); 1100 status.error = EVRPC_STATUS_ERR_UNSTARTED; 1101 (*(cb))(&status, request, reply, cb_arg); 1102 return (-1); 1103 } 1104 1105 /** Takes a request object and fills it in with the right magic */ 1106 static struct evrpc * 1107 evrpc_register_object(const char *name, 1108 void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1109 int (*req_unmarshal)(void *, struct evbuffer *), 1110 void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1111 int (*rpl_complete)(void *), 1112 void (*rpl_marshal)(struct evbuffer *, void *)) 1113 { 1114 struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1115 if (rpc == NULL) 1116 return (NULL); 1117 rpc->uri = mm_strdup(name); 1118 if (rpc->uri == NULL) { 1119 mm_free(rpc); 1120 return (NULL); 1121 } 1122 rpc->request_new = req_new; 1123 rpc->request_new_arg = req_new_arg; 1124 rpc->request_free = req_free; 1125 rpc->request_unmarshal = req_unmarshal; 1126 rpc->reply_new = rpl_new; 1127 rpc->reply_new_arg = rpl_new_arg; 1128 rpc->reply_free = rpl_free; 1129 rpc->reply_complete = rpl_complete; 1130 rpc->reply_marshal = rpl_marshal; 1131 return (rpc); 1132 } 1133 1134 int 1135 evrpc_register_generic(struct evrpc_base *base, const char *name, 1136 void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1137 void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1138 int (*req_unmarshal)(void *, struct evbuffer *), 1139 void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1140 int (*rpl_complete)(void *), 1141 void (*rpl_marshal)(struct evbuffer *, void *)) 1142 { 1143 struct evrpc* rpc = 1144 evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1145 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1146 if (rpc == NULL) 1147 return (-1); 1148 evrpc_register_rpc(base, rpc, 1149 (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1150 return (0); 1151 } 1152 1153 /** accessors for obscure and undocumented functionality */ 1154 struct evrpc_pool * 1155 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1156 { 1157 return (ctx->pool); 1158 } 1159 1160 void 1161 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1162 struct evrpc_pool *pool) 1163 { 1164 ctx->pool = pool; 1165 } 1166 1167 void 1168 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1169 void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1170 void *cb_arg) 1171 { 1172 ctx->cb = cb; 1173 ctx->cb_arg = cb_arg; 1174 } 1175