1 /* 2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3 * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include <sys/types.h> 30 31 #include "event2/event-config.h" 32 33 #ifdef _EVENT_HAVE_SYS_TIME_H 34 #include <sys/time.h> 35 #endif 36 37 #include <errno.h> 38 #include <stdio.h> 39 #include <stdlib.h> 40 #include <string.h> 41 #ifdef _EVENT_HAVE_STDARG_H 42 #include <stdarg.h> 43 #endif 44 45 #ifdef WIN32 46 #include <winsock2.h> 47 #endif 48 49 #include "event2/util.h" 50 #include "event2/bufferevent.h" 51 #include "event2/buffer.h" 52 #include "event2/bufferevent_struct.h" 53 #include "event2/event.h" 54 #include "log-internal.h" 55 #include "mm-internal.h" 56 #include "bufferevent-internal.h" 57 #include "util-internal.h" 58 59 /* prototypes */ 60 static int be_filter_enable(struct bufferevent *, short); 61 static int be_filter_disable(struct bufferevent *, short); 62 static void be_filter_destruct(struct bufferevent *); 63 64 static void be_filter_readcb(struct bufferevent *, void *); 65 static void be_filter_writecb(struct bufferevent *, void *); 66 static void be_filter_eventcb(struct bufferevent *, short, void *); 67 static int be_filter_flush(struct bufferevent *bufev, 68 short iotype, enum bufferevent_flush_mode mode); 69 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 70 71 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 72 const struct evbuffer_cb_info *info, void *arg); 73 74 struct bufferevent_filtered { 75 struct bufferevent_private bev; 76 77 /** The bufferevent that we read/write filtered data from/to. */ 78 struct bufferevent *underlying; 79 /** A callback on our outbuf to notice when somebody adds data */ 80 struct evbuffer_cb_entry *outbuf_cb; 81 /** True iff we have received an EOF callback from the underlying 82 * bufferevent. */ 83 unsigned got_eof; 84 85 /** Function to free context when we're done. */ 86 void (*free_context)(void *); 87 /** Input filter */ 88 bufferevent_filter_cb process_in; 89 /** Output filter */ 90 bufferevent_filter_cb process_out; 91 /** User-supplied argument to the filters. */ 92 void *context; 93 }; 94 95 const struct bufferevent_ops bufferevent_ops_filter = { 96 "filter", 97 evutil_offsetof(struct bufferevent_filtered, bev.bev), 98 be_filter_enable, 99 be_filter_disable, 100 be_filter_destruct, 101 _bufferevent_generic_adj_timeouts, 102 be_filter_flush, 103 be_filter_ctrl, 104 }; 105 106 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered, 107 * return that bufferevent_filtered. Returns NULL otherwise.*/ 108 static inline struct bufferevent_filtered * 109 upcast(struct bufferevent *bev) 110 { 111 struct bufferevent_filtered *bev_f; 112 if (bev->be_ops != &bufferevent_ops_filter) 113 return NULL; 114 bev_f = (void*)( ((char*)bev) - 115 evutil_offsetof(struct bufferevent_filtered, bev.bev)); 116 EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter); 117 return bev_f; 118 } 119 120 #define downcast(bev_f) (&(bev_f)->bev.bev) 121 122 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or 123 * over its high watermark such that we should not write to it in a given 124 * flush mode. */ 125 static int 126 be_underlying_writebuf_full(struct bufferevent_filtered *bevf, 127 enum bufferevent_flush_mode state) 128 { 129 struct bufferevent *u = bevf->underlying; 130 return state == BEV_NORMAL && 131 u->wm_write.high && 132 evbuffer_get_length(u->output) >= u->wm_write.high; 133 } 134 135 /** Return 1 if our input buffer is at or over its high watermark such that we 136 * should not write to it in a given flush mode. */ 137 static int 138 be_readbuf_full(struct bufferevent_filtered *bevf, 139 enum bufferevent_flush_mode state) 140 { 141 struct bufferevent *bufev = downcast(bevf); 142 return state == BEV_NORMAL && 143 bufev->wm_read.high && 144 evbuffer_get_length(bufev->input) >= bufev->wm_read.high; 145 } 146 147 148 /* Filter to use when we're created with a NULL filter. */ 149 static enum bufferevent_filter_result 150 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim, 151 enum bufferevent_flush_mode state, void *ctx) 152 { 153 (void)state; 154 if (evbuffer_remove_buffer(src, dst, lim) == 0) 155 return BEV_OK; 156 else 157 return BEV_ERROR; 158 } 159 160 struct bufferevent * 161 bufferevent_filter_new(struct bufferevent *underlying, 162 bufferevent_filter_cb input_filter, 163 bufferevent_filter_cb output_filter, 164 int options, 165 void (*free_context)(void *), 166 void *ctx) 167 { 168 struct bufferevent_filtered *bufev_f; 169 int tmp_options = options & ~BEV_OPT_THREADSAFE; 170 171 if (!underlying) 172 return NULL; 173 174 if (!input_filter) 175 input_filter = be_null_filter; 176 if (!output_filter) 177 output_filter = be_null_filter; 178 179 bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered)); 180 if (!bufev_f) 181 return NULL; 182 183 if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base, 184 &bufferevent_ops_filter, tmp_options) < 0) { 185 mm_free(bufev_f); 186 return NULL; 187 } 188 if (options & BEV_OPT_THREADSAFE) { 189 bufferevent_enable_locking(downcast(bufev_f), NULL); 190 } 191 192 bufev_f->underlying = underlying; 193 194 bufev_f->process_in = input_filter; 195 bufev_f->process_out = output_filter; 196 bufev_f->free_context = free_context; 197 bufev_f->context = ctx; 198 199 bufferevent_setcb(bufev_f->underlying, 200 be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); 201 202 bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, 203 bufferevent_filtered_outbuf_cb, bufev_f); 204 205 _bufferevent_init_generic_timeout_cbs(downcast(bufev_f)); 206 bufferevent_incref(underlying); 207 208 bufferevent_enable(underlying, EV_READ|EV_WRITE); 209 bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ); 210 211 return downcast(bufev_f); 212 } 213 214 static void 215 be_filter_destruct(struct bufferevent *bev) 216 { 217 struct bufferevent_filtered *bevf = upcast(bev); 218 EVUTIL_ASSERT(bevf); 219 if (bevf->free_context) 220 bevf->free_context(bevf->context); 221 222 if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { 223 /* Yes, there is also a decref in bufferevent_decref. 224 * That decref corresponds to the incref when we set 225 * underlying for the first time. This decref is an 226 * extra one to remove the last reference. 227 */ 228 if (BEV_UPCAST(bevf->underlying)->refcnt < 2) { 229 event_warnx("BEV_OPT_CLOSE_ON_FREE set on an " 230 "bufferevent with too few references"); 231 } else { 232 bufferevent_free(bevf->underlying); 233 } 234 } else { 235 if (bevf->underlying) { 236 if (bevf->underlying->errorcb == be_filter_eventcb) 237 bufferevent_setcb(bevf->underlying, 238 NULL, NULL, NULL, NULL); 239 bufferevent_unsuspend_read(bevf->underlying, 240 BEV_SUSPEND_FILT_READ); 241 } 242 } 243 244 _bufferevent_del_generic_timeout_cbs(bev); 245 } 246 247 static int 248 be_filter_enable(struct bufferevent *bev, short event) 249 { 250 struct bufferevent_filtered *bevf = upcast(bev); 251 if (event & EV_WRITE) 252 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 253 254 if (event & EV_READ) { 255 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 256 bufferevent_unsuspend_read(bevf->underlying, 257 BEV_SUSPEND_FILT_READ); 258 } 259 return 0; 260 } 261 262 static int 263 be_filter_disable(struct bufferevent *bev, short event) 264 { 265 struct bufferevent_filtered *bevf = upcast(bev); 266 if (event & EV_WRITE) 267 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 268 if (event & EV_READ) { 269 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 270 bufferevent_suspend_read(bevf->underlying, 271 BEV_SUSPEND_FILT_READ); 272 } 273 return 0; 274 } 275 276 static enum bufferevent_filter_result 277 be_filter_process_input(struct bufferevent_filtered *bevf, 278 enum bufferevent_flush_mode state, 279 int *processed_out) 280 { 281 enum bufferevent_filter_result res; 282 struct bufferevent *bev = downcast(bevf); 283 284 if (state == BEV_NORMAL) { 285 /* If we're in 'normal' mode, don't urge data on the filter 286 * unless we're reading data and under our high-water mark.*/ 287 if (!(bev->enabled & EV_READ) || 288 be_readbuf_full(bevf, state)) 289 return BEV_OK; 290 } 291 292 do { 293 ev_ssize_t limit = -1; 294 if (state == BEV_NORMAL && bev->wm_read.high) 295 limit = bev->wm_read.high - 296 evbuffer_get_length(bev->input); 297 298 res = bevf->process_in(bevf->underlying->input, 299 bev->input, limit, state, bevf->context); 300 301 if (res == BEV_OK) 302 *processed_out = 1; 303 } while (res == BEV_OK && 304 (bev->enabled & EV_READ) && 305 evbuffer_get_length(bevf->underlying->input) && 306 !be_readbuf_full(bevf, state)); 307 308 if (*processed_out) 309 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 310 311 return res; 312 } 313 314 315 static enum bufferevent_filter_result 316 be_filter_process_output(struct bufferevent_filtered *bevf, 317 enum bufferevent_flush_mode state, 318 int *processed_out) 319 { 320 /* Requires references and lock: might call writecb */ 321 enum bufferevent_filter_result res = BEV_OK; 322 struct bufferevent *bufev = downcast(bevf); 323 int again = 0; 324 325 if (state == BEV_NORMAL) { 326 /* If we're in 'normal' mode, don't urge data on the 327 * filter unless we're writing data, and the underlying 328 * bufferevent is accepting data, and we have data to 329 * give the filter. If we're in 'flush' or 'finish', 330 * call the filter no matter what. */ 331 if (!(bufev->enabled & EV_WRITE) || 332 be_underlying_writebuf_full(bevf, state) || 333 !evbuffer_get_length(bufev->output)) 334 return BEV_OK; 335 } 336 337 /* disable the callback that calls this function 338 when the user adds to the output buffer. */ 339 evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0); 340 341 do { 342 int processed = 0; 343 again = 0; 344 345 do { 346 ev_ssize_t limit = -1; 347 if (state == BEV_NORMAL && 348 bevf->underlying->wm_write.high) 349 limit = bevf->underlying->wm_write.high - 350 evbuffer_get_length(bevf->underlying->output); 351 352 res = bevf->process_out(downcast(bevf)->output, 353 bevf->underlying->output, 354 limit, 355 state, 356 bevf->context); 357 358 if (res == BEV_OK) 359 processed = *processed_out = 1; 360 } while (/* Stop if the filter wasn't successful...*/ 361 res == BEV_OK && 362 /* Or if we aren't writing any more. */ 363 (bufev->enabled & EV_WRITE) && 364 /* Of if we have nothing more to write and we are 365 * not flushing. */ 366 evbuffer_get_length(bufev->output) && 367 /* Or if we have filled the underlying output buffer. */ 368 !be_underlying_writebuf_full(bevf,state)); 369 370 if (processed && 371 evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { 372 /* call the write callback.*/ 373 _bufferevent_run_writecb(bufev); 374 375 if (res == BEV_OK && 376 (bufev->enabled & EV_WRITE) && 377 evbuffer_get_length(bufev->output) && 378 !be_underlying_writebuf_full(bevf, state)) { 379 again = 1; 380 } 381 } 382 } while (again); 383 384 /* reenable the outbuf_cb */ 385 evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb, 386 EVBUFFER_CB_ENABLED); 387 388 if (*processed_out) 389 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 390 391 return res; 392 } 393 394 /* Called when the size of our outbuf changes. */ 395 static void 396 bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 397 const struct evbuffer_cb_info *cbinfo, void *arg) 398 { 399 struct bufferevent_filtered *bevf = arg; 400 struct bufferevent *bev = downcast(bevf); 401 402 if (cbinfo->n_added) { 403 int processed_any = 0; 404 /* Somebody added more data to the output buffer. Try to 405 * process it, if we should. */ 406 _bufferevent_incref_and_lock(bev); 407 be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 408 _bufferevent_decref_and_unlock(bev); 409 } 410 } 411 412 /* Called when the underlying socket has read. */ 413 static void 414 be_filter_readcb(struct bufferevent *underlying, void *_me) 415 { 416 struct bufferevent_filtered *bevf = _me; 417 enum bufferevent_filter_result res; 418 enum bufferevent_flush_mode state; 419 struct bufferevent *bufev = downcast(bevf); 420 int processed_any = 0; 421 422 _bufferevent_incref_and_lock(bufev); 423 424 if (bevf->got_eof) 425 state = BEV_FINISHED; 426 else 427 state = BEV_NORMAL; 428 429 /* XXXX use return value */ 430 res = be_filter_process_input(bevf, state, &processed_any); 431 (void)res; 432 433 /* XXX This should be in process_input, not here. There are 434 * other places that can call process-input, and they should 435 * force readcb calls as needed. */ 436 if (processed_any && 437 evbuffer_get_length(bufev->input) >= bufev->wm_read.low) 438 _bufferevent_run_readcb(bufev); 439 440 _bufferevent_decref_and_unlock(bufev); 441 } 442 443 /* Called when the underlying socket has drained enough that we can write to 444 it. */ 445 static void 446 be_filter_writecb(struct bufferevent *underlying, void *_me) 447 { 448 struct bufferevent_filtered *bevf = _me; 449 struct bufferevent *bev = downcast(bevf); 450 int processed_any = 0; 451 452 _bufferevent_incref_and_lock(bev); 453 be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 454 _bufferevent_decref_and_unlock(bev); 455 } 456 457 /* Called when the underlying socket has given us an error */ 458 static void 459 be_filter_eventcb(struct bufferevent *underlying, short what, void *_me) 460 { 461 struct bufferevent_filtered *bevf = _me; 462 struct bufferevent *bev = downcast(bevf); 463 464 _bufferevent_incref_and_lock(bev); 465 /* All we can really to is tell our own eventcb. */ 466 _bufferevent_run_eventcb(bev, what); 467 _bufferevent_decref_and_unlock(bev); 468 } 469 470 static int 471 be_filter_flush(struct bufferevent *bufev, 472 short iotype, enum bufferevent_flush_mode mode) 473 { 474 struct bufferevent_filtered *bevf = upcast(bufev); 475 int processed_any = 0; 476 EVUTIL_ASSERT(bevf); 477 478 _bufferevent_incref_and_lock(bufev); 479 480 if (iotype & EV_READ) { 481 be_filter_process_input(bevf, mode, &processed_any); 482 } 483 if (iotype & EV_WRITE) { 484 be_filter_process_output(bevf, mode, &processed_any); 485 } 486 /* XXX check the return value? */ 487 /* XXX does this want to recursively call lower-level flushes? */ 488 bufferevent_flush(bevf->underlying, iotype, mode); 489 490 _bufferevent_decref_and_unlock(bufev); 491 492 return processed_any; 493 } 494 495 static int 496 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 497 union bufferevent_ctrl_data *data) 498 { 499 struct bufferevent_filtered *bevf; 500 switch (op) { 501 case BEV_CTRL_GET_UNDERLYING: 502 bevf = upcast(bev); 503 data->ptr = bevf->underlying; 504 return 0; 505 case BEV_CTRL_GET_FD: 506 case BEV_CTRL_SET_FD: 507 case BEV_CTRL_CANCEL_ALL: 508 default: 509 return -1; 510 } 511 } 512