Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions
      6  * are met:
      7  * 1. Redistributions of source code must retain the above copyright
      8  *    notice, this list of conditions and the following disclaimer.
      9  * 2. Redistributions in binary form must reproduce the above copyright
     10  *    notice, this list of conditions and the following disclaimer in the
     11  *    documentation and/or other materials provided with the distribution.
     12  * 3. The name of the author may not be used to endorse or promote products
     13  *    derived from this software without specific prior written permission.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     25  */
     26 
     27 #include <sys/types.h>
     28 
     29 #ifdef WIN32
     30 #include <winsock2.h>
     31 #endif
     32 
     33 #include "event2/event-config.h"
     34 
     35 #include "event2/util.h"
     36 #include "event2/buffer.h"
     37 #include "event2/bufferevent.h"
     38 #include "event2/bufferevent_struct.h"
     39 #include "event2/event.h"
     40 #include "defer-internal.h"
     41 #include "bufferevent-internal.h"
     42 #include "mm-internal.h"
     43 #include "util-internal.h"
     44 
/* One endpoint of a linked pair of bufferevents. */
struct bufferevent_pair {
	/* Common bufferevent state.  upcast() and downcast() convert between
	 * a pointer to this member's inner "bev" field and a pointer to the
	 * enclosing structure. */
	struct bufferevent_private bev;
	/* The other endpoint.  NULL until bufferevent_pair_new() links the
	 * two elements, and NULL again once either side has been through
	 * be_pair_destruct(). */
	struct bufferevent_pair *partner;
};
     49 
     50 
     51 /* Given a bufferevent that's really a bev part of a bufferevent_pair,
     52  * return that bufferevent_filtered. Returns NULL otherwise.*/
     53 static inline struct bufferevent_pair *
     54 upcast(struct bufferevent *bev)
     55 {
     56 	struct bufferevent_pair *bev_p;
     57 	if (bev->be_ops != &bufferevent_ops_pair)
     58 		return NULL;
     59 	bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
     60 	EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair);
     61 	return bev_p;
     62 }
     63 
/* Given a pointer to a struct bufferevent_pair, yield a pointer to the
 * struct bufferevent embedded inside it (the inverse of upcast()). */
#define downcast(bev_pair) (&(bev_pair)->bev.bev)
     65 
     66 static inline void
     67 incref_and_lock(struct bufferevent *b)
     68 {
     69 	struct bufferevent_pair *bevp;
     70 	_bufferevent_incref_and_lock(b);
     71 	bevp = upcast(b);
     72 	if (bevp->partner)
     73 		_bufferevent_incref_and_lock(downcast(bevp->partner));
     74 }
     75 
     76 static inline void
     77 decref_and_unlock(struct bufferevent *b)
     78 {
     79 	struct bufferevent_pair *bevp = upcast(b);
     80 	if (bevp->partner)
     81 		_bufferevent_decref_and_unlock(downcast(bevp->partner));
     82 	_bufferevent_decref_and_unlock(b);
     83 }
     84 
     85 /* XXX Handle close */
     86 
/* Callback that bufferevent_pair_elt_new() attaches to each element's
 * output evbuffer.  Declared here because its definition appears later in
 * this file. */
static void be_pair_outbuf_cb(struct evbuffer *,
    const struct evbuffer_cb_info *, void *);
     89 
     90 static struct bufferevent_pair *
     91 bufferevent_pair_elt_new(struct event_base *base,
     92     int options)
     93 {
     94 	struct bufferevent_pair *bufev;
     95 	if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
     96 		return NULL;
     97 	if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair,
     98 		options)) {
     99 		mm_free(bufev);
    100 		return NULL;
    101 	}
    102 	if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
    103 		bufferevent_free(downcast(bufev));
    104 		return NULL;
    105 	}
    106 
    107 	_bufferevent_init_generic_timeout_cbs(&bufev->bev.bev);
    108 
    109 	return bufev;
    110 }
    111 
    112 int
    113 bufferevent_pair_new(struct event_base *base, int options,
    114     struct bufferevent *pair[2])
    115 {
    116 	struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
    117 	int tmp_options;
    118 
    119 	options |= BEV_OPT_DEFER_CALLBACKS;
    120 	tmp_options = options & ~BEV_OPT_THREADSAFE;
    121 
    122 	bufev1 = bufferevent_pair_elt_new(base, options);
    123 	if (!bufev1)
    124 		return -1;
    125 	bufev2 = bufferevent_pair_elt_new(base, tmp_options);
    126 	if (!bufev2) {
    127 		bufferevent_free(downcast(bufev1));
    128 		return -1;
    129 	}
    130 
    131 	if (options & BEV_OPT_THREADSAFE) {
    132 		/*XXXX check return */
    133 		bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock);
    134 	}
    135 
    136 	bufev1->partner = bufev2;
    137 	bufev2->partner = bufev1;
    138 
    139 	evbuffer_freeze(downcast(bufev1)->input, 0);
    140 	evbuffer_freeze(downcast(bufev1)->output, 1);
    141 	evbuffer_freeze(downcast(bufev2)->input, 0);
    142 	evbuffer_freeze(downcast(bufev2)->output, 1);
    143 
    144 	pair[0] = downcast(bufev1);
    145 	pair[1] = downcast(bufev2);
    146 
    147 	return 0;
    148 }
    149 
    150 static void
    151 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
    152     int ignore_wm)
    153 {
    154 	size_t src_size, dst_size;
    155 	size_t n;
    156 
    157 	evbuffer_unfreeze(src->output, 1);
    158 	evbuffer_unfreeze(dst->input, 0);
    159 
    160 	if (dst->wm_read.high) {
    161 		dst_size = evbuffer_get_length(dst->input);
    162 		if (dst_size < dst->wm_read.high) {
    163 			n = dst->wm_read.high - dst_size;
    164 			evbuffer_remove_buffer(src->output, dst->input, n);
    165 		} else {
    166 			if (!ignore_wm)
    167 				goto done;
    168 			n = evbuffer_get_length(src->output);
    169 			evbuffer_add_buffer(dst->input, src->output);
    170 		}
    171 	} else {
    172 		n = evbuffer_get_length(src->output);
    173 		evbuffer_add_buffer(dst->input, src->output);
    174 	}
    175 
    176 	if (n) {
    177 		BEV_RESET_GENERIC_READ_TIMEOUT(dst);
    178 
    179 		if (evbuffer_get_length(dst->output))
    180 			BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
    181 		else
    182 			BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
    183 	}
    184 
    185 	src_size = evbuffer_get_length(src->output);
    186 	dst_size = evbuffer_get_length(dst->input);
    187 
    188 	if (dst_size >= dst->wm_read.low) {
    189 		_bufferevent_run_readcb(dst);
    190 	}
    191 	if (src_size <= src->wm_write.low) {
    192 		_bufferevent_run_writecb(src);
    193 	}
    194 done:
    195 	evbuffer_freeze(src->output, 1);
    196 	evbuffer_freeze(dst->input, 0);
    197 }
    198 
    199 static inline int
    200 be_pair_wants_to_talk(struct bufferevent_pair *src,
    201     struct bufferevent_pair *dst)
    202 {
    203 	return (downcast(src)->enabled & EV_WRITE) &&
    204 	    (downcast(dst)->enabled & EV_READ) &&
    205 	    !dst->bev.read_suspended &&
    206 	    evbuffer_get_length(downcast(src)->output);
    207 }
    208 
    209 static void
    210 be_pair_outbuf_cb(struct evbuffer *outbuf,
    211     const struct evbuffer_cb_info *info, void *arg)
    212 {
    213 	struct bufferevent_pair *bev_pair = arg;
    214 	struct bufferevent_pair *partner = bev_pair->partner;
    215 
    216 	incref_and_lock(downcast(bev_pair));
    217 
    218 	if (info->n_added > info->n_deleted && partner) {
    219 		/* We got more data.  If the other side's reading, then
    220 		   hand it over. */
    221 		if (be_pair_wants_to_talk(bev_pair, partner)) {
    222 			be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
    223 		}
    224 	}
    225 
    226 	decref_and_unlock(downcast(bev_pair));
    227 }
    228 
    229 static int
    230 be_pair_enable(struct bufferevent *bufev, short events)
    231 {
    232 	struct bufferevent_pair *bev_p = upcast(bufev);
    233 	struct bufferevent_pair *partner = bev_p->partner;
    234 
    235 	incref_and_lock(bufev);
    236 
    237 	if (events & EV_READ) {
    238 		BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
    239 	}
    240 	if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
    241 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
    242 
    243 	/* We're starting to read! Does the other side have anything to write?*/
    244 	if ((events & EV_READ) && partner &&
    245 	    be_pair_wants_to_talk(partner, bev_p)) {
    246 		be_pair_transfer(downcast(partner), bufev, 0);
    247 	}
    248 	/* We're starting to write! Does the other side want to read? */
    249 	if ((events & EV_WRITE) && partner &&
    250 	    be_pair_wants_to_talk(bev_p, partner)) {
    251 		be_pair_transfer(bufev, downcast(partner), 0);
    252 	}
    253 	decref_and_unlock(bufev);
    254 	return 0;
    255 }
    256 
    257 static int
    258 be_pair_disable(struct bufferevent *bev, short events)
    259 {
    260 	if (events & EV_READ) {
    261 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
    262 	}
    263 	if (events & EV_WRITE)
    264 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
    265 	return 0;
    266 }
    267 
    268 static void
    269 be_pair_destruct(struct bufferevent *bev)
    270 {
    271 	struct bufferevent_pair *bev_p = upcast(bev);
    272 
    273 	if (bev_p->partner) {
    274 		bev_p->partner->partner = NULL;
    275 		bev_p->partner = NULL;
    276 	}
    277 
    278 	_bufferevent_del_generic_timeout_cbs(bev);
    279 }
    280 
    281 static int
    282 be_pair_flush(struct bufferevent *bev, short iotype,
    283     enum bufferevent_flush_mode mode)
    284 {
    285 	struct bufferevent_pair *bev_p = upcast(bev);
    286 	struct bufferevent *partner;
    287 	incref_and_lock(bev);
    288 	if (!bev_p->partner)
    289 		return -1;
    290 
    291 	partner = downcast(bev_p->partner);
    292 
    293 	if (mode == BEV_NORMAL)
    294 		return 0;
    295 
    296 	if ((iotype & EV_READ) != 0)
    297 		be_pair_transfer(partner, bev, 1);
    298 
    299 	if ((iotype & EV_WRITE) != 0)
    300 		be_pair_transfer(bev, partner, 1);
    301 
    302 	if (mode == BEV_FINISHED) {
    303 		_bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF);
    304 	}
    305 	decref_and_unlock(bev);
    306 	return 0;
    307 }
    308 
    309 struct bufferevent *
    310 bufferevent_pair_get_partner(struct bufferevent *bev)
    311 {
    312 	struct bufferevent_pair *bev_p;
    313 	struct bufferevent *partner = NULL;
    314 	bev_p = upcast(bev);
    315 	if (! bev_p)
    316 		return NULL;
    317 
    318 	incref_and_lock(bev);
    319 	if (bev_p->partner)
    320 		partner = downcast(bev_p->partner);
    321 	decref_and_unlock(bev);
    322 	return partner;
    323 }
    324 
/* Operations table shared by every pair element.  upcast() compares a
 * bufferevent's be_ops pointer against the address of this table to decide
 * whether the bufferevent is part of a pair. */
const struct bufferevent_ops bufferevent_ops_pair = {
	/* presumably the backend's type name -- confirm against the
	 * definition of struct bufferevent_ops */
	"pair_elt",
	/* offset of the embedded struct bufferevent inside
	 * struct bufferevent_pair */
	evutil_offsetof(struct bufferevent_pair, bev.bev),
	be_pair_enable,
	be_pair_disable,
	be_pair_destruct,
	_bufferevent_generic_adj_timeouts,
	be_pair_flush,
	NULL, /* ctrl */
};
    335