Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2000-2004 Niels Provos <provos (at) citi.umich.edu>
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  * 3. The name of the author may not be used to endorse or promote products
     14  *    derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 #ifdef HAVE_CONFIG_H
     28 #include "config.h"
     29 #endif
     30 
     31 #ifdef WIN32
     32 #define WIN32_LEAN_AND_MEAN
     33 #include <winsock2.h>
     34 #include <windows.h>
     35 #undef WIN32_LEAN_AND_MEAN
     36 #endif
     37 
     38 #include <sys/types.h>
     39 #ifndef WIN32
     40 #include <sys/socket.h>
     41 #endif
     42 #ifdef HAVE_SYS_TIME_H
     43 #include <sys/time.h>
     44 #endif
     45 #include <sys/queue.h>
     46 #include <stdio.h>
     47 #include <stdlib.h>
     48 #ifndef WIN32
     49 #include <unistd.h>
     50 #endif
     51 #include <errno.h>
     52 #include <signal.h>
     53 #include <string.h>
     54 #include <assert.h>
     55 
     56 #include "event.h"
     57 #include "evrpc.h"
     58 #include "evrpc-internal.h"
     59 #include "evhttp.h"
     60 #include "evutil.h"
     61 #include "log.h"
     62 
     63 struct evrpc_base *
     64 evrpc_init(struct evhttp *http_server)
     65 {
     66 	struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
     67 	if (base == NULL)
     68 		return (NULL);
     69 
     70 	/* we rely on the tagging sub system */
     71 	evtag_init();
     72 
     73 	TAILQ_INIT(&base->registered_rpcs);
     74 	TAILQ_INIT(&base->input_hooks);
     75 	TAILQ_INIT(&base->output_hooks);
     76 	base->http_server = http_server;
     77 
     78 	return (base);
     79 }
     80 
     81 void
     82 evrpc_free(struct evrpc_base *base)
     83 {
     84 	struct evrpc *rpc;
     85 	struct evrpc_hook *hook;
     86 
     87 	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
     88 		assert(evrpc_unregister_rpc(base, rpc->uri));
     89 	}
     90 	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
     91 		assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
     92 	}
     93 	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
     94 		assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
     95 	}
     96 	free(base);
     97 }
     98 
     99 void *
    100 evrpc_add_hook(void *vbase,
    101     enum EVRPC_HOOK_TYPE hook_type,
    102     int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
    103     void *cb_arg)
    104 {
    105 	struct _evrpc_hooks *base = vbase;
    106 	struct evrpc_hook_list *head = NULL;
    107 	struct evrpc_hook *hook = NULL;
    108 	switch (hook_type) {
    109 	case EVRPC_INPUT:
    110 		head = &base->in_hooks;
    111 		break;
    112 	case EVRPC_OUTPUT:
    113 		head = &base->out_hooks;
    114 		break;
    115 	default:
    116 		assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
    117 	}
    118 
    119 	hook = calloc(1, sizeof(struct evrpc_hook));
    120 	assert(hook != NULL);
    121 
    122 	hook->process = cb;
    123 	hook->process_arg = cb_arg;
    124 	TAILQ_INSERT_TAIL(head, hook, next);
    125 
    126 	return (hook);
    127 }
    128 
    129 static int
    130 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
    131 {
    132 	struct evrpc_hook *hook = NULL;
    133 	TAILQ_FOREACH(hook, head, next) {
    134 		if (hook == handle) {
    135 			TAILQ_REMOVE(head, hook, next);
    136 			free(hook);
    137 			return (1);
    138 		}
    139 	}
    140 
    141 	return (0);
    142 }
    143 
    144 /*
    145  * remove the hook specified by the handle
    146  */
    147 
    148 int
    149 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
    150 {
    151 	struct _evrpc_hooks *base = vbase;
    152 	struct evrpc_hook_list *head = NULL;
    153 	switch (hook_type) {
    154 	case EVRPC_INPUT:
    155 		head = &base->in_hooks;
    156 		break;
    157 	case EVRPC_OUTPUT:
    158 		head = &base->out_hooks;
    159 		break;
    160 	default:
    161 		assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
    162 	}
    163 
    164 	return (evrpc_remove_hook_internal(head, handle));
    165 }
    166 
    167 static int
    168 evrpc_process_hooks(struct evrpc_hook_list *head,
    169     struct evhttp_request *req, struct evbuffer *evbuf)
    170 {
    171 	struct evrpc_hook *hook;
    172 	TAILQ_FOREACH(hook, head, next) {
    173 		if (hook->process(req, evbuf, hook->process_arg) == -1)
    174 			return (-1);
    175 	}
    176 
    177 	return (0);
    178 }
    179 
/*
 * Forward declarations for routines defined later in this file.
 * evrpc_request_done is not static; evrpc_request_cb installs it as the
 * "done" callback of every request state it creates.
 */
static void evrpc_pool_schedule(struct evrpc_pool *pool);
static void evrpc_request_cb(struct evhttp_request *, void *);
void evrpc_request_done(struct evrpc_req_generic*);
    183 
/*
 * evrpc_register_rpc() (defined below) registers a new RPC with the HTTP
 * server.  The evrpc object is expected to have been filled in via the
 * EVRPC_REGISTER_OBJECT macro which in turn calls evrpc_register_rpc().
 */
    189 
    190 static char *
    191 evrpc_construct_uri(const char *uri)
    192 {
    193 	char *constructed_uri;
    194 	int constructed_uri_len;
    195 
    196 	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
    197 	if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
    198 		event_err(1, "%s: failed to register rpc at %s",
    199 		    __func__, uri);
    200 	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
    201 	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
    202 	constructed_uri[constructed_uri_len - 1] = '\0';
    203 
    204 	return (constructed_uri);
    205 }
    206 
    207 int
    208 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
    209     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
    210 {
    211 	char *constructed_uri = evrpc_construct_uri(rpc->uri);
    212 
    213 	rpc->base = base;
    214 	rpc->cb = cb;
    215 	rpc->cb_arg = cb_arg;
    216 
    217 	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
    218 
    219 	evhttp_set_cb(base->http_server,
    220 	    constructed_uri,
    221 	    evrpc_request_cb,
    222 	    rpc);
    223 
    224 	free(constructed_uri);
    225 
    226 	return (0);
    227 }
    228 
    229 int
    230 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
    231 {
    232 	char *registered_uri = NULL;
    233 	struct evrpc *rpc;
    234 
    235 	/* find the right rpc; linear search might be slow */
    236 	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
    237 		if (strcmp(rpc->uri, name) == 0)
    238 			break;
    239 	}
    240 	if (rpc == NULL) {
    241 		/* We did not find an RPC with this name */
    242 		return (-1);
    243 	}
    244 	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
    245 
    246 	free((char *)rpc->uri);
    247 	free(rpc);
    248 
    249         registered_uri = evrpc_construct_uri(name);
    250 
    251 	/* remove the http server callback */
    252 	assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
    253 
    254 	free(registered_uri);
    255 	return (0);
    256 }
    257 
    258 static void
    259 evrpc_request_cb(struct evhttp_request *req, void *arg)
    260 {
    261 	struct evrpc *rpc = arg;
    262 	struct evrpc_req_generic *rpc_state = NULL;
    263 
    264 	/* let's verify the outside parameters */
    265 	if (req->type != EVHTTP_REQ_POST ||
    266 	    EVBUFFER_LENGTH(req->input_buffer) <= 0)
    267 		goto error;
    268 
    269 	/*
    270 	 * we might want to allow hooks to suspend the processing,
    271 	 * but at the moment, we assume that they just act as simple
    272 	 * filters.
    273 	 */
    274 	if (evrpc_process_hooks(&rpc->base->input_hooks,
    275 		req, req->input_buffer) == -1)
    276 		goto error;
    277 
    278 	rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
    279 	if (rpc_state == NULL)
    280 		goto error;
    281 
    282 	/* let's check that we can parse the request */
    283 	rpc_state->request = rpc->request_new();
    284 	if (rpc_state->request == NULL)
    285 		goto error;
    286 
    287 	rpc_state->rpc = rpc;
    288 
    289 	if (rpc->request_unmarshal(
    290 		    rpc_state->request, req->input_buffer) == -1) {
    291 		/* we failed to parse the request; that's a bummer */
    292 		goto error;
    293 	}
    294 
    295 	/* at this point, we have a well formed request, prepare the reply */
    296 
    297 	rpc_state->reply = rpc->reply_new();
    298 	if (rpc_state->reply == NULL)
    299 		goto error;
    300 
    301 	rpc_state->http_req = req;
    302 	rpc_state->done = evrpc_request_done;
    303 
    304 	/* give the rpc to the user; they can deal with it */
    305 	rpc->cb(rpc_state, rpc->cb_arg);
    306 
    307 	return;
    308 
    309 error:
    310 	evrpc_reqstate_free(rpc_state);
    311 	evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
    312 	return;
    313 }
    314 
    315 void
    316 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
    317 {
    318 	/* clean up all memory */
    319 	if (rpc_state != NULL) {
    320 		struct evrpc *rpc = rpc_state->rpc;
    321 
    322 		if (rpc_state->request != NULL)
    323 			rpc->request_free(rpc_state->request);
    324 		if (rpc_state->reply != NULL)
    325 			rpc->reply_free(rpc_state->reply);
    326 		free(rpc_state);
    327 	}
    328 }
    329 
    330 void
    331 evrpc_request_done(struct evrpc_req_generic* rpc_state)
    332 {
    333 	struct evhttp_request *req = rpc_state->http_req;
    334 	struct evrpc *rpc = rpc_state->rpc;
    335 	struct evbuffer* data = NULL;
    336 
    337 	if (rpc->reply_complete(rpc_state->reply) == -1) {
    338 		/* the reply was not completely filled in.  error out */
    339 		goto error;
    340 	}
    341 
    342 	if ((data = evbuffer_new()) == NULL) {
    343 		/* out of memory */
    344 		goto error;
    345 	}
    346 
    347 	/* serialize the reply */
    348 	rpc->reply_marshal(data, rpc_state->reply);
    349 
    350 	/* do hook based tweaks to the request */
    351 	if (evrpc_process_hooks(&rpc->base->output_hooks,
    352 		req, data) == -1)
    353 		goto error;
    354 
    355 	/* on success, we are going to transmit marshaled binary data */
    356 	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
    357 		evhttp_add_header(req->output_headers,
    358 		    "Content-Type", "application/octet-stream");
    359 	}
    360 
    361 	evhttp_send_reply(req, HTTP_OK, "OK", data);
    362 
    363 	evbuffer_free(data);
    364 
    365 	evrpc_reqstate_free(rpc_state);
    366 
    367 	return;
    368 
    369 error:
    370 	if (data != NULL)
    371 		evbuffer_free(data);
    372 	evrpc_reqstate_free(rpc_state);
    373 	evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
    374 	return;
    375 }
    376 
    377 /* Client implementation of RPC site */
    378 
/*
 * Forward declaration: evrpc_pool_add_connection calls this before the
 * definition appears further down in the file.
 */
static int evrpc_schedule_request(struct evhttp_connection *connection,
    struct evrpc_request_wrapper *ctx);
    381 
    382 struct evrpc_pool *
    383 evrpc_pool_new(struct event_base *base)
    384 {
    385 	struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
    386 	if (pool == NULL)
    387 		return (NULL);
    388 
    389 	TAILQ_INIT(&pool->connections);
    390 	TAILQ_INIT(&pool->requests);
    391 
    392 	TAILQ_INIT(&pool->input_hooks);
    393 	TAILQ_INIT(&pool->output_hooks);
    394 
    395 	pool->base = base;
    396 	pool->timeout = -1;
    397 
    398 	return (pool);
    399 }
    400 
    401 static void
    402 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
    403 {
    404 	free(request->name);
    405 	free(request);
    406 }
    407 
    408 void
    409 evrpc_pool_free(struct evrpc_pool *pool)
    410 {
    411 	struct evhttp_connection *connection;
    412 	struct evrpc_request_wrapper *request;
    413 	struct evrpc_hook *hook;
    414 
    415 	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
    416 		TAILQ_REMOVE(&pool->requests, request, next);
    417 		/* if this gets more complicated we need our own function */
    418 		evrpc_request_wrapper_free(request);
    419 	}
    420 
    421 	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
    422 		TAILQ_REMOVE(&pool->connections, connection, next);
    423 		evhttp_connection_free(connection);
    424 	}
    425 
    426 	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
    427 		assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
    428 	}
    429 
    430 	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
    431 		assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
    432 	}
    433 
    434 	free(pool);
    435 }
    436 
    437 /*
    438  * Add a connection to the RPC pool.   A request scheduled on the pool
    439  * may use any available connection.
    440  */
    441 
    442 void
    443 evrpc_pool_add_connection(struct evrpc_pool *pool,
    444     struct evhttp_connection *connection) {
    445 	assert(connection->http_server == NULL);
    446 	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
    447 
    448 	/*
    449 	 * associate an event base with this connection
    450 	 */
    451 	if (pool->base != NULL)
    452 		evhttp_connection_set_base(connection, pool->base);
    453 
    454 	/*
    455 	 * unless a timeout was specifically set for a connection,
    456 	 * the connection inherits the timeout from the pool.
    457 	 */
    458 	if (connection->timeout == -1)
    459 		connection->timeout = pool->timeout;
    460 
    461 	/*
    462 	 * if we have any requests pending, schedule them with the new
    463 	 * connections.
    464 	 */
    465 
    466 	if (TAILQ_FIRST(&pool->requests) != NULL) {
    467 		struct evrpc_request_wrapper *request =
    468 		    TAILQ_FIRST(&pool->requests);
    469 		TAILQ_REMOVE(&pool->requests, request, next);
    470 		evrpc_schedule_request(connection, request);
    471 	}
    472 }
    473 
    474 void
    475 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
    476 {
    477 	struct evhttp_connection *evcon;
    478 	TAILQ_FOREACH(evcon, &pool->connections, next) {
    479 		evcon->timeout = timeout_in_secs;
    480 	}
    481 	pool->timeout = timeout_in_secs;
    482 }
    483 
    484 
/*
 * Forward declarations: evrpc_reply_done is passed to evhttp_request_new
 * and evrpc_request_timeout to evtimer_set before either is defined.
 */
static void evrpc_reply_done(struct evhttp_request *, void *);
static void evrpc_request_timeout(int, short, void *);
    487 
    488 /*
    489  * Finds a connection object associated with the pool that is currently
    490  * idle and can be used to make a request.
    491  */
    492 static struct evhttp_connection *
    493 evrpc_pool_find_connection(struct evrpc_pool *pool)
    494 {
    495 	struct evhttp_connection *connection;
    496 	TAILQ_FOREACH(connection, &pool->connections, next) {
    497 		if (TAILQ_FIRST(&connection->requests) == NULL)
    498 			return (connection);
    499 	}
    500 
    501 	return (NULL);
    502 }
    503 
    504 /*
    505  * We assume that the ctx is no longer queued on the pool.
    506  */
    507 static int
    508 evrpc_schedule_request(struct evhttp_connection *connection,
    509     struct evrpc_request_wrapper *ctx)
    510 {
    511 	struct evhttp_request *req = NULL;
    512 	struct evrpc_pool *pool = ctx->pool;
    513 	struct evrpc_status status;
    514 	char *uri = NULL;
    515 	int res = 0;
    516 
    517 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
    518 		goto error;
    519 
    520 	/* serialize the request data into the output buffer */
    521 	ctx->request_marshal(req->output_buffer, ctx->request);
    522 
    523 	uri = evrpc_construct_uri(ctx->name);
    524 	if (uri == NULL)
    525 		goto error;
    526 
    527 	/* we need to know the connection that we might have to abort */
    528 	ctx->evcon = connection;
    529 
    530 	/* apply hooks to the outgoing request */
    531 	if (evrpc_process_hooks(&pool->output_hooks,
    532 		req, req->output_buffer) == -1)
    533 		goto error;
    534 
    535 	if (pool->timeout > 0) {
    536 		/*
    537 		 * a timeout after which the whole rpc is going to be aborted.
    538 		 */
    539 		struct timeval tv;
    540 		evutil_timerclear(&tv);
    541 		tv.tv_sec = pool->timeout;
    542 		evtimer_add(&ctx->ev_timeout, &tv);
    543 	}
    544 
    545 	/* start the request over the connection */
    546 	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
    547 	free(uri);
    548 
    549 	if (res == -1)
    550 		goto error;
    551 
    552 	return (0);
    553 
    554 error:
    555 	memset(&status, 0, sizeof(status));
    556 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
    557 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    558 	evrpc_request_wrapper_free(ctx);
    559 	return (-1);
    560 }
    561 
    562 int
    563 evrpc_make_request(struct evrpc_request_wrapper *ctx)
    564 {
    565 	struct evrpc_pool *pool = ctx->pool;
    566 
    567 	/* initialize the event structure for this rpc */
    568 	evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
    569 	if (pool->base != NULL)
    570 		event_base_set(pool->base, &ctx->ev_timeout);
    571 
    572 	/* we better have some available connections on the pool */
    573 	assert(TAILQ_FIRST(&pool->connections) != NULL);
    574 
    575 	/*
    576 	 * if no connection is available, we queue the request on the pool,
    577 	 * the next time a connection is empty, the rpc will be send on that.
    578 	 */
    579 	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
    580 
    581 	evrpc_pool_schedule(pool);
    582 
    583 	return (0);
    584 }
    585 
    586 static void
    587 evrpc_reply_done(struct evhttp_request *req, void *arg)
    588 {
    589 	struct evrpc_request_wrapper *ctx = arg;
    590 	struct evrpc_pool *pool = ctx->pool;
    591 	struct evrpc_status status;
    592 	int res = -1;
    593 
    594 	/* cancel any timeout we might have scheduled */
    595 	event_del(&ctx->ev_timeout);
    596 
    597 	memset(&status, 0, sizeof(status));
    598 	status.http_req = req;
    599 
    600 	/* we need to get the reply now */
    601 	if (req != NULL) {
    602 		/* apply hooks to the incoming request */
    603 		if (evrpc_process_hooks(&pool->input_hooks,
    604 			req, req->input_buffer) == -1) {
    605 			status.error = EVRPC_STATUS_ERR_HOOKABORTED;
    606 			res = -1;
    607 		} else {
    608 			res = ctx->reply_unmarshal(ctx->reply,
    609 			    req->input_buffer);
    610 			if (res == -1) {
    611 				status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
    612 			}
    613 		}
    614 	} else {
    615 		status.error = EVRPC_STATUS_ERR_TIMEOUT;
    616 	}
    617 
    618 	if (res == -1) {
    619 		/* clear everything that we might have written previously */
    620 		ctx->reply_clear(ctx->reply);
    621 	}
    622 
    623 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    624 
    625 	evrpc_request_wrapper_free(ctx);
    626 
    627 	/* the http layer owns the request structure */
    628 
    629 	/* see if we can schedule another request */
    630 	evrpc_pool_schedule(pool);
    631 }
    632 
    633 static void
    634 evrpc_pool_schedule(struct evrpc_pool *pool)
    635 {
    636 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
    637 	struct evhttp_connection *evcon;
    638 
    639 	/* if no requests are pending, we have no work */
    640 	if (ctx == NULL)
    641 		return;
    642 
    643 	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
    644 		TAILQ_REMOVE(&pool->requests, ctx, next);
    645 		evrpc_schedule_request(evcon, ctx);
    646 	}
    647 }
    648 
    649 static void
    650 evrpc_request_timeout(int fd, short what, void *arg)
    651 {
    652 	struct evrpc_request_wrapper *ctx = arg;
    653 	struct evhttp_connection *evcon = ctx->evcon;
    654 	assert(evcon != NULL);
    655 
    656 	evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
    657 }
    658