1 /* 2 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 3. The name of the author may not be used to endorse or promote products 13 * derived from this software without specific prior written permission. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 */ 26 27 #include <sys/types.h> 28 29 #ifdef WIN32 30 #include <winsock2.h> 31 #endif 32 33 #include "event2/event-config.h" 34 35 #include "event2/util.h" 36 #include "event2/buffer.h" 37 #include "event2/bufferevent.h" 38 #include "event2/bufferevent_struct.h" 39 #include "event2/event.h" 40 #include "defer-internal.h" 41 #include "bufferevent-internal.h" 42 #include "mm-internal.h" 43 #include "util-internal.h" 44 45 struct bufferevent_pair { 46 struct bufferevent_private bev; 47 struct bufferevent_pair *partner; 48 }; 49 50 51 /* Given a bufferevent that's really a bev part of a bufferevent_pair, 52 * return that bufferevent_filtered. Returns NULL otherwise.*/ 53 static inline struct bufferevent_pair * 54 upcast(struct bufferevent *bev) 55 { 56 struct bufferevent_pair *bev_p; 57 if (bev->be_ops != &bufferevent_ops_pair) 58 return NULL; 59 bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 60 EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); 61 return bev_p; 62 } 63 64 #define downcast(bev_pair) (&(bev_pair)->bev.bev) 65 66 static inline void 67 incref_and_lock(struct bufferevent *b) 68 { 69 struct bufferevent_pair *bevp; 70 _bufferevent_incref_and_lock(b); 71 bevp = upcast(b); 72 if (bevp->partner) 73 _bufferevent_incref_and_lock(downcast(bevp->partner)); 74 } 75 76 static inline void 77 decref_and_unlock(struct bufferevent *b) 78 { 79 struct bufferevent_pair *bevp = upcast(b); 80 if (bevp->partner) 81 _bufferevent_decref_and_unlock(downcast(bevp->partner)); 82 _bufferevent_decref_and_unlock(b); 83 } 84 85 /* XXX Handle close */ 86 87 static void be_pair_outbuf_cb(struct evbuffer *, 88 const struct evbuffer_cb_info *, void *); 89 90 static struct bufferevent_pair * 91 bufferevent_pair_elt_new(struct event_base *base, 92 int options) 93 { 94 struct bufferevent_pair *bufev; 95 if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 96 return NULL; 97 if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair, 98 options)) { 99 mm_free(bufev); 100 return NULL; 101 } 102 if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 103 bufferevent_free(downcast(bufev)); 104 return NULL; 105 } 106 107 _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev); 108 109 return bufev; 110 } 111 112 int 113 bufferevent_pair_new(struct event_base *base, int options, 114 struct bufferevent *pair[2]) 115 { 116 struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 117 int tmp_options; 118 119 options |= BEV_OPT_DEFER_CALLBACKS; 120 tmp_options = options & ~BEV_OPT_THREADSAFE; 121 122 bufev1 = bufferevent_pair_elt_new(base, options); 123 if (!bufev1) 124 return -1; 125 bufev2 = bufferevent_pair_elt_new(base, tmp_options); 126 if (!bufev2) { 127 bufferevent_free(downcast(bufev1)); 128 return -1; 129 } 130 131 if (options & BEV_OPT_THREADSAFE) { 132 /*XXXX check return */ 133 bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock); 134 } 135 136 bufev1->partner = bufev2; 137 bufev2->partner = bufev1; 138 139 evbuffer_freeze(downcast(bufev1)->input, 0); 140 evbuffer_freeze(downcast(bufev1)->output, 1); 141 evbuffer_freeze(downcast(bufev2)->input, 0); 142 evbuffer_freeze(downcast(bufev2)->output, 1); 143 144 pair[0] = downcast(bufev1); 145 pair[1] = downcast(bufev2); 146 147 return 0; 148 } 149 150 static void 151 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 152 int ignore_wm) 153 { 154 size_t src_size, dst_size; 155 size_t n; 156 157 evbuffer_unfreeze(src->output, 1); 158 evbuffer_unfreeze(dst->input, 0); 159 160 if (dst->wm_read.high) { 161 dst_size = evbuffer_get_length(dst->input); 162 if (dst_size < dst->wm_read.high) { 163 n = dst->wm_read.high - dst_size; 164 evbuffer_remove_buffer(src->output, dst->input, n); 165 } else { 166 if (!ignore_wm) 167 goto done; 168 n = evbuffer_get_length(src->output); 169 evbuffer_add_buffer(dst->input, src->output); 170 } 171 } else { 172 n = evbuffer_get_length(src->output); 173 evbuffer_add_buffer(dst->input, src->output); 174 } 175 176 if (n) { 177 BEV_RESET_GENERIC_READ_TIMEOUT(dst); 178 179 if (evbuffer_get_length(dst->output)) 180 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 181 else 182 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 183 } 184 185 src_size = evbuffer_get_length(src->output); 186 dst_size = evbuffer_get_length(dst->input); 187 188 if (dst_size >= dst->wm_read.low) { 189 _bufferevent_run_readcb(dst); 190 } 191 if (src_size <= src->wm_write.low) { 192 _bufferevent_run_writecb(src); 193 } 194 done: 195 evbuffer_freeze(src->output, 1); 196 evbuffer_freeze(dst->input, 0); 197 } 198 199 static inline int 200 be_pair_wants_to_talk(struct bufferevent_pair *src, 201 struct bufferevent_pair *dst) 202 { 203 return (downcast(src)->enabled & EV_WRITE) && 204 (downcast(dst)->enabled & EV_READ) && 205 !dst->bev.read_suspended && 206 evbuffer_get_length(downcast(src)->output); 207 } 208 209 static void 210 be_pair_outbuf_cb(struct evbuffer *outbuf, 211 const struct evbuffer_cb_info *info, void *arg) 212 { 213 struct bufferevent_pair *bev_pair = arg; 214 struct bufferevent_pair *partner = bev_pair->partner; 215 216 incref_and_lock(downcast(bev_pair)); 217 218 if (info->n_added > info->n_deleted && partner) { 219 /* We got more data. If the other side's reading, then 220 hand it over. */ 221 if (be_pair_wants_to_talk(bev_pair, partner)) { 222 be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 223 } 224 } 225 226 decref_and_unlock(downcast(bev_pair)); 227 } 228 229 static int 230 be_pair_enable(struct bufferevent *bufev, short events) 231 { 232 struct bufferevent_pair *bev_p = upcast(bufev); 233 struct bufferevent_pair *partner = bev_p->partner; 234 235 incref_and_lock(bufev); 236 237 if (events & EV_READ) { 238 BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 239 } 240 if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 241 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 242 243 /* We're starting to read! Does the other side have anything to write?*/ 244 if ((events & EV_READ) && partner && 245 be_pair_wants_to_talk(partner, bev_p)) { 246 be_pair_transfer(downcast(partner), bufev, 0); 247 } 248 /* We're starting to write! Does the other side want to read? */ 249 if ((events & EV_WRITE) && partner && 250 be_pair_wants_to_talk(bev_p, partner)) { 251 be_pair_transfer(bufev, downcast(partner), 0); 252 } 253 decref_and_unlock(bufev); 254 return 0; 255 } 256 257 static int 258 be_pair_disable(struct bufferevent *bev, short events) 259 { 260 if (events & EV_READ) { 261 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 262 } 263 if (events & EV_WRITE) 264 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 265 return 0; 266 } 267 268 static void 269 be_pair_destruct(struct bufferevent *bev) 270 { 271 struct bufferevent_pair *bev_p = upcast(bev); 272 273 if (bev_p->partner) { 274 bev_p->partner->partner = NULL; 275 bev_p->partner = NULL; 276 } 277 278 _bufferevent_del_generic_timeout_cbs(bev); 279 } 280 281 static int 282 be_pair_flush(struct bufferevent *bev, short iotype, 283 enum bufferevent_flush_mode mode) 284 { 285 struct bufferevent_pair *bev_p = upcast(bev); 286 struct bufferevent *partner; 287 incref_and_lock(bev); 288 if (!bev_p->partner) 289 return -1; 290 291 partner = downcast(bev_p->partner); 292 293 if (mode == BEV_NORMAL) 294 return 0; 295 296 if ((iotype & EV_READ) != 0) 297 be_pair_transfer(partner, bev, 1); 298 299 if ((iotype & EV_WRITE) != 0) 300 be_pair_transfer(bev, partner, 1); 301 302 if (mode == BEV_FINISHED) { 303 _bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF); 304 } 305 decref_and_unlock(bev); 306 return 0; 307 } 308 309 struct bufferevent * 310 bufferevent_pair_get_partner(struct bufferevent *bev) 311 { 312 struct bufferevent_pair *bev_p; 313 struct bufferevent *partner = NULL; 314 bev_p = upcast(bev); 315 if (! bev_p) 316 return NULL; 317 318 incref_and_lock(bev); 319 if (bev_p->partner) 320 partner = downcast(bev_p->partner); 321 decref_and_unlock(bev); 322 return partner; 323 } 324 325 const struct bufferevent_ops bufferevent_ops_pair = { 326 "pair_elt", 327 evutil_offsetof(struct bufferevent_pair, bev.bev), 328 be_pair_enable, 329 be_pair_disable, 330 be_pair_destruct, 331 _bufferevent_generic_adj_timeouts, 332 be_pair_flush, 333 NULL, /* ctrl */ 334 }; 335