Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      3  * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
      4  * All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  * 3. The name of the author may not be used to endorse or promote products
     15  *    derived from this software without specific prior written permission.
     16  *
     17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27  */
     28 
     29 #include <sys/types.h>
     30 
     31 #include "event2/event-config.h"
     32 
     33 #ifdef _EVENT_HAVE_SYS_TIME_H
     34 #include <sys/time.h>
     35 #endif
     36 
     37 #include <errno.h>
     38 #include <stdio.h>
     39 #include <stdlib.h>
     40 #include <string.h>
     41 #ifdef _EVENT_HAVE_STDARG_H
     42 #include <stdarg.h>
     43 #endif
     44 
     45 #ifdef WIN32
     46 #include <winsock2.h>
     47 #endif
     48 
     49 #include "event2/util.h"
     50 #include "event2/bufferevent.h"
     51 #include "event2/buffer.h"
     52 #include "event2/bufferevent_struct.h"
     53 #include "event2/event.h"
     54 #include "log-internal.h"
     55 #include "mm-internal.h"
     56 #include "bufferevent-internal.h"
     57 #include "util-internal.h"
     58 
     59 /* prototypes */
     60 static int be_filter_enable(struct bufferevent *, short);
     61 static int be_filter_disable(struct bufferevent *, short);
     62 static void be_filter_destruct(struct bufferevent *);
     63 
     64 static void be_filter_readcb(struct bufferevent *, void *);
     65 static void be_filter_writecb(struct bufferevent *, void *);
     66 static void be_filter_eventcb(struct bufferevent *, short, void *);
     67 static int be_filter_flush(struct bufferevent *bufev,
     68     short iotype, enum bufferevent_flush_mode mode);
     69 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
     70 
     71 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
     72     const struct evbuffer_cb_info *info, void *arg);
     73 
     74 struct bufferevent_filtered {
     75 	struct bufferevent_private bev;
     76 
     77 	/** The bufferevent that we read/write filtered data from/to. */
     78 	struct bufferevent *underlying;
     79 	/** A callback on our outbuf to notice when somebody adds data */
     80 	struct evbuffer_cb_entry *outbuf_cb;
     81 	/** True iff we have received an EOF callback from the underlying
     82 	 * bufferevent. */
     83 	unsigned got_eof;
     84 
     85 	/** Function to free context when we're done. */
     86 	void (*free_context)(void *);
     87 	/** Input filter */
     88 	bufferevent_filter_cb process_in;
     89 	/** Output filter */
     90 	bufferevent_filter_cb process_out;
     91 	/** User-supplied argument to the filters. */
     92 	void *context;
     93 };
     94 
     95 const struct bufferevent_ops bufferevent_ops_filter = {
     96 	"filter",
     97 	evutil_offsetof(struct bufferevent_filtered, bev.bev),
     98 	be_filter_enable,
     99 	be_filter_disable,
    100 	be_filter_destruct,
    101 	_bufferevent_generic_adj_timeouts,
    102 	be_filter_flush,
    103 	be_filter_ctrl,
    104 };
    105 
    106 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
    107  * return that bufferevent_filtered. Returns NULL otherwise.*/
    108 static inline struct bufferevent_filtered *
    109 upcast(struct bufferevent *bev)
    110 {
    111 	struct bufferevent_filtered *bev_f;
    112 	if (bev->be_ops != &bufferevent_ops_filter)
    113 		return NULL;
    114 	bev_f = (void*)( ((char*)bev) -
    115 			 evutil_offsetof(struct bufferevent_filtered, bev.bev));
    116 	EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
    117 	return bev_f;
    118 }
    119 
    120 #define downcast(bev_f) (&(bev_f)->bev.bev)
    121 
    122 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
    123  * over its high watermark such that we should not write to it in a given
    124  * flush mode. */
    125 static int
    126 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
    127     enum bufferevent_flush_mode state)
    128 {
    129 	struct bufferevent *u = bevf->underlying;
    130 	return state == BEV_NORMAL &&
    131 	    u->wm_write.high &&
    132 	    evbuffer_get_length(u->output) >= u->wm_write.high;
    133 }
    134 
    135 /** Return 1 if our input buffer is at or over its high watermark such that we
    136  * should not write to it in a given flush mode. */
    137 static int
    138 be_readbuf_full(struct bufferevent_filtered *bevf,
    139     enum bufferevent_flush_mode state)
    140 {
    141 	struct bufferevent *bufev = downcast(bevf);
    142 	return state == BEV_NORMAL &&
    143 	    bufev->wm_read.high &&
    144 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
    145 }
    146 
    147 
    148 /* Filter to use when we're created with a NULL filter. */
    149 static enum bufferevent_filter_result
    150 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
    151 	       enum bufferevent_flush_mode state, void *ctx)
    152 {
    153 	(void)state;
    154 	if (evbuffer_remove_buffer(src, dst, lim) == 0)
    155 		return BEV_OK;
    156 	else
    157 		return BEV_ERROR;
    158 }
    159 
    160 struct bufferevent *
    161 bufferevent_filter_new(struct bufferevent *underlying,
    162 		       bufferevent_filter_cb input_filter,
    163 		       bufferevent_filter_cb output_filter,
    164 		       int options,
    165 		       void (*free_context)(void *),
    166 		       void *ctx)
    167 {
    168 	struct bufferevent_filtered *bufev_f;
    169 	int tmp_options = options & ~BEV_OPT_THREADSAFE;
    170 
    171 	if (!underlying)
    172 		return NULL;
    173 
    174 	if (!input_filter)
    175 		input_filter = be_null_filter;
    176 	if (!output_filter)
    177 		output_filter = be_null_filter;
    178 
    179 	bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
    180 	if (!bufev_f)
    181 		return NULL;
    182 
    183 	if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
    184 				    &bufferevent_ops_filter, tmp_options) < 0) {
    185 		mm_free(bufev_f);
    186 		return NULL;
    187 	}
    188 	if (options & BEV_OPT_THREADSAFE) {
    189 		bufferevent_enable_locking(downcast(bufev_f), NULL);
    190 	}
    191 
    192 	bufev_f->underlying = underlying;
    193 
    194 	bufev_f->process_in = input_filter;
    195 	bufev_f->process_out = output_filter;
    196 	bufev_f->free_context = free_context;
    197 	bufev_f->context = ctx;
    198 
    199 	bufferevent_setcb(bufev_f->underlying,
    200 	    be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
    201 
    202 	bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
    203 	   bufferevent_filtered_outbuf_cb, bufev_f);
    204 
    205 	_bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
    206 	bufferevent_incref(underlying);
    207 
    208 	bufferevent_enable(underlying, EV_READ|EV_WRITE);
    209 	bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ);
    210 
    211 	return downcast(bufev_f);
    212 }
    213 
    214 static void
    215 be_filter_destruct(struct bufferevent *bev)
    216 {
    217 	struct bufferevent_filtered *bevf = upcast(bev);
    218 	EVUTIL_ASSERT(bevf);
    219 	if (bevf->free_context)
    220 		bevf->free_context(bevf->context);
    221 
    222 	if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
    223 		/* Yes, there is also a decref in bufferevent_decref.
    224 		 * That decref corresponds to the incref when we set
    225 		 * underlying for the first time.  This decref is an
    226 		 * extra one to remove the last reference.
    227 		 */
    228 		if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
    229 			event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
    230 			    "bufferevent with too few references");
    231 		} else {
    232 			bufferevent_free(bevf->underlying);
    233 		}
    234 	} else {
    235 		if (bevf->underlying) {
    236 			if (bevf->underlying->errorcb == be_filter_eventcb)
    237 				bufferevent_setcb(bevf->underlying,
    238 				    NULL, NULL, NULL, NULL);
    239 			bufferevent_unsuspend_read(bevf->underlying,
    240 			    BEV_SUSPEND_FILT_READ);
    241 		}
    242 	}
    243 
    244 	_bufferevent_del_generic_timeout_cbs(bev);
    245 }
    246 
    247 static int
    248 be_filter_enable(struct bufferevent *bev, short event)
    249 {
    250 	struct bufferevent_filtered *bevf = upcast(bev);
    251 	if (event & EV_WRITE)
    252 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
    253 
    254 	if (event & EV_READ) {
    255 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    256 		bufferevent_unsuspend_read(bevf->underlying,
    257 		    BEV_SUSPEND_FILT_READ);
    258 	}
    259 	return 0;
    260 }
    261 
    262 static int
    263 be_filter_disable(struct bufferevent *bev, short event)
    264 {
    265 	struct bufferevent_filtered *bevf = upcast(bev);
    266 	if (event & EV_WRITE)
    267 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
    268 	if (event & EV_READ) {
    269 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
    270 		bufferevent_suspend_read(bevf->underlying,
    271 		    BEV_SUSPEND_FILT_READ);
    272 	}
    273 	return 0;
    274 }
    275 
    276 static enum bufferevent_filter_result
    277 be_filter_process_input(struct bufferevent_filtered *bevf,
    278 			enum bufferevent_flush_mode state,
    279 			int *processed_out)
    280 {
    281 	enum bufferevent_filter_result res;
    282 	struct bufferevent *bev = downcast(bevf);
    283 
    284 	if (state == BEV_NORMAL) {
    285 		/* If we're in 'normal' mode, don't urge data on the filter
    286 		 * unless we're reading data and under our high-water mark.*/
    287 		if (!(bev->enabled & EV_READ) ||
    288 		    be_readbuf_full(bevf, state))
    289 			return BEV_OK;
    290 	}
    291 
    292 	do {
    293 		ev_ssize_t limit = -1;
    294 		if (state == BEV_NORMAL && bev->wm_read.high)
    295 			limit = bev->wm_read.high -
    296 			    evbuffer_get_length(bev->input);
    297 
    298 		res = bevf->process_in(bevf->underlying->input,
    299 		    bev->input, limit, state, bevf->context);
    300 
    301 		if (res == BEV_OK)
    302 			*processed_out = 1;
    303 	} while (res == BEV_OK &&
    304 		 (bev->enabled & EV_READ) &&
    305 		 evbuffer_get_length(bevf->underlying->input) &&
    306 		 !be_readbuf_full(bevf, state));
    307 
    308 	if (*processed_out)
    309 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    310 
    311 	return res;
    312 }
    313 
    314 
    315 static enum bufferevent_filter_result
    316 be_filter_process_output(struct bufferevent_filtered *bevf,
    317 			 enum bufferevent_flush_mode state,
    318 			 int *processed_out)
    319 {
    320 	/* Requires references and lock: might call writecb */
    321 	enum bufferevent_filter_result res = BEV_OK;
    322 	struct bufferevent *bufev = downcast(bevf);
    323 	int again = 0;
    324 
    325 	if (state == BEV_NORMAL) {
    326 		/* If we're in 'normal' mode, don't urge data on the
    327 		 * filter unless we're writing data, and the underlying
    328 		 * bufferevent is accepting data, and we have data to
    329 		 * give the filter.  If we're in 'flush' or 'finish',
    330 		 * call the filter no matter what. */
    331 		if (!(bufev->enabled & EV_WRITE) ||
    332 		    be_underlying_writebuf_full(bevf, state) ||
    333 		    !evbuffer_get_length(bufev->output))
    334 			return BEV_OK;
    335 	}
    336 
    337 	/* disable the callback that calls this function
    338 	   when the user adds to the output buffer. */
    339 	evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
    340 
    341 	do {
    342 		int processed = 0;
    343 		again = 0;
    344 
    345 		do {
    346 			ev_ssize_t limit = -1;
    347 			if (state == BEV_NORMAL &&
    348 			    bevf->underlying->wm_write.high)
    349 				limit = bevf->underlying->wm_write.high -
    350 				    evbuffer_get_length(bevf->underlying->output);
    351 
    352 			res = bevf->process_out(downcast(bevf)->output,
    353 			    bevf->underlying->output,
    354 			    limit,
    355 			    state,
    356 			    bevf->context);
    357 
    358 			if (res == BEV_OK)
    359 				processed = *processed_out = 1;
    360 		} while (/* Stop if the filter wasn't successful...*/
    361 			res == BEV_OK &&
    362 			/* Or if we aren't writing any more. */
    363 			(bufev->enabled & EV_WRITE) &&
    364 			/* Of if we have nothing more to write and we are
    365 			 * not flushing. */
    366 			evbuffer_get_length(bufev->output) &&
    367 			/* Or if we have filled the underlying output buffer. */
    368 			!be_underlying_writebuf_full(bevf,state));
    369 
    370 		if (processed &&
    371 		    evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
    372 			/* call the write callback.*/
    373 			_bufferevent_run_writecb(bufev);
    374 
    375 			if (res == BEV_OK &&
    376 			    (bufev->enabled & EV_WRITE) &&
    377 			    evbuffer_get_length(bufev->output) &&
    378 			    !be_underlying_writebuf_full(bevf, state)) {
    379 				again = 1;
    380 			}
    381 		}
    382 	} while (again);
    383 
    384 	/* reenable the outbuf_cb */
    385 	evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
    386 	    EVBUFFER_CB_ENABLED);
    387 
    388 	if (*processed_out)
    389 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
    390 
    391 	return res;
    392 }
    393 
    394 /* Called when the size of our outbuf changes. */
    395 static void
    396 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    397     const struct evbuffer_cb_info *cbinfo, void *arg)
    398 {
    399 	struct bufferevent_filtered *bevf = arg;
    400 	struct bufferevent *bev = downcast(bevf);
    401 
    402 	if (cbinfo->n_added) {
    403 		int processed_any = 0;
    404 		/* Somebody added more data to the output buffer. Try to
    405 		 * process it, if we should. */
    406 		_bufferevent_incref_and_lock(bev);
    407 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
    408 		_bufferevent_decref_and_unlock(bev);
    409 	}
    410 }
    411 
    412 /* Called when the underlying socket has read. */
    413 static void
    414 be_filter_readcb(struct bufferevent *underlying, void *_me)
    415 {
    416 	struct bufferevent_filtered *bevf = _me;
    417 	enum bufferevent_filter_result res;
    418 	enum bufferevent_flush_mode state;
    419 	struct bufferevent *bufev = downcast(bevf);
    420 	int processed_any = 0;
    421 
    422 	_bufferevent_incref_and_lock(bufev);
    423 
    424 	if (bevf->got_eof)
    425 		state = BEV_FINISHED;
    426 	else
    427 		state = BEV_NORMAL;
    428 
    429 	/* XXXX use return value */
    430 	res = be_filter_process_input(bevf, state, &processed_any);
    431 	(void)res;
    432 
    433 	/* XXX This should be in process_input, not here.  There are
    434 	 * other places that can call process-input, and they should
    435 	 * force readcb calls as needed. */
    436 	if (processed_any &&
    437 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
    438 		_bufferevent_run_readcb(bufev);
    439 
    440 	_bufferevent_decref_and_unlock(bufev);
    441 }
    442 
    443 /* Called when the underlying socket has drained enough that we can write to
    444    it. */
    445 static void
    446 be_filter_writecb(struct bufferevent *underlying, void *_me)
    447 {
    448 	struct bufferevent_filtered *bevf = _me;
    449 	struct bufferevent *bev = downcast(bevf);
    450 	int processed_any = 0;
    451 
    452 	_bufferevent_incref_and_lock(bev);
    453 	be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
    454 	_bufferevent_decref_and_unlock(bev);
    455 }
    456 
    457 /* Called when the underlying socket has given us an error */
    458 static void
    459 be_filter_eventcb(struct bufferevent *underlying, short what, void *_me)
    460 {
    461 	struct bufferevent_filtered *bevf = _me;
    462 	struct bufferevent *bev = downcast(bevf);
    463 
    464 	_bufferevent_incref_and_lock(bev);
    465 	/* All we can really to is tell our own eventcb. */
    466 	_bufferevent_run_eventcb(bev, what);
    467 	_bufferevent_decref_and_unlock(bev);
    468 }
    469 
    470 static int
    471 be_filter_flush(struct bufferevent *bufev,
    472     short iotype, enum bufferevent_flush_mode mode)
    473 {
    474 	struct bufferevent_filtered *bevf = upcast(bufev);
    475 	int processed_any = 0;
    476 	EVUTIL_ASSERT(bevf);
    477 
    478 	_bufferevent_incref_and_lock(bufev);
    479 
    480 	if (iotype & EV_READ) {
    481 		be_filter_process_input(bevf, mode, &processed_any);
    482 	}
    483 	if (iotype & EV_WRITE) {
    484 		be_filter_process_output(bevf, mode, &processed_any);
    485 	}
    486 	/* XXX check the return value? */
    487 	/* XXX does this want to recursively call lower-level flushes? */
    488 	bufferevent_flush(bevf->underlying, iotype, mode);
    489 
    490 	_bufferevent_decref_and_unlock(bufev);
    491 
    492 	return processed_any;
    493 }
    494 
    495 static int
    496 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    497     union bufferevent_ctrl_data *data)
    498 {
    499 	struct bufferevent_filtered *bevf;
    500 	switch (op) {
    501 	case BEV_CTRL_GET_UNDERLYING:
    502 		bevf = upcast(bev);
    503 		data->ptr = bevf->underlying;
    504 		return 0;
    505 	case BEV_CTRL_GET_FD:
    506 	case BEV_CTRL_SET_FD:
    507 	case BEV_CTRL_CANCEL_ALL:
    508 	default:
    509 		return -1;
    510 	}
    511 }
    512