1 /* 2 * Copyright (c) 2002-2004 Niels Provos <provos (at) citi.umich.edu> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <sys/types.h> 29 30 #ifdef HAVE_CONFIG_H 31 #include "config.h" 32 #endif 33 34 #ifdef HAVE_SYS_TIME_H 35 #include <sys/time.h> 36 #endif 37 38 #include <errno.h> 39 #include <stdio.h> 40 #include <stdlib.h> 41 #include <string.h> 42 #ifdef HAVE_STDARG_H 43 #include <stdarg.h> 44 #endif 45 46 #ifdef WIN32 47 #include <winsock2.h> 48 #endif 49 50 #include "evutil.h" 51 #include "event.h" 52 53 /* prototypes */ 54 55 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 56 57 static int 58 bufferevent_add(struct event *ev, int timeout) 59 { 60 struct timeval tv, *ptv = NULL; 61 62 if (timeout) { 63 evutil_timerclear(&tv); 64 tv.tv_sec = timeout; 65 ptv = &tv; 66 } 67 68 return (event_add(ev, ptv)); 69 } 70 71 /* 72 * This callback is executed when the size of the input buffer changes. 73 * We use it to apply back pressure on the reading side. 74 */ 75 76 void 77 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 78 void *arg) { 79 struct bufferevent *bufev = arg; 80 /* 81 * If we are below the watermark then reschedule reading if it's 82 * still enabled. 83 */ 84 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 85 evbuffer_setcb(buf, NULL, NULL); 86 87 if (bufev->enabled & EV_READ) 88 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 89 } 90 } 91 92 static void 93 bufferevent_readcb(int fd, short event, void *arg) 94 { 95 struct bufferevent *bufev = arg; 96 int res = 0; 97 short what = EVBUFFER_READ; 98 size_t len; 99 int howmuch = -1; 100 101 if (event == EV_TIMEOUT) { 102 what |= EVBUFFER_TIMEOUT; 103 goto error; 104 } 105 106 /* 107 * If we have a high watermark configured then we don't want to 108 * read more data than would make us reach the watermark. 109 */ 110 if (bufev->wm_read.high != 0) { 111 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); 112 /* we might have lowered the watermark, stop reading */ 113 if (howmuch <= 0) { 114 struct evbuffer *buf = bufev->input; 115 event_del(&bufev->ev_read); 116 evbuffer_setcb(buf, 117 bufferevent_read_pressure_cb, bufev); 118 return; 119 } 120 } 121 122 res = evbuffer_read(bufev->input, fd, howmuch); 123 if (res == -1) { 124 if (errno == EAGAIN || errno == EINTR) 125 goto reschedule; 126 /* error case */ 127 what |= EVBUFFER_ERROR; 128 } else if (res == 0) { 129 /* eof case */ 130 what |= EVBUFFER_EOF; 131 } 132 133 if (res <= 0) 134 goto error; 135 136 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 137 138 /* See if this callbacks meets the water marks */ 139 len = EVBUFFER_LENGTH(bufev->input); 140 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 141 return; 142 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { 143 struct evbuffer *buf = bufev->input; 144 event_del(&bufev->ev_read); 145 146 /* Now schedule a callback for us when the buffer changes */ 147 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 148 } 149 150 /* Invoke the user callback - must always be called last */ 151 if (bufev->readcb != NULL) 152 (*bufev->readcb)(bufev, bufev->cbarg); 153 return; 154 155 reschedule: 156 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 157 return; 158 159 error: 160 (*bufev->errorcb)(bufev, what, bufev->cbarg); 161 } 162 163 static void 164 bufferevent_writecb(int fd, short event, void *arg) 165 { 166 struct bufferevent *bufev = arg; 167 int res = 0; 168 short what = EVBUFFER_WRITE; 169 170 if (event == EV_TIMEOUT) { 171 what |= EVBUFFER_TIMEOUT; 172 goto error; 173 } 174 175 if (EVBUFFER_LENGTH(bufev->output)) { 176 res = evbuffer_write(bufev->output, fd); 177 if (res == -1) { 178 #ifndef WIN32 179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not 180 *set errno. thus this error checking is not portable*/ 181 if (errno == EAGAIN || 182 errno == EINTR || 183 errno == EINPROGRESS) 184 goto reschedule; 185 /* error case */ 186 what |= EVBUFFER_ERROR; 187 188 #else 189 goto reschedule; 190 #endif 191 192 } else if (res == 0) { 193 /* eof case */ 194 what |= EVBUFFER_EOF; 195 } 196 if (res <= 0) 197 goto error; 198 } 199 200 if (EVBUFFER_LENGTH(bufev->output) != 0) 201 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 202 203 /* 204 * Invoke the user callback if our buffer is drained or below the 205 * low watermark. 206 */ 207 if (bufev->writecb != NULL && 208 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 209 (*bufev->writecb)(bufev, bufev->cbarg); 210 211 return; 212 213 reschedule: 214 if (EVBUFFER_LENGTH(bufev->output) != 0) 215 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 216 return; 217 218 error: 219 (*bufev->errorcb)(bufev, what, bufev->cbarg); 220 } 221 222 /* 223 * Create a new buffered event object. 224 * 225 * The read callback is invoked whenever we read new data. 226 * The write callback is invoked whenever the output buffer is drained. 227 * The error callback is invoked on a write/read error or on EOF. 228 * 229 * Both read and write callbacks maybe NULL. The error callback is not 230 * allowed to be NULL and have to be provided always. 231 */ 232 233 struct bufferevent * 234 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 235 everrorcb errorcb, void *cbarg) 236 { 237 struct bufferevent *bufev; 238 239 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 240 return (NULL); 241 242 if ((bufev->input = evbuffer_new()) == NULL) { 243 free(bufev); 244 return (NULL); 245 } 246 247 if ((bufev->output = evbuffer_new()) == NULL) { 248 evbuffer_free(bufev->input); 249 free(bufev); 250 return (NULL); 251 } 252 253 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 254 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 255 256 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); 257 258 /* 259 * Set to EV_WRITE so that using bufferevent_write is going to 260 * trigger a callback. Reading needs to be explicitly enabled 261 * because otherwise no data will be available. 262 */ 263 bufev->enabled = EV_WRITE; 264 265 return (bufev); 266 } 267 268 void 269 bufferevent_setcb(struct bufferevent *bufev, 270 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) 271 { 272 bufev->readcb = readcb; 273 bufev->writecb = writecb; 274 bufev->errorcb = errorcb; 275 276 bufev->cbarg = cbarg; 277 } 278 279 void 280 bufferevent_setfd(struct bufferevent *bufev, int fd) 281 { 282 event_del(&bufev->ev_read); 283 event_del(&bufev->ev_write); 284 285 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 286 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 287 if (bufev->ev_base != NULL) { 288 event_base_set(bufev->ev_base, &bufev->ev_read); 289 event_base_set(bufev->ev_base, &bufev->ev_write); 290 } 291 292 /* might have to manually trigger event registration */ 293 } 294 295 int 296 bufferevent_priority_set(struct bufferevent *bufev, int priority) 297 { 298 if (event_priority_set(&bufev->ev_read, priority) == -1) 299 return (-1); 300 if (event_priority_set(&bufev->ev_write, priority) == -1) 301 return (-1); 302 303 return (0); 304 } 305 306 /* Closing the file descriptor is the responsibility of the caller */ 307 308 void 309 bufferevent_free(struct bufferevent *bufev) 310 { 311 event_del(&bufev->ev_read); 312 event_del(&bufev->ev_write); 313 314 evbuffer_free(bufev->input); 315 evbuffer_free(bufev->output); 316 317 free(bufev); 318 } 319 320 /* 321 * Returns 0 on success; 322 * -1 on failure. 323 */ 324 325 int 326 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 327 { 328 int res; 329 330 res = evbuffer_add(bufev->output, data, size); 331 332 if (res == -1) 333 return (res); 334 335 /* If everything is okay, we need to schedule a write */ 336 if (size > 0 && (bufev->enabled & EV_WRITE)) 337 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 338 339 return (res); 340 } 341 342 int 343 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 344 { 345 int res; 346 347 res = bufferevent_write(bufev, buf->buffer, buf->off); 348 if (res != -1) 349 evbuffer_drain(buf, buf->off); 350 351 return (res); 352 } 353 354 size_t 355 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 356 { 357 struct evbuffer *buf = bufev->input; 358 359 if (buf->off < size) 360 size = buf->off; 361 362 /* Copy the available data to the user buffer */ 363 memcpy(data, buf->buffer, size); 364 365 if (size) 366 evbuffer_drain(buf, size); 367 368 return (size); 369 } 370 371 int 372 bufferevent_enable(struct bufferevent *bufev, short event) 373 { 374 if (event & EV_READ) { 375 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 376 return (-1); 377 } 378 if (event & EV_WRITE) { 379 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 380 return (-1); 381 } 382 383 bufev->enabled |= event; 384 return (0); 385 } 386 387 int 388 bufferevent_disable(struct bufferevent *bufev, short event) 389 { 390 if (event & EV_READ) { 391 if (event_del(&bufev->ev_read) == -1) 392 return (-1); 393 } 394 if (event & EV_WRITE) { 395 if (event_del(&bufev->ev_write) == -1) 396 return (-1); 397 } 398 399 bufev->enabled &= ~event; 400 return (0); 401 } 402 403 /* 404 * Sets the read and write timeout for a buffered event. 405 */ 406 407 void 408 bufferevent_settimeout(struct bufferevent *bufev, 409 int timeout_read, int timeout_write) { 410 bufev->timeout_read = timeout_read; 411 bufev->timeout_write = timeout_write; 412 413 if (event_pending(&bufev->ev_read, EV_READ, NULL)) 414 bufferevent_add(&bufev->ev_read, timeout_read); 415 if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) 416 bufferevent_add(&bufev->ev_write, timeout_write); 417 } 418 419 /* 420 * Sets the water marks 421 */ 422 423 void 424 bufferevent_setwatermark(struct bufferevent *bufev, short events, 425 size_t lowmark, size_t highmark) 426 { 427 if (events & EV_READ) { 428 bufev->wm_read.low = lowmark; 429 bufev->wm_read.high = highmark; 430 } 431 432 if (events & EV_WRITE) { 433 bufev->wm_write.low = lowmark; 434 bufev->wm_write.high = highmark; 435 } 436 437 /* If the watermarks changed then see if we should call read again */ 438 bufferevent_read_pressure_cb(bufev->input, 439 0, EVBUFFER_LENGTH(bufev->input), bufev); 440 } 441 442 int 443 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 444 { 445 int res; 446 447 bufev->ev_base = base; 448 449 res = event_base_set(base, &bufev->ev_read); 450 if (res == -1) 451 return (res); 452 453 res = event_base_set(base, &bufev->ev_write); 454 return (res); 455 } 456