Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2002-2007 Niels Provos <provos (at) citi.umich.edu>
      3  * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  * 3. The name of the author may not be used to endorse or promote products
     14  *    derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "event2/event-config.h"
     29 #include "evconfig-private.h"
     30 
     31 #include <sys/types.h>
     32 
     33 #ifdef EVENT__HAVE_SYS_TIME_H
     34 #include <sys/time.h>
     35 #endif
     36 
     37 #include <errno.h>
     38 #include <stdio.h>
     39 #include <stdlib.h>
     40 #include <string.h>
     41 #ifdef EVENT__HAVE_STDARG_H
     42 #include <stdarg.h>
     43 #endif
     44 
     45 #ifdef _WIN32
     46 #include <winsock2.h>
     47 #endif
     48 
     49 #include "event2/util.h"
     50 #include "event2/buffer.h"
     51 #include "event2/buffer_compat.h"
     52 #include "event2/bufferevent.h"
     53 #include "event2/bufferevent_struct.h"
     54 #include "event2/bufferevent_compat.h"
     55 #include "event2/event.h"
     56 #include "event-internal.h"
     57 #include "log-internal.h"
     58 #include "mm-internal.h"
     59 #include "bufferevent-internal.h"
     60 #include "evbuffer-internal.h"
     61 #include "util-internal.h"
     62 
     63 static void bufferevent_cancel_all_(struct bufferevent *bev);
     64 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
     65 
     66 void
     67 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
     68 {
     69 	struct bufferevent_private *bufev_private =
     70 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
     71 	BEV_LOCK(bufev);
     72 	if (!bufev_private->read_suspended)
     73 		bufev->be_ops->disable(bufev, EV_READ);
     74 	bufev_private->read_suspended |= what;
     75 	BEV_UNLOCK(bufev);
     76 }
     77 
     78 void
     79 bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
     80 {
     81 	struct bufferevent_private *bufev_private =
     82 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
     83 	BEV_LOCK(bufev);
     84 	bufev_private->read_suspended &= ~what;
     85 	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
     86 		bufev->be_ops->enable(bufev, EV_READ);
     87 	BEV_UNLOCK(bufev);
     88 }
     89 
     90 void
     91 bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
     92 {
     93 	struct bufferevent_private *bufev_private =
     94 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
     95 	BEV_LOCK(bufev);
     96 	if (!bufev_private->write_suspended)
     97 		bufev->be_ops->disable(bufev, EV_WRITE);
     98 	bufev_private->write_suspended |= what;
     99 	BEV_UNLOCK(bufev);
    100 }
    101 
    102 void
    103 bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
    104 {
    105 	struct bufferevent_private *bufev_private =
    106 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    107 	BEV_LOCK(bufev);
    108 	bufev_private->write_suspended &= ~what;
    109 	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
    110 		bufev->be_ops->enable(bufev, EV_WRITE);
    111 	BEV_UNLOCK(bufev);
    112 }
    113 
    114 
    115 /* Callback to implement watermarks on the input buffer.  Only enabled
    116  * if the watermark is set. */
    117 static void
    118 bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    119     const struct evbuffer_cb_info *cbinfo,
    120     void *arg)
    121 {
    122 	struct bufferevent *bufev = arg;
    123 	size_t size;
    124 
    125 	size = evbuffer_get_length(buf);
    126 
    127 	if (size >= bufev->wm_read.high)
    128 		bufferevent_wm_suspend_read(bufev);
    129 	else
    130 		bufferevent_wm_unsuspend_read(bufev);
    131 }
    132 
/* Deferred-callback handler used when BEV_OPT_UNLOCK_CALLBACKS is NOT set:
 * runs every pending user callback while continuously holding the
 * bufferevent lock.  Consumes the reference that SCHEDULE_DEFERRED took
 * when the callback was queued. */
static void
bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
	}
	/* Each pending flag is cleared BEFORE invoking the user callback so
	 * the callback may safely cause itself to be re-scheduled. */
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufev_private->readcb_pending = 0;
		bufev->readcb(bufev, bufev->cbarg);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufev_private->writecb_pending = 0;
		bufev->writecb(bufev, bufev->cbarg);
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error that was current when the event
		 * was deferred, so the user callback can inspect it. */
		EVUTIL_SET_SOCKET_ERROR(err);
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
	/* Drop the reference taken when this callback was scheduled. */
	bufferevent_decref_and_unlock_(bufev);
}
    165 
/* Deferred-callback handler used when BEV_OPT_UNLOCK_CALLBACKS is set:
 * same dispatch order as the _locked variant, but each user callback runs
 * with the bufferevent lock RELEASED.  Callback pointers and cbarg are
 * copied into locals while the lock is held, since another thread may
 * change them once we unlock. */
static void
bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
/* Run 'stmt' with the lock dropped, then re-acquire it. */
#define UNLOCKED(stmt) \
	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
	}
	/* Pending flags are cleared before the callback runs, so a callback
	 * may re-schedule itself without losing the new request. */
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufferevent_data_cb readcb = bufev->readcb;
		void *cbarg = bufev->cbarg;
		bufev_private->readcb_pending = 0;
		UNLOCKED(readcb(bufev, cbarg));
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufferevent_data_cb writecb = bufev->writecb;
		void *cbarg = bufev->cbarg;
		bufev_private->writecb_pending = 0;
		UNLOCKED(writecb(bufev, cbarg));
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error saved when the event was deferred. */
		EVUTIL_SET_SOCKET_ERROR(err);
		UNLOCKED(errorcb(bufev,what,cbarg));
	}
	/* Drop the reference taken when this callback was scheduled. */
	bufferevent_decref_and_unlock_(bufev);
#undef UNLOCKED
}
    210 
/* Queue the deferred callback of 'bevp' on its event base.  If it was not
 * already scheduled, take a reference on the bufferevent; the deferred
 * handler (bufferevent_run_deferred_callbacks_*) releases it. */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		if (event_deferred_cb_schedule_(			\
			    (bevp)->bev.ev_base,			\
			&(bevp)->deferred))				\
			bufferevent_incref_(&(bevp)->bev);		\
	} while (0)
    218 
    219 
    220 void
    221 bufferevent_run_readcb_(struct bufferevent *bufev, int options)
    222 {
    223 	/* Requires that we hold the lock and a reference */
    224 	struct bufferevent_private *p =
    225 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    226 	if (bufev->readcb == NULL)
    227 		return;
    228 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
    229 		p->readcb_pending = 1;
    230 		SCHEDULE_DEFERRED(p);
    231 	} else {
    232 		bufev->readcb(bufev, bufev->cbarg);
    233 	}
    234 }
    235 
    236 void
    237 bufferevent_run_writecb_(struct bufferevent *bufev, int options)
    238 {
    239 	/* Requires that we hold the lock and a reference */
    240 	struct bufferevent_private *p =
    241 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    242 	if (bufev->writecb == NULL)
    243 		return;
    244 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
    245 		p->writecb_pending = 1;
    246 		SCHEDULE_DEFERRED(p);
    247 	} else {
    248 		bufev->writecb(bufev, bufev->cbarg);
    249 	}
    250 }
    251 
/* Mask of option bits that the public bufferevent_trigger*() functions
 * accept from callers; anything else is silently stripped. */
#define BEV_TRIG_ALL_OPTS (			\
		BEV_TRIG_IGNORE_WATERMARKS|	\
		BEV_TRIG_DEFER_CALLBACKS	\
	)
    256 
    257 void
    258 bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
    259 {
    260 	bufferevent_incref_and_lock_(bufev);
    261 	bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS);
    262 	bufferevent_decref_and_unlock_(bufev);
    263 }
    264 
    265 void
    266 bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
    267 {
    268 	/* Requires that we hold the lock and a reference */
    269 	struct bufferevent_private *p =
    270 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    271 	if (bufev->errorcb == NULL)
    272 		return;
    273 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
    274 		p->eventcb_pending |= what;
    275 		p->errno_pending = EVUTIL_SOCKET_ERROR();
    276 		SCHEDULE_DEFERRED(p);
    277 	} else {
    278 		bufev->errorcb(bufev, what, bufev->cbarg);
    279 	}
    280 }
    281 
    282 void
    283 bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
    284 {
    285 	bufferevent_incref_and_lock_(bufev);
    286 	bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS);
    287 	bufferevent_decref_and_unlock_(bufev);
    288 }
    289 
    290 int
    291 bufferevent_init_common_(struct bufferevent_private *bufev_private,
    292     struct event_base *base,
    293     const struct bufferevent_ops *ops,
    294     enum bufferevent_options options)
    295 {
    296 	struct bufferevent *bufev = &bufev_private->bev;
    297 
    298 	if (!bufev->input) {
    299 		if ((bufev->input = evbuffer_new()) == NULL)
    300 			return -1;
    301 	}
    302 
    303 	if (!bufev->output) {
    304 		if ((bufev->output = evbuffer_new()) == NULL) {
    305 			evbuffer_free(bufev->input);
    306 			return -1;
    307 		}
    308 	}
    309 
    310 	bufev_private->refcnt = 1;
    311 	bufev->ev_base = base;
    312 
    313 	/* Disable timeouts. */
    314 	evutil_timerclear(&bufev->timeout_read);
    315 	evutil_timerclear(&bufev->timeout_write);
    316 
    317 	bufev->be_ops = ops;
    318 
    319 	bufferevent_ratelim_init_(bufev_private);
    320 
    321 	/*
    322 	 * Set to EV_WRITE so that using bufferevent_write is going to
    323 	 * trigger a callback.  Reading needs to be explicitly enabled
    324 	 * because otherwise no data will be available.
    325 	 */
    326 	bufev->enabled = EV_WRITE;
    327 
    328 #ifndef EVENT__DISABLE_THREAD_SUPPORT
    329 	if (options & BEV_OPT_THREADSAFE) {
    330 		if (bufferevent_enable_locking_(bufev, NULL) < 0) {
    331 			/* cleanup */
    332 			evbuffer_free(bufev->input);
    333 			evbuffer_free(bufev->output);
    334 			bufev->input = NULL;
    335 			bufev->output = NULL;
    336 			return -1;
    337 		}
    338 	}
    339 #endif
    340 	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
    341 	    == BEV_OPT_UNLOCK_CALLBACKS) {
    342 		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
    343 		return -1;
    344 	}
    345 	if (options & BEV_OPT_UNLOCK_CALLBACKS)
    346 		event_deferred_cb_init_(
    347 		    &bufev_private->deferred,
    348 		    event_base_get_npriorities(base) / 2,
    349 		    bufferevent_run_deferred_callbacks_unlocked,
    350 		    bufev_private);
    351 	else
    352 		event_deferred_cb_init_(
    353 		    &bufev_private->deferred,
    354 		    event_base_get_npriorities(base) / 2,
    355 		    bufferevent_run_deferred_callbacks_locked,
    356 		    bufev_private);
    357 
    358 	bufev_private->options = options;
    359 
    360 	evbuffer_set_parent_(bufev->input, bufev);
    361 	evbuffer_set_parent_(bufev->output, bufev);
    362 
    363 	return 0;
    364 }
    365 
    366 void
    367 bufferevent_setcb(struct bufferevent *bufev,
    368     bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    369     bufferevent_event_cb eventcb, void *cbarg)
    370 {
    371 	BEV_LOCK(bufev);
    372 
    373 	bufev->readcb = readcb;
    374 	bufev->writecb = writecb;
    375 	bufev->errorcb = eventcb;
    376 
    377 	bufev->cbarg = cbarg;
    378 	BEV_UNLOCK(bufev);
    379 }
    380 
    381 void
    382 bufferevent_getcb(struct bufferevent *bufev,
    383     bufferevent_data_cb *readcb_ptr,
    384     bufferevent_data_cb *writecb_ptr,
    385     bufferevent_event_cb *eventcb_ptr,
    386     void **cbarg_ptr)
    387 {
    388 	BEV_LOCK(bufev);
    389 	if (readcb_ptr)
    390 		*readcb_ptr = bufev->readcb;
    391 	if (writecb_ptr)
    392 		*writecb_ptr = bufev->writecb;
    393 	if (eventcb_ptr)
    394 		*eventcb_ptr = bufev->errorcb;
    395 	if (cbarg_ptr)
    396 		*cbarg_ptr = bufev->cbarg;
    397 
    398 	BEV_UNLOCK(bufev);
    399 }
    400 
    401 struct evbuffer *
    402 bufferevent_get_input(struct bufferevent *bufev)
    403 {
    404 	return bufev->input;
    405 }
    406 
    407 struct evbuffer *
    408 bufferevent_get_output(struct bufferevent *bufev)
    409 {
    410 	return bufev->output;
    411 }
    412 
    413 struct event_base *
    414 bufferevent_get_base(struct bufferevent *bufev)
    415 {
    416 	return bufev->ev_base;
    417 }
    418 
    419 int
    420 bufferevent_get_priority(const struct bufferevent *bufev)
    421 {
    422 	if (event_initialized(&bufev->ev_read)) {
    423 		return event_get_priority(&bufev->ev_read);
    424 	} else {
    425 		return event_base_get_npriorities(bufev->ev_base) / 2;
    426 	}
    427 }
    428 
    429 int
    430 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
    431 {
    432 	if (evbuffer_add(bufev->output, data, size) == -1)
    433 		return (-1);
    434 
    435 	return 0;
    436 }
    437 
    438 int
    439 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    440 {
    441 	if (evbuffer_add_buffer(bufev->output, buf) == -1)
    442 		return (-1);
    443 
    444 	return 0;
    445 }
    446 
    447 size_t
    448 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    449 {
    450 	return (evbuffer_remove(bufev->input, data, size));
    451 }
    452 
    453 int
    454 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    455 {
    456 	return (evbuffer_add_buffer(buf, bufev->input));
    457 }
    458 
/* Enable EV_READ and/or EV_WRITE on a bufferevent.  The requested bits are
 * always recorded in bufev->enabled, but a direction that is currently
 * suspended (watermark, rate limit, ...) is NOT passed to the backend; it
 * will be turned on when the suspension is lifted.  Returns 0 on success,
 * -1 if the backend's enable failed. */
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	short impl_events = event;
	int r = 0;

	bufferevent_incref_and_lock_(bufev);
	/* Strip suspended directions from what we tell the backend. */
	if (bufev_private->read_suspended)
		impl_events &= ~EV_READ;
	if (bufev_private->write_suspended)
		impl_events &= ~EV_WRITE;

	/* Remember the user's intent even for suspended directions. */
	bufev->enabled |= event;

	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
		r = -1;

	bufferevent_decref_and_unlock_(bufev);
	return r;
}
    481 
    482 int
    483 bufferevent_set_timeouts(struct bufferevent *bufev,
    484 			 const struct timeval *tv_read,
    485 			 const struct timeval *tv_write)
    486 {
    487 	int r = 0;
    488 	BEV_LOCK(bufev);
    489 	if (tv_read) {
    490 		bufev->timeout_read = *tv_read;
    491 	} else {
    492 		evutil_timerclear(&bufev->timeout_read);
    493 	}
    494 	if (tv_write) {
    495 		bufev->timeout_write = *tv_write;
    496 	} else {
    497 		evutil_timerclear(&bufev->timeout_write);
    498 	}
    499 
    500 	if (bufev->be_ops->adj_timeouts)
    501 		r = bufev->be_ops->adj_timeouts(bufev);
    502 	BEV_UNLOCK(bufev);
    503 
    504 	return r;
    505 }
    506 
    507 
    508 /* Obsolete; use bufferevent_set_timeouts */
    509 void
    510 bufferevent_settimeout(struct bufferevent *bufev,
    511 		       int timeout_read, int timeout_write)
    512 {
    513 	struct timeval tv_read, tv_write;
    514 	struct timeval *ptv_read = NULL, *ptv_write = NULL;
    515 
    516 	memset(&tv_read, 0, sizeof(tv_read));
    517 	memset(&tv_write, 0, sizeof(tv_write));
    518 
    519 	if (timeout_read) {
    520 		tv_read.tv_sec = timeout_read;
    521 		ptv_read = &tv_read;
    522 	}
    523 	if (timeout_write) {
    524 		tv_write.tv_sec = timeout_write;
    525 		ptv_write = &tv_write;
    526 	}
    527 
    528 	bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
    529 }
    530 
    531 
/* Forcible variant of bufferevent_disable(): in addition to disabling the
 * given events, clears the "connecting" flag so an in-progress connect is
 * treated as abandoned.  Returns 0 on success, -1 if the backend's disable
 * failed. */
int
bufferevent_disable_hard_(struct bufferevent *bufev, short event)
{
	int r = 0;
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	bufev->enabled &= ~event;

	/* Forget any connect() still in progress. */
	bufev_private->connecting = 0;
	if (bufev->be_ops->disable(bufev, event) < 0)
		r = -1;

	BEV_UNLOCK(bufev);
	return r;
}
    549 
    550 int
    551 bufferevent_disable(struct bufferevent *bufev, short event)
    552 {
    553 	int r = 0;
    554 
    555 	BEV_LOCK(bufev);
    556 	bufev->enabled &= ~event;
    557 
    558 	if (bufev->be_ops->disable(bufev, event) < 0)
    559 		r = -1;
    560 
    561 	BEV_UNLOCK(bufev);
    562 	return r;
    563 }
    564 
/*
 * Sets the water marks.  The read high-watermark is enforced with an
 * evbuffer callback (bufferevent_inbuf_wm_cb) that is installed lazily the
 * first time a nonzero high mark is set, and merely disabled (not removed)
 * when the mark is cleared.
 */

void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	if (events & EV_WRITE) {
		bufev->wm_write.low = lowmark;
		bufev->wm_write.high = highmark;
	}

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {
			/* There is now a new high-water mark for read.
			   enable the callback if needed, and see if we should
			   suspend/bufferevent_wm_unsuspend. */

			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
						    bufferevent_inbuf_wm_cb,
						    bufev);
			}
			/* NODEFER: the watermark must react synchronously so
			 * the buffer cannot overshoot while a deferred
			 * callback waits its turn. */
			evbuffer_cb_set_flags(bufev->input,
				      bufev_private->read_watermarks_cb,
				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);

			/* Apply the new mark to data already buffered. */
			if (evbuffer_get_length(bufev->input) >= highmark)
				bufferevent_wm_suspend_read(bufev);
			else if (evbuffer_get_length(bufev->input) < highmark)
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			/* There is now no high-water mark for read. */
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}
    616 
    617 int
    618 bufferevent_getwatermark(struct bufferevent *bufev, short events,
    619     size_t *lowmark, size_t *highmark)
    620 {
    621 	if (events == EV_WRITE) {
    622 		BEV_LOCK(bufev);
    623 		if (lowmark)
    624 			*lowmark = bufev->wm_write.low;
    625 		if (highmark)
    626 			*highmark = bufev->wm_write.high;
    627 		BEV_UNLOCK(bufev);
    628 		return 0;
    629 	}
    630 
    631 	if (events == EV_READ) {
    632 		BEV_LOCK(bufev);
    633 		if (lowmark)
    634 			*lowmark = bufev->wm_read.low;
    635 		if (highmark)
    636 			*highmark = bufev->wm_read.high;
    637 		BEV_UNLOCK(bufev);
    638 		return 0;
    639 	}
    640 	return -1;
    641 }
    642 
    643 int
    644 bufferevent_flush(struct bufferevent *bufev,
    645     short iotype,
    646     enum bufferevent_flush_mode mode)
    647 {
    648 	int r = -1;
    649 	BEV_LOCK(bufev);
    650 	if (bufev->be_ops->flush)
    651 		r = bufev->be_ops->flush(bufev, iotype, mode);
    652 	BEV_UNLOCK(bufev);
    653 	return r;
    654 }
    655 
    656 void
    657 bufferevent_incref_and_lock_(struct bufferevent *bufev)
    658 {
    659 	struct bufferevent_private *bufev_private =
    660 	    BEV_UPCAST(bufev);
    661 	BEV_LOCK(bufev);
    662 	++bufev_private->refcnt;
    663 }
    664 
#if 0
/* Currently compiled out / unused: would transfer ownership of a shared
 * lock from 'donor' to 'recipient' when both bufferevents use the same
 * lock object and the donor currently owns it. */
static void
bufferevent_transfer_lock_ownership_(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *d = BEV_UPCAST(donor);
	struct bufferevent_private *r = BEV_UPCAST(recipient);
	if (d->lock != r->lock)
		return;
	if (r->own_lock)
		return;
	if (d->own_lock) {
		d->own_lock = 0;
		r->own_lock = 1;
	}
}
#endif
    682 
/* Drop one reference and release the lock.  If this was the last
 * reference, start finalization: unlink the backend, then arrange for
 * bufferevent_finalize_cb_() to run once all outstanding event callbacks
 * (read/write events, the deferred cb, rate-limit refill, evbuffer
 * callbacks) have completed.  Returns 1 if finalization was started,
 * 0 otherwise. */
int
bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int n_cbs = 0;
/* Upper bound on callbacks we collect below: 3 fixed + rate limit +
 * whatever the two evbuffers report (capped by the remaining slots). */
#define MAX_CBS 16
	struct event_callback *cbs[MAX_CBS];

	EVUTIL_ASSERT(bufev_private->refcnt > 0);

	if (--bufev_private->refcnt) {
		BEV_UNLOCK(bufev);
		return 0;
	}

	if (bufev->be_ops->unlink)
		bufev->be_ops->unlink(bufev);

	/* Okay, we're out of references. Let's finalize this once all the
	 * callbacks are done running. */
	cbs[0] = &bufev->ev_read.ev_evcallback;
	cbs[1] = &bufev->ev_write.ev_evcallback;
	cbs[2] = &bufev_private->deferred;
	n_cbs = 3;
	if (bufev_private->rate_limiting) {
		struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
		if (event_initialized(e))
			cbs[n_cbs++] = &e->ev_evcallback;
	}
	n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
	n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);

	/* The finalizer runs only after every callback above is done. */
	event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
	    bufferevent_finalize_cb_);

#undef MAX_CBS
	BEV_UNLOCK(bufev);

	return 1;
}
    724 
/* Final teardown, invoked via event_callback_finalize_many_() once no
 * callback can still be running.  Order matters throughout: the backend
 * destructor runs first, the shared lock is freed only after we unlock it,
 * and the reference to any underlying bufferevent is dropped last (its
 * lock may be the one we were using). */
static void
bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
{
	struct bufferevent *bufev = arg_;
	struct bufferevent *underlying;
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	underlying = bufferevent_get_underlying(bufev);

	/* Clean up the shared info */
	if (bufev->be_ops->destruct)
		bufev->be_ops->destruct(bufev);

	/* XXX what happens if refcnt for these buffers is > 1?
	 * The buffers can share a lock with this bufferevent object,
	 * but the lock might be destroyed below. */
	/* evbuffer will free the callbacks */
	evbuffer_free(bufev->input);
	evbuffer_free(bufev->output);

	if (bufev_private->rate_limiting) {
		if (bufev_private->rate_limiting->group)
			bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
		mm_free(bufev_private->rate_limiting);
		bufev_private->rate_limiting = NULL;
	}


	BEV_UNLOCK(bufev);

	/* Free our lock only if we allocated it ourselves. */
	if (bufev_private->own_lock)
		EVTHREAD_FREE_LOCK(bufev_private->lock,
		    EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Free the actual allocated memory. */
	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);

	/* Release the reference to underlying now that we no longer need the
	 * reference to it.  We wait this long mainly in case our lock is
	 * shared with underlying.
	 *
	 * The 'destruct' function will also drop a reference to underlying
	 * if BEV_OPT_CLOSE_ON_FREE is set.
	 *
	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
	 * It would probably save us some headaches.
	 */
	if (underlying)
		bufferevent_decref_(underlying);
}
    777 
/* Public refcount decrement.  Takes the lock, which
 * bufferevent_decref_and_unlock_() releases before returning.  Returns 1
 * if this drop triggered finalization, 0 otherwise. */
int
bufferevent_decref(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	return bufferevent_decref_and_unlock_(bufev);
}
    784 
/* Public destructor: clear the user callbacks so none fire during
 * teardown, cancel any backend activity, then drop the caller's reference
 * (which releases the lock and may start finalization). */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	bufferevent_cancel_all_(bufev);
	bufferevent_decref_and_unlock_(bufev);
}
    793 
    794 void
    795 bufferevent_incref(struct bufferevent *bufev)
    796 {
    797 	struct bufferevent_private *bufev_private =
    798 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    799 
    800 	/* XXX: now that this function is public, we might want to
    801 	 * - return the count from this function
    802 	 * - create a new function to atomically grab the current refcount
    803 	 */
    804 	BEV_LOCK(bufev);
    805 	++bufev_private->refcnt;
    806 	BEV_UNLOCK(bufev);
    807 }
    808 
/* Set up locking for a bufferevent (and its evbuffers).  Three cases:
 * share the underlying bufferevent's lock, allocate our own, or adopt the
 * caller-supplied lock.  Fails if locking is already enabled or allocation
 * fails.  Returns 0 on success, -1 on failure. */
int
bufferevent_enable_locking_(struct bufferevent *bufev, void *lock)
{
#ifdef EVENT__DISABLE_THREAD_SUPPORT
	return -1;
#else
	struct bufferevent *underlying;

	/* Already locked: refuse to re-configure. */
	if (BEV_UPCAST(bufev)->lock)
		return -1;
	underlying = bufferevent_get_underlying(bufev);

	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
		/* Share the underlying bufferevent's existing lock. */
		lock = BEV_UPCAST(underlying)->lock;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	} else if (!lock) {
		/* Allocate a fresh recursive lock that we will own. */
		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		if (!lock)
			return -1;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 1;
	} else {
		/* Adopt the caller's lock without owning it. */
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	}
	/* The evbuffers must use the same lock as the bufferevent. */
	evbuffer_enable_locking(bufev->input, lock);
	evbuffer_enable_locking(bufev->output, lock);

	/* Propagate the shared lock down to an unlocked underlying bev. */
	if (underlying && !BEV_UPCAST(underlying)->lock)
		bufferevent_enable_locking_(underlying, lock);

	return 0;
#endif
}
    844 
    845 int
    846 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
    847 {
    848 	union bufferevent_ctrl_data d;
    849 	int res = -1;
    850 	d.fd = fd;
    851 	BEV_LOCK(bev);
    852 	if (bev->be_ops->ctrl)
    853 		res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
    854 	BEV_UNLOCK(bev);
    855 	return res;
    856 }
    857 
    858 evutil_socket_t
    859 bufferevent_getfd(struct bufferevent *bev)
    860 {
    861 	union bufferevent_ctrl_data d;
    862 	int res = -1;
    863 	d.fd = -1;
    864 	BEV_LOCK(bev);
    865 	if (bev->be_ops->ctrl)
    866 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
    867 	BEV_UNLOCK(bev);
    868 	return (res<0) ? -1 : d.fd;
    869 }
    870 
    871 enum bufferevent_options
    872 bufferevent_get_options_(struct bufferevent *bev)
    873 {
    874 	struct bufferevent_private *bev_p =
    875 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    876 	enum bufferevent_options options;
    877 
    878 	BEV_LOCK(bev);
    879 	options = bev_p->options;
    880 	BEV_UNLOCK(bev);
    881 	return options;
    882 }
    883 
    884 
/* Tell the backend to abandon all in-flight operations (used on free).
 * A backend without a ctrl hook simply has nothing to cancel. */
static void
bufferevent_cancel_all_(struct bufferevent *bev)
{
	union bufferevent_ctrl_data d;
	memset(&d, 0, sizeof(d));
	BEV_LOCK(bev);
	if (bev->be_ops->ctrl)
		bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
	BEV_UNLOCK(bev);
}
    895 
    896 short
    897 bufferevent_get_enabled(struct bufferevent *bufev)
    898 {
    899 	short r;
    900 	BEV_LOCK(bufev);
    901 	r = bufev->enabled;
    902 	BEV_UNLOCK(bufev);
    903 	return r;
    904 }
    905 
    906 struct bufferevent *
    907 bufferevent_get_underlying(struct bufferevent *bev)
    908 {
    909 	union bufferevent_ctrl_data d;
    910 	int res = -1;
    911 	d.ptr = NULL;
    912 	BEV_LOCK(bev);
    913 	if (bev->be_ops->ctrl)
    914 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
    915 	BEV_UNLOCK(bev);
    916 	return (res<0) ? NULL : d.ptr;
    917 }
    918 
    919 static void
    920 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
    921 {
    922 	struct bufferevent *bev = ctx;
    923 	bufferevent_incref_and_lock_(bev);
    924 	bufferevent_disable(bev, EV_READ);
    925 	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
    926 	bufferevent_decref_and_unlock_(bev);
    927 }
    928 static void
    929 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
    930 {
    931 	struct bufferevent *bev = ctx;
    932 	bufferevent_incref_and_lock_(bev);
    933 	bufferevent_disable(bev, EV_WRITE);
    934 	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
    935 	bufferevent_decref_and_unlock_(bev);
    936 }
    937 
    938 void
    939 bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
    940 {
    941 	event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
    942 	    bufferevent_generic_read_timeout_cb, bev);
    943 	event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
    944 	    bufferevent_generic_write_timeout_cb, bev);
    945 }
    946 
/* Re-arm or cancel the generic timeout timers based on current state.
 * A direction's timer runs only when that direction is enabled, not
 * suspended, and has a nonzero timeout; the write timer additionally
 * requires pending output data.  Returns 0 on success, -1 if either
 * event_add/event_del failed. */
int
bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
{
	const short enabled = bev->enabled;
	struct bufferevent_private *bev_p =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r1=0, r2=0;
	if ((enabled & EV_READ) && !bev_p->read_suspended &&
	    evutil_timerisset(&bev->timeout_read))
		r1 = event_add(&bev->ev_read, &bev->timeout_read);
	else
		r1 = event_del(&bev->ev_read);

	/* No point timing out a write when there is nothing to write. */
	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
	    evutil_timerisset(&bev->timeout_write) &&
	    evbuffer_get_length(bev->output))
		r2 = event_add(&bev->ev_write, &bev->timeout_write);
	else
		r2 = event_del(&bev->ev_write);
	if (r1 < 0 || r2 < 0)
		return -1;
	return 0;
}
    970 
    971 int
    972 bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev)
    973 {
    974 	int r = 0;
    975 	if (event_pending(&bev->ev_read, EV_READ, NULL)) {
    976 		if (evutil_timerisset(&bev->timeout_read)) {
    977 			    if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0)
    978 				    r = -1;
    979 		} else {
    980 			event_remove_timer(&bev->ev_read);
    981 		}
    982 	}
    983 	if (event_pending(&bev->ev_write, EV_WRITE, NULL)) {
    984 		if (evutil_timerisset(&bev->timeout_write)) {
    985 			if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0)
    986 				r = -1;
    987 		} else {
    988 			event_remove_timer(&bev->ev_write);
    989 		}
    990 	}
    991 	return r;
    992 }
    993 
    994 int
    995 bufferevent_add_event_(struct event *ev, const struct timeval *tv)
    996 {
    997 	if (!evutil_timerisset(tv))
    998 		return event_add(ev, NULL);
    999 	else
   1000 		return event_add(ev, tv);
   1001 }
   1002 
   1003 /* For use by user programs only; internally, we should be calling
   1004    either bufferevent_incref_and_lock_(), or BEV_LOCK. */
   1005 void
   1006 bufferevent_lock(struct bufferevent *bev)
   1007 {
   1008 	bufferevent_incref_and_lock_(bev);
   1009 }
   1010 
   1011 void
   1012 bufferevent_unlock(struct bufferevent *bev)
   1013 {
   1014 	bufferevent_decref_and_unlock_(bev);
   1015 }
   1016