Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2000-2007 Niels Provos <provos (at) citi.umich.edu>
      3  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  * 3. The name of the author may not be used to endorse or promote products
     14  *    derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 #include "event2/event-config.h"
     28 
     29 #ifdef WIN32
     30 #define WIN32_LEAN_AND_MEAN
     31 #include <winsock2.h>
     32 #include <windows.h>
     33 #undef WIN32_LEAN_AND_MEAN
     34 #endif
     35 
     36 #include <sys/types.h>
     37 #ifndef WIN32
     38 #include <sys/socket.h>
     39 #endif
     40 #ifdef _EVENT_HAVE_SYS_TIME_H
     41 #include <sys/time.h>
     42 #endif
     43 #include <sys/queue.h>
     44 #include <stdio.h>
     45 #include <stdlib.h>
     46 #ifndef WIN32
     47 #include <unistd.h>
     48 #endif
     49 #include <errno.h>
     50 #include <signal.h>
     51 #include <string.h>
     52 
     53 #include <sys/queue.h>
     54 
     55 #include "event2/event.h"
     56 #include "event2/event_struct.h"
     57 #include "event2/rpc.h"
     58 #include "event2/rpc_struct.h"
     59 #include "evrpc-internal.h"
     60 #include "event2/http.h"
     61 #include "event2/buffer.h"
     62 #include "event2/tag.h"
     63 #include "event2/http_struct.h"
     64 #include "event2/http_compat.h"
     65 #include "event2/util.h"
     66 #include "util-internal.h"
     67 #include "log-internal.h"
     68 #include "mm-internal.h"
     69 
     70 struct evrpc_base *
     71 evrpc_init(struct evhttp *http_server)
     72 {
     73 	struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
     74 	if (base == NULL)
     75 		return (NULL);
     76 
     77 	/* we rely on the tagging sub system */
     78 	evtag_init();
     79 
     80 	TAILQ_INIT(&base->registered_rpcs);
     81 	TAILQ_INIT(&base->input_hooks);
     82 	TAILQ_INIT(&base->output_hooks);
     83 
     84 	TAILQ_INIT(&base->paused_requests);
     85 
     86 	base->http_server = http_server;
     87 
     88 	return (base);
     89 }
     90 
     91 void
     92 evrpc_free(struct evrpc_base *base)
     93 {
     94 	struct evrpc *rpc;
     95 	struct evrpc_hook *hook;
     96 	struct evrpc_hook_ctx *pause;
     97 	int r;
     98 
     99 	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
    100 		r = evrpc_unregister_rpc(base, rpc->uri);
    101 		EVUTIL_ASSERT(r == 0);
    102 	}
    103 	while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
    104 		TAILQ_REMOVE(&base->paused_requests, pause, next);
    105 		mm_free(pause);
    106 	}
    107 	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
    108 		r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
    109 		EVUTIL_ASSERT(r);
    110 	}
    111 	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
    112 		r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
    113 		EVUTIL_ASSERT(r);
    114 	}
    115 	mm_free(base);
    116 }
    117 
    118 void *
    119 evrpc_add_hook(void *vbase,
    120     enum EVRPC_HOOK_TYPE hook_type,
    121     int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
    122     void *cb_arg)
    123 {
    124 	struct _evrpc_hooks *base = vbase;
    125 	struct evrpc_hook_list *head = NULL;
    126 	struct evrpc_hook *hook = NULL;
    127 	switch (hook_type) {
    128 	case EVRPC_INPUT:
    129 		head = &base->in_hooks;
    130 		break;
    131 	case EVRPC_OUTPUT:
    132 		head = &base->out_hooks;
    133 		break;
    134 	default:
    135 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
    136 	}
    137 
    138 	hook = mm_calloc(1, sizeof(struct evrpc_hook));
    139 	EVUTIL_ASSERT(hook != NULL);
    140 
    141 	hook->process = cb;
    142 	hook->process_arg = cb_arg;
    143 	TAILQ_INSERT_TAIL(head, hook, next);
    144 
    145 	return (hook);
    146 }
    147 
    148 static int
    149 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
    150 {
    151 	struct evrpc_hook *hook = NULL;
    152 	TAILQ_FOREACH(hook, head, next) {
    153 		if (hook == handle) {
    154 			TAILQ_REMOVE(head, hook, next);
    155 			mm_free(hook);
    156 			return (1);
    157 		}
    158 	}
    159 
    160 	return (0);
    161 }
    162 
    163 /*
    164  * remove the hook specified by the handle
    165  */
    166 
    167 int
    168 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
    169 {
    170 	struct _evrpc_hooks *base = vbase;
    171 	struct evrpc_hook_list *head = NULL;
    172 	switch (hook_type) {
    173 	case EVRPC_INPUT:
    174 		head = &base->in_hooks;
    175 		break;
    176 	case EVRPC_OUTPUT:
    177 		head = &base->out_hooks;
    178 		break;
    179 	default:
    180 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
    181 	}
    182 
    183 	return (evrpc_remove_hook_internal(head, handle));
    184 }
    185 
    186 static int
    187 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
    188     struct evhttp_request *req, struct evbuffer *evbuf)
    189 {
    190 	struct evrpc_hook *hook;
    191 	TAILQ_FOREACH(hook, head, next) {
    192 		int res = hook->process(ctx, req, evbuf, hook->process_arg);
    193 		if (res != EVRPC_CONTINUE)
    194 			return (res);
    195 	}
    196 
    197 	return (EVRPC_CONTINUE);
    198 }
    199 
    200 static void evrpc_pool_schedule(struct evrpc_pool *pool);
    201 static void evrpc_request_cb(struct evhttp_request *, void *);
    202 
    203 /*
    204  * Registers a new RPC with the HTTP server.   The evrpc object is expected
    205  * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
    206  * calls this function.
    207  */
    208 
    209 static char *
    210 evrpc_construct_uri(const char *uri)
    211 {
    212 	char *constructed_uri;
    213 	size_t constructed_uri_len;
    214 
    215 	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
    216 	if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
    217 		event_err(1, "%s: failed to register rpc at %s",
    218 		    __func__, uri);
    219 	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
    220 	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
    221 	constructed_uri[constructed_uri_len - 1] = '\0';
    222 
    223 	return (constructed_uri);
    224 }
    225 
    226 int
    227 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
    228     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
    229 {
    230 	char *constructed_uri = evrpc_construct_uri(rpc->uri);
    231 
    232 	rpc->base = base;
    233 	rpc->cb = cb;
    234 	rpc->cb_arg = cb_arg;
    235 
    236 	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
    237 
    238 	evhttp_set_cb(base->http_server,
    239 	    constructed_uri,
    240 	    evrpc_request_cb,
    241 	    rpc);
    242 
    243 	mm_free(constructed_uri);
    244 
    245 	return (0);
    246 }
    247 
    248 int
    249 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
    250 {
    251 	char *registered_uri = NULL;
    252 	struct evrpc *rpc;
    253 	int r;
    254 
    255 	/* find the right rpc; linear search might be slow */
    256 	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
    257 		if (strcmp(rpc->uri, name) == 0)
    258 			break;
    259 	}
    260 	if (rpc == NULL) {
    261 		/* We did not find an RPC with this name */
    262 		return (-1);
    263 	}
    264 	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
    265 
    266 	registered_uri = evrpc_construct_uri(name);
    267 
    268 	/* remove the http server callback */
    269 	r = evhttp_del_cb(base->http_server, registered_uri);
    270 	EVUTIL_ASSERT(r == 0);
    271 
    272 	mm_free(registered_uri);
    273 
    274 	mm_free((char *)rpc->uri);
    275 	mm_free(rpc);
    276 	return (0);
    277 }
    278 
    279 static int evrpc_pause_request(void *vbase, void *ctx,
    280     void (*cb)(void *, enum EVRPC_HOOK_RESULT));
    281 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
    282 
    283 static void
    284 evrpc_request_cb(struct evhttp_request *req, void *arg)
    285 {
    286 	struct evrpc *rpc = arg;
    287 	struct evrpc_req_generic *rpc_state = NULL;
    288 
    289 	/* let's verify the outside parameters */
    290 	if (req->type != EVHTTP_REQ_POST ||
    291 	    evbuffer_get_length(req->input_buffer) <= 0)
    292 		goto error;
    293 
    294 	rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
    295 	if (rpc_state == NULL)
    296 		goto error;
    297 	rpc_state->rpc = rpc;
    298 	rpc_state->http_req = req;
    299 	rpc_state->rpc_data = NULL;
    300 
    301 	if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
    302 		int hook_res;
    303 
    304 		evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
    305 
    306 		/*
    307 		 * allow hooks to modify the outgoing request
    308 		 */
    309 		hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
    310 		    rpc_state, req, req->input_buffer);
    311 		switch (hook_res) {
    312 		case EVRPC_TERMINATE:
    313 			goto error;
    314 		case EVRPC_PAUSE:
    315 			evrpc_pause_request(rpc->base, rpc_state,
    316 			    evrpc_request_cb_closure);
    317 			return;
    318 		case EVRPC_CONTINUE:
    319 			break;
    320 		default:
    321 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    322 			    hook_res == EVRPC_CONTINUE ||
    323 			    hook_res == EVRPC_PAUSE);
    324 		}
    325 	}
    326 
    327 	evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
    328 	return;
    329 
    330 error:
    331 	if (rpc_state != NULL)
    332 		evrpc_reqstate_free(rpc_state);
    333 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    334 	return;
    335 }
    336 
    337 static void
    338 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    339 {
    340 	struct evrpc_req_generic *rpc_state = arg;
    341 	struct evrpc *rpc;
    342 	struct evhttp_request *req;
    343 
    344 	EVUTIL_ASSERT(rpc_state);
    345 	rpc = rpc_state->rpc;
    346 	req = rpc_state->http_req;
    347 
    348 	if (hook_res == EVRPC_TERMINATE)
    349 		goto error;
    350 
    351 	/* let's check that we can parse the request */
    352 	rpc_state->request = rpc->request_new(rpc->request_new_arg);
    353 	if (rpc_state->request == NULL)
    354 		goto error;
    355 
    356 	if (rpc->request_unmarshal(
    357 		    rpc_state->request, req->input_buffer) == -1) {
    358 		/* we failed to parse the request; that's a bummer */
    359 		goto error;
    360 	}
    361 
    362 	/* at this point, we have a well formed request, prepare the reply */
    363 
    364 	rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
    365 	if (rpc_state->reply == NULL)
    366 		goto error;
    367 
    368 	/* give the rpc to the user; they can deal with it */
    369 	rpc->cb(rpc_state, rpc->cb_arg);
    370 
    371 	return;
    372 
    373 error:
    374 	if (rpc_state != NULL)
    375 		evrpc_reqstate_free(rpc_state);
    376 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    377 	return;
    378 }
    379 
    380 
    381 void
    382 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
    383 {
    384 	struct evrpc *rpc;
    385 	EVUTIL_ASSERT(rpc_state != NULL);
    386 	rpc = rpc_state->rpc;
    387 
    388 	/* clean up all memory */
    389 	if (rpc_state->hook_meta != NULL)
    390 		evrpc_hook_context_free(rpc_state->hook_meta);
    391 	if (rpc_state->request != NULL)
    392 		rpc->request_free(rpc_state->request);
    393 	if (rpc_state->reply != NULL)
    394 		rpc->reply_free(rpc_state->reply);
    395 	if (rpc_state->rpc_data != NULL)
    396 		evbuffer_free(rpc_state->rpc_data);
    397 	mm_free(rpc_state);
    398 }
    399 
    400 static void
    401 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
    402 
    403 void
    404 evrpc_request_done(struct evrpc_req_generic *rpc_state)
    405 {
    406 	struct evhttp_request *req;
    407 	struct evrpc *rpc;
    408 
    409 	EVUTIL_ASSERT(rpc_state);
    410 
    411 	req = rpc_state->http_req;
    412 	rpc = rpc_state->rpc;
    413 
    414 	if (rpc->reply_complete(rpc_state->reply) == -1) {
    415 		/* the reply was not completely filled in.  error out */
    416 		goto error;
    417 	}
    418 
    419 	if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
    420 		/* out of memory */
    421 		goto error;
    422 	}
    423 
    424 	/* serialize the reply */
    425 	rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
    426 
    427 	if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
    428 		int hook_res;
    429 
    430 		evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
    431 
    432 		/* do hook based tweaks to the request */
    433 		hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
    434 		    rpc_state, req, rpc_state->rpc_data);
    435 		switch (hook_res) {
    436 		case EVRPC_TERMINATE:
    437 			goto error;
    438 		case EVRPC_PAUSE:
    439 			if (evrpc_pause_request(rpc->base, rpc_state,
    440 				evrpc_request_done_closure) == -1)
    441 				goto error;
    442 			return;
    443 		case EVRPC_CONTINUE:
    444 			break;
    445 		default:
    446 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    447 			    hook_res == EVRPC_CONTINUE ||
    448 			    hook_res == EVRPC_PAUSE);
    449 		}
    450 	}
    451 
    452 	evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
    453 	return;
    454 
    455 error:
    456 	if (rpc_state != NULL)
    457 		evrpc_reqstate_free(rpc_state);
    458 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    459 	return;
    460 }
    461 
    462 void *
    463 evrpc_get_request(struct evrpc_req_generic *req)
    464 {
    465 	return req->request;
    466 }
    467 
    468 void *
    469 evrpc_get_reply(struct evrpc_req_generic *req)
    470 {
    471 	return req->reply;
    472 }
    473 
    474 static void
    475 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    476 {
    477 	struct evrpc_req_generic *rpc_state = arg;
    478 	struct evhttp_request *req;
    479 	EVUTIL_ASSERT(rpc_state);
    480 	req = rpc_state->http_req;
    481 
    482 	if (hook_res == EVRPC_TERMINATE)
    483 		goto error;
    484 
    485 	/* on success, we are going to transmit marshaled binary data */
    486 	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
    487 		evhttp_add_header(req->output_headers,
    488 		    "Content-Type", "application/octet-stream");
    489 	}
    490 	evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
    491 
    492 	evrpc_reqstate_free(rpc_state);
    493 
    494 	return;
    495 
    496 error:
    497 	if (rpc_state != NULL)
    498 		evrpc_reqstate_free(rpc_state);
    499 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    500 	return;
    501 }
    502 
    503 
    504 /* Client implementation of RPC site */
    505 
    506 static int evrpc_schedule_request(struct evhttp_connection *connection,
    507     struct evrpc_request_wrapper *ctx);
    508 
    509 struct evrpc_pool *
    510 evrpc_pool_new(struct event_base *base)
    511 {
    512 	struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
    513 	if (pool == NULL)
    514 		return (NULL);
    515 
    516 	TAILQ_INIT(&pool->connections);
    517 	TAILQ_INIT(&pool->requests);
    518 
    519 	TAILQ_INIT(&pool->paused_requests);
    520 
    521 	TAILQ_INIT(&pool->input_hooks);
    522 	TAILQ_INIT(&pool->output_hooks);
    523 
    524 	pool->base = base;
    525 	pool->timeout = -1;
    526 
    527 	return (pool);
    528 }
    529 
    530 static void
    531 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
    532 {
    533 	if (request->hook_meta != NULL)
    534 		evrpc_hook_context_free(request->hook_meta);
    535 	mm_free(request->name);
    536 	mm_free(request);
    537 }
    538 
    539 void
    540 evrpc_pool_free(struct evrpc_pool *pool)
    541 {
    542 	struct evhttp_connection *connection;
    543 	struct evrpc_request_wrapper *request;
    544 	struct evrpc_hook_ctx *pause;
    545 	struct evrpc_hook *hook;
    546 	int r;
    547 
    548 	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
    549 		TAILQ_REMOVE(&pool->requests, request, next);
    550 		evrpc_request_wrapper_free(request);
    551 	}
    552 
    553 	while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
    554 		TAILQ_REMOVE(&pool->paused_requests, pause, next);
    555 		mm_free(pause);
    556 	}
    557 
    558 	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
    559 		TAILQ_REMOVE(&pool->connections, connection, next);
    560 		evhttp_connection_free(connection);
    561 	}
    562 
    563 	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
    564 		r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
    565 		EVUTIL_ASSERT(r);
    566 	}
    567 
    568 	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
    569 		r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
    570 		EVUTIL_ASSERT(r);
    571 	}
    572 
    573 	mm_free(pool);
    574 }
    575 
    576 /*
    577  * Add a connection to the RPC pool.   A request scheduled on the pool
    578  * may use any available connection.
    579  */
    580 
    581 void
    582 evrpc_pool_add_connection(struct evrpc_pool *pool,
    583     struct evhttp_connection *connection)
    584 {
    585 	EVUTIL_ASSERT(connection->http_server == NULL);
    586 	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
    587 
    588 	/*
    589 	 * associate an event base with this connection
    590 	 */
    591 	if (pool->base != NULL)
    592 		evhttp_connection_set_base(connection, pool->base);
    593 
    594 	/*
    595 	 * unless a timeout was specifically set for a connection,
    596 	 * the connection inherits the timeout from the pool.
    597 	 */
    598 	if (connection->timeout == -1)
    599 		connection->timeout = pool->timeout;
    600 
    601 	/*
    602 	 * if we have any requests pending, schedule them with the new
    603 	 * connections.
    604 	 */
    605 
    606 	if (TAILQ_FIRST(&pool->requests) != NULL) {
    607 		struct evrpc_request_wrapper *request =
    608 		    TAILQ_FIRST(&pool->requests);
    609 		TAILQ_REMOVE(&pool->requests, request, next);
    610 		evrpc_schedule_request(connection, request);
    611 	}
    612 }
    613 
    614 void
    615 evrpc_pool_remove_connection(struct evrpc_pool *pool,
    616     struct evhttp_connection *connection)
    617 {
    618 	TAILQ_REMOVE(&pool->connections, connection, next);
    619 }
    620 
    621 void
    622 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
    623 {
    624 	struct evhttp_connection *evcon;
    625 	TAILQ_FOREACH(evcon, &pool->connections, next) {
    626 		evcon->timeout = timeout_in_secs;
    627 	}
    628 	pool->timeout = timeout_in_secs;
    629 }
    630 
    631 
    632 static void evrpc_reply_done(struct evhttp_request *, void *);
    633 static void evrpc_request_timeout(evutil_socket_t, short, void *);
    634 
    635 /*
    636  * Finds a connection object associated with the pool that is currently
    637  * idle and can be used to make a request.
    638  */
    639 static struct evhttp_connection *
    640 evrpc_pool_find_connection(struct evrpc_pool *pool)
    641 {
    642 	struct evhttp_connection *connection;
    643 	TAILQ_FOREACH(connection, &pool->connections, next) {
    644 		if (TAILQ_FIRST(&connection->requests) == NULL)
    645 			return (connection);
    646 	}
    647 
    648 	return (NULL);
    649 }
    650 
    651 /*
    652  * Prototypes responsible for evrpc scheduling and hooking
    653  */
    654 
    655 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
    656 
    657 /*
    658  * We assume that the ctx is no longer queued on the pool.
    659  */
    660 static int
    661 evrpc_schedule_request(struct evhttp_connection *connection,
    662     struct evrpc_request_wrapper *ctx)
    663 {
    664 	struct evhttp_request *req = NULL;
    665 	struct evrpc_pool *pool = ctx->pool;
    666 	struct evrpc_status status;
    667 
    668 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
    669 		goto error;
    670 
    671 	/* serialize the request data into the output buffer */
    672 	ctx->request_marshal(req->output_buffer, ctx->request);
    673 
    674 	/* we need to know the connection that we might have to abort */
    675 	ctx->evcon = connection;
    676 
    677 	/* if we get paused we also need to know the request */
    678 	ctx->req = req;
    679 
    680 	if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
    681 		int hook_res;
    682 
    683 		evrpc_hook_associate_meta(&ctx->hook_meta, connection);
    684 
    685 		/* apply hooks to the outgoing request */
    686 		hook_res = evrpc_process_hooks(&pool->output_hooks,
    687 		    ctx, req, req->output_buffer);
    688 
    689 		switch (hook_res) {
    690 		case EVRPC_TERMINATE:
    691 			goto error;
    692 		case EVRPC_PAUSE:
    693 			/* we need to be explicitly resumed */
    694 			if (evrpc_pause_request(pool, ctx,
    695 				evrpc_schedule_request_closure) == -1)
    696 				goto error;
    697 			return (0);
    698 		case EVRPC_CONTINUE:
    699 			/* we can just continue */
    700 			break;
    701 		default:
    702 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    703 			    hook_res == EVRPC_CONTINUE ||
    704 			    hook_res == EVRPC_PAUSE);
    705 		}
    706 	}
    707 
    708 	evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
    709 	return (0);
    710 
    711 error:
    712 	memset(&status, 0, sizeof(status));
    713 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
    714 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    715 	evrpc_request_wrapper_free(ctx);
    716 	return (-1);
    717 }
    718 
    719 static void
    720 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    721 {
    722 	struct evrpc_request_wrapper *ctx = arg;
    723 	struct evhttp_connection *connection = ctx->evcon;
    724 	struct evhttp_request *req = ctx->req;
    725 	struct evrpc_pool *pool = ctx->pool;
    726 	struct evrpc_status status;
    727 	char *uri = NULL;
    728 	int res = 0;
    729 
    730 	if (hook_res == EVRPC_TERMINATE)
    731 		goto error;
    732 
    733 	uri = evrpc_construct_uri(ctx->name);
    734 	if (uri == NULL)
    735 		goto error;
    736 
    737 	if (pool->timeout > 0) {
    738 		/*
    739 		 * a timeout after which the whole rpc is going to be aborted.
    740 		 */
    741 		struct timeval tv;
    742 		evutil_timerclear(&tv);
    743 		tv.tv_sec = pool->timeout;
    744 		evtimer_add(&ctx->ev_timeout, &tv);
    745 	}
    746 
    747 	/* start the request over the connection */
    748 	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
    749 	mm_free(uri);
    750 
    751 	if (res == -1)
    752 		goto error;
    753 
    754 	return;
    755 
    756 error:
    757 	memset(&status, 0, sizeof(status));
    758 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
    759 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    760 	evrpc_request_wrapper_free(ctx);
    761 }
    762 
    763 /* we just queue the paused request on the pool under the req object */
    764 static int
    765 evrpc_pause_request(void *vbase, void *ctx,
    766     void (*cb)(void *, enum EVRPC_HOOK_RESULT))
    767 {
    768 	struct _evrpc_hooks *base = vbase;
    769 	struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
    770 	if (pause == NULL)
    771 		return (-1);
    772 
    773 	pause->ctx = ctx;
    774 	pause->cb = cb;
    775 
    776 	TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
    777 	return (0);
    778 }
    779 
    780 int
    781 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
    782 {
    783 	struct _evrpc_hooks *base = vbase;
    784 	struct evrpc_pause_list *head = &base->pause_requests;
    785 	struct evrpc_hook_ctx *pause;
    786 
    787 	TAILQ_FOREACH(pause, head, next) {
    788 		if (pause->ctx == ctx)
    789 			break;
    790 	}
    791 
    792 	if (pause == NULL)
    793 		return (-1);
    794 
    795 	(*pause->cb)(pause->ctx, res);
    796 	TAILQ_REMOVE(head, pause, next);
    797 	mm_free(pause);
    798 	return (0);
    799 }
    800 
    801 int
    802 evrpc_make_request(struct evrpc_request_wrapper *ctx)
    803 {
    804 	struct evrpc_pool *pool = ctx->pool;
    805 
    806 	/* initialize the event structure for this rpc */
    807 	evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
    808 
    809 	/* we better have some available connections on the pool */
    810 	EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
    811 
    812 	/*
    813 	 * if no connection is available, we queue the request on the pool,
    814 	 * the next time a connection is empty, the rpc will be send on that.
    815 	 */
    816 	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
    817 
    818 	evrpc_pool_schedule(pool);
    819 
    820 	return (0);
    821 }
    822 
    823 
    824 struct evrpc_request_wrapper *
    825 evrpc_make_request_ctx(
    826 	struct evrpc_pool *pool, void *request, void *reply,
    827 	const char *rpcname,
    828 	void (*req_marshal)(struct evbuffer*, void *),
    829 	void (*rpl_clear)(void *),
    830 	int (*rpl_unmarshal)(void *, struct evbuffer *),
    831 	void (*cb)(struct evrpc_status *, void *, void *, void *),
    832 	void *cbarg)
    833 {
    834 	struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
    835 	    mm_malloc(sizeof(struct evrpc_request_wrapper));
    836 	if (ctx == NULL)
    837 		return (NULL);
    838 
    839 	ctx->pool = pool;
    840 	ctx->hook_meta = NULL;
    841 	ctx->evcon = NULL;
    842 	ctx->name = mm_strdup(rpcname);
    843 	if (ctx->name == NULL) {
    844 		mm_free(ctx);
    845 		return (NULL);
    846 	}
    847 	ctx->cb = cb;
    848 	ctx->cb_arg = cbarg;
    849 	ctx->request = request;
    850 	ctx->reply = reply;
    851 	ctx->request_marshal = req_marshal;
    852 	ctx->reply_clear = rpl_clear;
    853 	ctx->reply_unmarshal = rpl_unmarshal;
    854 
    855 	return (ctx);
    856 }
    857 
    858 static void
    859 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
    860 
    861 static void
    862 evrpc_reply_done(struct evhttp_request *req, void *arg)
    863 {
    864 	struct evrpc_request_wrapper *ctx = arg;
    865 	struct evrpc_pool *pool = ctx->pool;
    866 	int hook_res = EVRPC_CONTINUE;
    867 
    868 	/* cancel any timeout we might have scheduled */
    869 	event_del(&ctx->ev_timeout);
    870 
    871 	ctx->req = req;
    872 
    873 	/* we need to get the reply now */
    874 	if (req == NULL) {
    875 		evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
    876 		return;
    877 	}
    878 
    879 	if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
    880 		evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
    881 
    882 		/* apply hooks to the incoming request */
    883 		hook_res = evrpc_process_hooks(&pool->input_hooks,
    884 		    ctx, req, req->input_buffer);
    885 
    886 		switch (hook_res) {
    887 		case EVRPC_TERMINATE:
    888 		case EVRPC_CONTINUE:
    889 			break;
    890 		case EVRPC_PAUSE:
    891 			/*
    892 			 * if we get paused we also need to know the
    893 			 * request.  unfortunately, the underlying
    894 			 * layer is going to free it.  we need to
    895 			 * request ownership explicitly
    896 			 */
    897 			if (req != NULL)
    898 				evhttp_request_own(req);
    899 
    900 			evrpc_pause_request(pool, ctx,
    901 			    evrpc_reply_done_closure);
    902 			return;
    903 		default:
    904 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    905 			    hook_res == EVRPC_CONTINUE ||
    906 			    hook_res == EVRPC_PAUSE);
    907 		}
    908 	}
    909 
    910 	evrpc_reply_done_closure(ctx, hook_res);
    911 
    912 	/* http request is being freed by underlying layer */
    913 }
    914 
    915 static void
    916 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    917 {
    918 	struct evrpc_request_wrapper *ctx = arg;
    919 	struct evhttp_request *req = ctx->req;
    920 	struct evrpc_pool *pool = ctx->pool;
    921 	struct evrpc_status status;
    922 	int res = -1;
    923 
    924 	memset(&status, 0, sizeof(status));
    925 	status.http_req = req;
    926 
    927 	/* we need to get the reply now */
    928 	if (req == NULL) {
    929 		status.error = EVRPC_STATUS_ERR_TIMEOUT;
    930 	} else if (hook_res == EVRPC_TERMINATE) {
    931 		status.error = EVRPC_STATUS_ERR_HOOKABORTED;
    932 	} else {
    933 		res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
    934 		if (res == -1)
    935 			status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
    936 	}
    937 
    938 	if (res == -1) {
    939 		/* clear everything that we might have written previously */
    940 		ctx->reply_clear(ctx->reply);
    941 	}
    942 
    943 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    944 
    945 	evrpc_request_wrapper_free(ctx);
    946 
    947 	/* the http layer owned the original request structure, but if we
    948 	 * got paused, we asked for ownership and need to free it here. */
    949 	if (req != NULL && evhttp_request_is_owned(req))
    950 		evhttp_request_free(req);
    951 
    952 	/* see if we can schedule another request */
    953 	evrpc_pool_schedule(pool);
    954 }
    955 
    956 static void
    957 evrpc_pool_schedule(struct evrpc_pool *pool)
    958 {
    959 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
    960 	struct evhttp_connection *evcon;
    961 
    962 	/* if no requests are pending, we have no work */
    963 	if (ctx == NULL)
    964 		return;
    965 
    966 	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
    967 		TAILQ_REMOVE(&pool->requests, ctx, next);
    968 		evrpc_schedule_request(evcon, ctx);
    969 	}
    970 }
    971 
    972 static void
    973 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
    974 {
    975 	struct evrpc_request_wrapper *ctx = arg;
    976 	struct evhttp_connection *evcon = ctx->evcon;
    977 	EVUTIL_ASSERT(evcon != NULL);
    978 
    979 	evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
    980 }
    981 
    982 /*
    983  * frees potential meta data associated with a request.
    984  */
    985 
    986 static void
    987 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
    988 {
    989 	struct evrpc_meta *entry;
    990 	EVUTIL_ASSERT(meta_data != NULL);
    991 
    992 	while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
    993 		TAILQ_REMOVE(meta_data, entry, next);
    994 		mm_free(entry->key);
    995 		mm_free(entry->data);
    996 		mm_free(entry);
    997 	}
    998 }
    999 
   1000 static struct evrpc_hook_meta *
   1001 evrpc_hook_meta_new(void)
   1002 {
   1003 	struct evrpc_hook_meta *ctx;
   1004 	ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
   1005 	EVUTIL_ASSERT(ctx != NULL);
   1006 
   1007 	TAILQ_INIT(&ctx->meta_data);
   1008 	ctx->evcon = NULL;
   1009 
   1010 	return (ctx);
   1011 }
   1012 
   1013 static void
   1014 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
   1015     struct evhttp_connection *evcon)
   1016 {
   1017 	struct evrpc_hook_meta *ctx = *pctx;
   1018 	if (ctx == NULL)
   1019 		*pctx = ctx = evrpc_hook_meta_new();
   1020 	ctx->evcon = evcon;
   1021 }
   1022 
   1023 static void
   1024 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
   1025 {
   1026 	evrpc_meta_data_free(&ctx->meta_data);
   1027 	mm_free(ctx);
   1028 }
   1029 
   1030 /* Adds meta data */
   1031 void
   1032 evrpc_hook_add_meta(void *ctx, const char *key,
   1033     const void *data, size_t data_size)
   1034 {
   1035 	struct evrpc_request_wrapper *req = ctx;
   1036 	struct evrpc_hook_meta *store = NULL;
   1037 	struct evrpc_meta *meta = NULL;
   1038 
   1039 	if ((store = req->hook_meta) == NULL)
   1040 		store = req->hook_meta = evrpc_hook_meta_new();
   1041 
   1042 	meta = mm_malloc(sizeof(struct evrpc_meta));
   1043 	EVUTIL_ASSERT(meta != NULL);
   1044 	meta->key = mm_strdup(key);
   1045 	EVUTIL_ASSERT(meta->key != NULL);
   1046 	meta->data_size = data_size;
   1047 	meta->data = mm_malloc(data_size);
   1048 	EVUTIL_ASSERT(meta->data != NULL);
   1049 	memcpy(meta->data, data, data_size);
   1050 
   1051 	TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
   1052 }
   1053 
   1054 int
   1055 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
   1056 {
   1057 	struct evrpc_request_wrapper *req = ctx;
   1058 	struct evrpc_meta *meta = NULL;
   1059 
   1060 	if (req->hook_meta == NULL)
   1061 		return (-1);
   1062 
   1063 	TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
   1064 		if (strcmp(meta->key, key) == 0) {
   1065 			*data = meta->data;
   1066 			*data_size = meta->data_size;
   1067 			return (0);
   1068 		}
   1069 	}
   1070 
   1071 	return (-1);
   1072 }
   1073 
   1074 struct evhttp_connection *
   1075 evrpc_hook_get_connection(void *ctx)
   1076 {
   1077 	struct evrpc_request_wrapper *req = ctx;
   1078 	return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
   1079 }
   1080 
   1081 int
   1082 evrpc_send_request_generic(struct evrpc_pool *pool,
   1083     void *request, void *reply,
   1084     void (*cb)(struct evrpc_status *, void *, void *, void *),
   1085     void *cb_arg,
   1086     const char *rpcname,
   1087     void (*req_marshal)(struct evbuffer *, void *),
   1088     void (*rpl_clear)(void *),
   1089     int (*rpl_unmarshal)(void *, struct evbuffer *))
   1090 {
   1091 	struct evrpc_status status;
   1092 	struct evrpc_request_wrapper *ctx;
   1093 	ctx = evrpc_make_request_ctx(pool, request, reply,
   1094 	    rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
   1095 	if (ctx == NULL)
   1096 		goto error;
   1097 	return (evrpc_make_request(ctx));
   1098 error:
   1099 	memset(&status, 0, sizeof(status));
   1100 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
   1101 	(*(cb))(&status, request, reply, cb_arg);
   1102 	return (-1);
   1103 }
   1104 
   1105 /** Takes a request object and fills it in with the right magic */
   1106 static struct evrpc *
   1107 evrpc_register_object(const char *name,
   1108     void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
   1109     int (*req_unmarshal)(void *, struct evbuffer *),
   1110     void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
   1111     int (*rpl_complete)(void *),
   1112     void (*rpl_marshal)(struct evbuffer *, void *))
   1113 {
   1114 	struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
   1115 	if (rpc == NULL)
   1116 		return (NULL);
   1117 	rpc->uri = mm_strdup(name);
   1118 	if (rpc->uri == NULL) {
   1119 		mm_free(rpc);
   1120 		return (NULL);
   1121 	}
   1122 	rpc->request_new = req_new;
   1123 	rpc->request_new_arg = req_new_arg;
   1124 	rpc->request_free = req_free;
   1125 	rpc->request_unmarshal = req_unmarshal;
   1126 	rpc->reply_new = rpl_new;
   1127 	rpc->reply_new_arg = rpl_new_arg;
   1128 	rpc->reply_free = rpl_free;
   1129 	rpc->reply_complete = rpl_complete;
   1130 	rpc->reply_marshal = rpl_marshal;
   1131 	return (rpc);
   1132 }
   1133 
   1134 int
   1135 evrpc_register_generic(struct evrpc_base *base, const char *name,
   1136     void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
   1137     void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
   1138     int (*req_unmarshal)(void *, struct evbuffer *),
   1139     void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
   1140     int (*rpl_complete)(void *),
   1141     void (*rpl_marshal)(struct evbuffer *, void *))
   1142 {
   1143 	struct evrpc* rpc =
   1144 	    evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal,
   1145 		rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
   1146 	if (rpc == NULL)
   1147 		return (-1);
   1148 	evrpc_register_rpc(base, rpc,
   1149 	    (void (*)(struct evrpc_req_generic*, void *))callback, cbarg);
   1150 	return (0);
   1151 }
   1152 
   1153 /** accessors for obscure and undocumented functionality */
   1154 struct evrpc_pool *
   1155 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
   1156 {
   1157 	return (ctx->pool);
   1158 }
   1159 
   1160 void
   1161 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
   1162     struct evrpc_pool *pool)
   1163 {
   1164 	ctx->pool = pool;
   1165 }
   1166 
   1167 void
   1168 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
   1169     void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
   1170     void *cb_arg)
   1171 {
   1172 	ctx->cb = cb;
   1173 	ctx->cb_arg = cb_arg;
   1174 }
   1175