Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      3  * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
      4  * All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  * 3. The name of the author may not be used to endorse or promote products
     15  *    derived from this software without specific prior written permission.
     16  *
     17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27  */
     28 
     29 #include "evconfig-private.h"
     30 
     31 #include <sys/types.h>
     32 
     33 #include "event2/event-config.h"
     34 
     35 #ifdef EVENT__HAVE_SYS_TIME_H
     36 #include <sys/time.h>
     37 #endif
     38 
     39 #include <errno.h>
     40 #include <stdio.h>
     41 #include <stdlib.h>
     42 #include <string.h>
     43 #ifdef EVENT__HAVE_STDARG_H
     44 #include <stdarg.h>
     45 #endif
     46 
     47 #ifdef _WIN32
     48 #include <winsock2.h>
     49 #endif
     50 
     51 #include "event2/util.h"
     52 #include "event2/bufferevent.h"
     53 #include "event2/buffer.h"
     54 #include "event2/bufferevent_struct.h"
     55 #include "event2/event.h"
     56 #include "log-internal.h"
     57 #include "mm-internal.h"
     58 #include "bufferevent-internal.h"
     59 #include "util-internal.h"
     60 
     61 /* prototypes */
     62 static int be_filter_enable(struct bufferevent *, short);
     63 static int be_filter_disable(struct bufferevent *, short);
     64 static void be_filter_unlink(struct bufferevent *);
     65 static void be_filter_destruct(struct bufferevent *);
     66 
     67 static void be_filter_readcb(struct bufferevent *, void *);
     68 static void be_filter_writecb(struct bufferevent *, void *);
     69 static void be_filter_eventcb(struct bufferevent *, short, void *);
     70 static int be_filter_flush(struct bufferevent *bufev,
     71     short iotype, enum bufferevent_flush_mode mode);
     72 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
     73 
     74 static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
     75     const struct evbuffer_cb_info *cbinfo, void *arg);
     76 
     77 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
     78     const struct evbuffer_cb_info *info, void *arg);
     79 
     80 struct bufferevent_filtered {
     81 	struct bufferevent_private bev;
     82 
     83 	/** The bufferevent that we read/write filtered data from/to. */
     84 	struct bufferevent *underlying;
     85 	/** A callback on our inbuf to notice somebory removes data */
     86 	struct evbuffer_cb_entry *inbuf_cb;
     87 	/** A callback on our outbuf to notice when somebody adds data */
     88 	struct evbuffer_cb_entry *outbuf_cb;
     89 	/** True iff we have received an EOF callback from the underlying
     90 	 * bufferevent. */
     91 	unsigned got_eof;
     92 
     93 	/** Function to free context when we're done. */
     94 	void (*free_context)(void *);
     95 	/** Input filter */
     96 	bufferevent_filter_cb process_in;
     97 	/** Output filter */
     98 	bufferevent_filter_cb process_out;
     99 	/** User-supplied argument to the filters. */
    100 	void *context;
    101 };
    102 
    103 const struct bufferevent_ops bufferevent_ops_filter = {
    104 	"filter",
    105 	evutil_offsetof(struct bufferevent_filtered, bev.bev),
    106 	be_filter_enable,
    107 	be_filter_disable,
    108 	be_filter_unlink,
    109 	be_filter_destruct,
    110 	bufferevent_generic_adj_timeouts_,
    111 	be_filter_flush,
    112 	be_filter_ctrl,
    113 };
    114 
    115 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
    116  * return that bufferevent_filtered. Returns NULL otherwise.*/
    117 static inline struct bufferevent_filtered *
    118 upcast(struct bufferevent *bev)
    119 {
    120 	struct bufferevent_filtered *bev_f;
    121 	if (bev->be_ops != &bufferevent_ops_filter)
    122 		return NULL;
    123 	bev_f = (void*)( ((char*)bev) -
    124 			 evutil_offsetof(struct bufferevent_filtered, bev.bev));
    125 	EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
    126 	return bev_f;
    127 }
    128 
    129 #define downcast(bev_f) (&(bev_f)->bev.bev)
    130 
    131 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
    132  * over its high watermark such that we should not write to it in a given
    133  * flush mode. */
    134 static int
    135 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
    136     enum bufferevent_flush_mode state)
    137 {
    138 	struct bufferevent *u = bevf->underlying;
    139 	return state == BEV_NORMAL &&
    140 	    u->wm_write.high &&
    141 	    evbuffer_get_length(u->output) >= u->wm_write.high;
    142 }
    143 
    144 /** Return 1 if our input buffer is at or over its high watermark such that we
    145  * should not write to it in a given flush mode. */
    146 static int
    147 be_readbuf_full(struct bufferevent_filtered *bevf,
    148     enum bufferevent_flush_mode state)
    149 {
    150 	struct bufferevent *bufev = downcast(bevf);
    151 	return state == BEV_NORMAL &&
    152 	    bufev->wm_read.high &&
    153 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
    154 }
    155 
    156 
    157 /* Filter to use when we're created with a NULL filter. */
    158 static enum bufferevent_filter_result
    159 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
    160 	       enum bufferevent_flush_mode state, void *ctx)
    161 {
    162 	(void)state;
    163 	if (evbuffer_remove_buffer(src, dst, lim) == 0)
    164 		return BEV_OK;
    165 	else
    166 		return BEV_ERROR;
    167 }
    168 
    169 struct bufferevent *
    170 bufferevent_filter_new(struct bufferevent *underlying,
    171 		       bufferevent_filter_cb input_filter,
    172 		       bufferevent_filter_cb output_filter,
    173 		       int options,
    174 		       void (*free_context)(void *),
    175 		       void *ctx)
    176 {
    177 	struct bufferevent_filtered *bufev_f;
    178 	int tmp_options = options & ~BEV_OPT_THREADSAFE;
    179 
    180 	if (!underlying)
    181 		return NULL;
    182 
    183 	if (!input_filter)
    184 		input_filter = be_null_filter;
    185 	if (!output_filter)
    186 		output_filter = be_null_filter;
    187 
    188 	bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
    189 	if (!bufev_f)
    190 		return NULL;
    191 
    192 	if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base,
    193 				    &bufferevent_ops_filter, tmp_options) < 0) {
    194 		mm_free(bufev_f);
    195 		return NULL;
    196 	}
    197 	if (options & BEV_OPT_THREADSAFE) {
    198 		bufferevent_enable_locking_(downcast(bufev_f), NULL);
    199 	}
    200 
    201 	bufev_f->underlying = underlying;
    202 
    203 	bufev_f->process_in = input_filter;
    204 	bufev_f->process_out = output_filter;
    205 	bufev_f->free_context = free_context;
    206 	bufev_f->context = ctx;
    207 
    208 	bufferevent_setcb(bufev_f->underlying,
    209 	    be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
    210 
    211 	bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
    212 		bufferevent_filtered_inbuf_cb, bufev_f);
    213 	evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
    214 		EVBUFFER_CB_ENABLED);
    215 
    216 	bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
    217 	   bufferevent_filtered_outbuf_cb, bufev_f);
    218 
    219 	bufferevent_init_generic_timeout_cbs_(downcast(bufev_f));
    220 	bufferevent_incref_(underlying);
    221 
    222 	bufferevent_enable(underlying, EV_READ|EV_WRITE);
    223 	bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ);
    224 
    225 	return downcast(bufev_f);
    226 }
    227 
    228 static void
    229 be_filter_unlink(struct bufferevent *bev)
    230 {
    231 	struct bufferevent_filtered *bevf = upcast(bev);
    232 	EVUTIL_ASSERT(bevf);
    233 
    234 	if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
    235 		/* Yes, there is also a decref in bufferevent_decref_.
    236 		 * That decref corresponds to the incref when we set
    237 		 * underlying for the first time.  This decref is an
    238 		 * extra one to remove the last reference.
    239 		 */
    240 		if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
    241 			event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
    242 			    "bufferevent with too few references");
    243 		} else {
    244 			bufferevent_free(bevf->underlying);
    245 		}
    246 	} else {
    247 		if (bevf->underlying) {
    248 			if (bevf->underlying->errorcb == be_filter_eventcb)
    249 				bufferevent_setcb(bevf->underlying,
    250 				    NULL, NULL, NULL, NULL);
    251 			bufferevent_unsuspend_read_(bevf->underlying,
    252 			    BEV_SUSPEND_FILT_READ);
    253 		}
    254 	}
    255 }
    256 
    257 static void
    258 be_filter_destruct(struct bufferevent *bev)
    259 {
    260 	struct bufferevent_filtered *bevf = upcast(bev);
    261 	EVUTIL_ASSERT(bevf);
    262 	if (bevf->free_context)
    263 		bevf->free_context(bevf->context);
    264 
    265 	if (bevf->inbuf_cb)
    266 		evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
    267 
    268 	if (bevf->outbuf_cb)
    269 		evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
    270 }
    271 
    272 static int
    273 be_filter_enable(struct bufferevent *bev, short event)
    274 {
    275 	struct bufferevent_filtered *bevf = upcast(bev);
    276 	if (event & EV_WRITE)
    277 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
    278 
    279 	if (event & EV_READ) {
    280 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    281 		bufferevent_unsuspend_read_(bevf->underlying,
    282 		    BEV_SUSPEND_FILT_READ);
    283 	}
    284 	return 0;
    285 }
    286 
    287 static int
    288 be_filter_disable(struct bufferevent *bev, short event)
    289 {
    290 	struct bufferevent_filtered *bevf = upcast(bev);
    291 	if (event & EV_WRITE)
    292 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
    293 	if (event & EV_READ) {
    294 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
    295 		bufferevent_suspend_read_(bevf->underlying,
    296 		    BEV_SUSPEND_FILT_READ);
    297 	}
    298 	return 0;
    299 }
    300 
    301 static enum bufferevent_filter_result
    302 be_filter_process_input(struct bufferevent_filtered *bevf,
    303 			enum bufferevent_flush_mode state,
    304 			int *processed_out)
    305 {
    306 	enum bufferevent_filter_result res;
    307 	struct bufferevent *bev = downcast(bevf);
    308 
    309 	if (state == BEV_NORMAL) {
    310 		/* If we're in 'normal' mode, don't urge data on the filter
    311 		 * unless we're reading data and under our high-water mark.*/
    312 		if (!(bev->enabled & EV_READ) ||
    313 		    be_readbuf_full(bevf, state))
    314 			return BEV_OK;
    315 	}
    316 
    317 	do {
    318 		ev_ssize_t limit = -1;
    319 		if (state == BEV_NORMAL && bev->wm_read.high)
    320 			limit = bev->wm_read.high -
    321 			    evbuffer_get_length(bev->input);
    322 
    323 		res = bevf->process_in(bevf->underlying->input,
    324 		    bev->input, limit, state, bevf->context);
    325 
    326 		if (res == BEV_OK)
    327 			*processed_out = 1;
    328 	} while (res == BEV_OK &&
    329 		 (bev->enabled & EV_READ) &&
    330 		 evbuffer_get_length(bevf->underlying->input) &&
    331 		 !be_readbuf_full(bevf, state));
    332 
    333 	if (*processed_out)
    334 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    335 
    336 	return res;
    337 }
    338 
    339 
    340 static enum bufferevent_filter_result
    341 be_filter_process_output(struct bufferevent_filtered *bevf,
    342 			 enum bufferevent_flush_mode state,
    343 			 int *processed_out)
    344 {
    345 	/* Requires references and lock: might call writecb */
    346 	enum bufferevent_filter_result res = BEV_OK;
    347 	struct bufferevent *bufev = downcast(bevf);
    348 	int again = 0;
    349 
    350 	if (state == BEV_NORMAL) {
    351 		/* If we're in 'normal' mode, don't urge data on the
    352 		 * filter unless we're writing data, and the underlying
    353 		 * bufferevent is accepting data, and we have data to
    354 		 * give the filter.  If we're in 'flush' or 'finish',
    355 		 * call the filter no matter what. */
    356 		if (!(bufev->enabled & EV_WRITE) ||
    357 		    be_underlying_writebuf_full(bevf, state) ||
    358 		    !evbuffer_get_length(bufev->output))
    359 			return BEV_OK;
    360 	}
    361 
    362 	/* disable the callback that calls this function
    363 	   when the user adds to the output buffer. */
    364 	evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb,
    365 	    EVBUFFER_CB_ENABLED);
    366 
    367 	do {
    368 		int processed = 0;
    369 		again = 0;
    370 
    371 		do {
    372 			ev_ssize_t limit = -1;
    373 			if (state == BEV_NORMAL &&
    374 			    bevf->underlying->wm_write.high)
    375 				limit = bevf->underlying->wm_write.high -
    376 				    evbuffer_get_length(bevf->underlying->output);
    377 
    378 			res = bevf->process_out(downcast(bevf)->output,
    379 			    bevf->underlying->output,
    380 			    limit,
    381 			    state,
    382 			    bevf->context);
    383 
    384 			if (res == BEV_OK)
    385 				processed = *processed_out = 1;
    386 		} while (/* Stop if the filter wasn't successful...*/
    387 			res == BEV_OK &&
    388 			/* Or if we aren't writing any more. */
    389 			(bufev->enabled & EV_WRITE) &&
    390 			/* Of if we have nothing more to write and we are
    391 			 * not flushing. */
    392 			evbuffer_get_length(bufev->output) &&
    393 			/* Or if we have filled the underlying output buffer. */
    394 			!be_underlying_writebuf_full(bevf,state));
    395 
    396 		if (processed) {
    397 			/* call the write callback.*/
    398 			bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
    399 
    400 			if (res == BEV_OK &&
    401 			    (bufev->enabled & EV_WRITE) &&
    402 			    evbuffer_get_length(bufev->output) &&
    403 			    !be_underlying_writebuf_full(bevf, state)) {
    404 				again = 1;
    405 			}
    406 		}
    407 	} while (again);
    408 
    409 	/* reenable the outbuf_cb */
    410 	evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
    411 	    EVBUFFER_CB_ENABLED);
    412 
    413 	if (*processed_out)
    414 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
    415 
    416 	return res;
    417 }
    418 
    419 /* Called when the size of our outbuf changes. */
    420 static void
    421 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    422     const struct evbuffer_cb_info *cbinfo, void *arg)
    423 {
    424 	struct bufferevent_filtered *bevf = arg;
    425 	struct bufferevent *bev = downcast(bevf);
    426 
    427 	if (cbinfo->n_added) {
    428 		int processed_any = 0;
    429 		/* Somebody added more data to the output buffer. Try to
    430 		 * process it, if we should. */
    431 		bufferevent_incref_and_lock_(bev);
    432 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
    433 		bufferevent_decref_and_unlock_(bev);
    434 	}
    435 }
    436 
    437 static void
    438 be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
    439 {
    440 	struct bufferevent_filtered *bevf = me_;
    441 	enum bufferevent_filter_result res;
    442 	enum bufferevent_flush_mode state;
    443 	struct bufferevent *bufev = downcast(bevf);
    444 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    445 	int processed_any = 0;
    446 
    447 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
    448 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
    449 
    450 	// If our refcount is > 0
    451 	if (bufev_private->refcnt > 0) {
    452 
    453 		if (bevf->got_eof)
    454 			state = BEV_FINISHED;
    455 		else
    456 			state = BEV_NORMAL;
    457 
    458 		/* XXXX use return value */
    459 		res = be_filter_process_input(bevf, state, &processed_any);
    460 		(void)res;
    461 
    462 		/* XXX This should be in process_input, not here.  There are
    463 		 * other places that can call process-input, and they should
    464 		 * force readcb calls as needed. */
    465 		if (processed_any) {
    466 			bufferevent_trigger_nolock_(bufev, EV_READ, 0);
    467 			if (evbuffer_get_length(underlying->input) > 0 &&
    468 				be_readbuf_full(bevf, state)) {
    469 				/* data left in underlying buffer and filter input buffer
    470 				 * hit its read high watermark.
    471 				 * Schedule callback to avoid data gets stuck in underlying
    472 				 * input buffer.
    473 				 */
    474 				evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
    475 					EVBUFFER_CB_ENABLED);
    476 			}
    477 		}
    478 	}
    479 }
    480 
    481 /* Called when the size of our inbuf changes. */
    482 static void
    483 bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
    484     const struct evbuffer_cb_info *cbinfo, void *arg)
    485 {
    486 	struct bufferevent_filtered *bevf = arg;
    487 	enum bufferevent_flush_mode state;
    488 	struct bufferevent *bev = downcast(bevf);
    489 
    490 	BEV_LOCK(bev);
    491 
    492 	if (bevf->got_eof)
    493 		state = BEV_FINISHED;
    494 	else
    495 		state = BEV_NORMAL;
    496 
    497 
    498 	if (!be_readbuf_full(bevf, state)) {
    499 		/* opportunity to read data which was left in underlying
    500 		 * input buffer because filter input buffer hit read
    501 		 * high watermark.
    502 		 */
    503 		evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
    504 			EVBUFFER_CB_ENABLED);
    505 		if (evbuffer_get_length(bevf->underlying->input) > 0)
    506 			be_filter_read_nolock_(bevf->underlying, bevf);
    507 	}
    508 
    509 	BEV_UNLOCK(bev);
    510 }
    511 
    512 /* Called when the underlying socket has read. */
    513 static void
    514 be_filter_readcb(struct bufferevent *underlying, void *me_)
    515 {
    516 	struct bufferevent_filtered *bevf = me_;
    517 	struct bufferevent *bev = downcast(bevf);
    518 
    519 	BEV_LOCK(bev);
    520 
    521 	be_filter_read_nolock_(underlying, me_);
    522 
    523 	BEV_UNLOCK(bev);
    524 }
    525 
    526 /* Called when the underlying socket has drained enough that we can write to
    527    it. */
    528 static void
    529 be_filter_writecb(struct bufferevent *underlying, void *me_)
    530 {
    531 	struct bufferevent_filtered *bevf = me_;
    532 	struct bufferevent *bev = downcast(bevf);
    533 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
    534 	int processed_any = 0;
    535 
    536 	BEV_LOCK(bev);
    537 
    538 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
    539 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
    540 
    541 	// If our refcount is > 0
    542 	if (bufev_private->refcnt > 0) {
    543 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
    544 	}
    545 
    546 	BEV_UNLOCK(bev);
    547 }
    548 
    549 /* Called when the underlying socket has given us an error */
    550 static void
    551 be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
    552 {
    553 	struct bufferevent_filtered *bevf = me_;
    554 	struct bufferevent *bev = downcast(bevf);
    555 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
    556 
    557 	BEV_LOCK(bev);
    558 
    559 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
    560 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
    561 
    562 	// If our refcount is > 0
    563 	if (bufev_private->refcnt > 0) {
    564 
    565 		/* All we can really to is tell our own eventcb. */
    566 		bufferevent_run_eventcb_(bev, what, 0);
    567 	}
    568 
    569 	BEV_UNLOCK(bev);
    570 }
    571 
    572 static int
    573 be_filter_flush(struct bufferevent *bufev,
    574     short iotype, enum bufferevent_flush_mode mode)
    575 {
    576 	struct bufferevent_filtered *bevf = upcast(bufev);
    577 	int processed_any = 0;
    578 	EVUTIL_ASSERT(bevf);
    579 
    580 	bufferevent_incref_and_lock_(bufev);
    581 
    582 	if (iotype & EV_READ) {
    583 		be_filter_process_input(bevf, mode, &processed_any);
    584 	}
    585 	if (iotype & EV_WRITE) {
    586 		be_filter_process_output(bevf, mode, &processed_any);
    587 	}
    588 	/* XXX check the return value? */
    589 	/* XXX does this want to recursively call lower-level flushes? */
    590 	bufferevent_flush(bevf->underlying, iotype, mode);
    591 
    592 	bufferevent_decref_and_unlock_(bufev);
    593 
    594 	return processed_any;
    595 }
    596 
    597 static int
    598 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    599     union bufferevent_ctrl_data *data)
    600 {
    601 	struct bufferevent_filtered *bevf;
    602 	switch (op) {
    603 	case BEV_CTRL_GET_UNDERLYING:
    604 		bevf = upcast(bev);
    605 		data->ptr = bevf->underlying;
    606 		return 0;
    607 	case BEV_CTRL_SET_FD:
    608 		bevf = upcast(bev);
    609 
    610 		if (bevf->underlying &&
    611 			bevf->underlying->be_ops &&
    612 			bevf->underlying->be_ops->ctrl) {
    613 		    return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data);
    614 		}
    615 
    616 	case BEV_CTRL_GET_FD:
    617 	case BEV_CTRL_CANCEL_ALL:
    618 	default:
    619 		return -1;
    620 	}
    621 
    622 	return -1;
    623 }
    624