Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2002-2004 Niels Provos <provos (at) citi.umich.edu>
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  * 3. The name of the author may not be used to endorse or promote products
     14  *    derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include <sys/types.h>
     29 
     30 #ifdef HAVE_CONFIG_H
     31 #include "config.h"
     32 #endif
     33 
     34 #ifdef HAVE_SYS_TIME_H
     35 #include <sys/time.h>
     36 #endif
     37 
     38 #include <errno.h>
     39 #include <stdio.h>
     40 #include <stdlib.h>
     41 #include <string.h>
     42 #ifdef HAVE_STDARG_H
     43 #include <stdarg.h>
     44 #endif
     45 
     46 #ifdef WIN32
     47 #include <winsock2.h>
     48 #endif
     49 
     50 #include "evutil.h"
     51 #include "event.h"
     52 
     53 /* prototypes */
     54 
     55 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
     56 
     57 static int
     58 bufferevent_add(struct event *ev, int timeout)
     59 {
     60 	struct timeval tv, *ptv = NULL;
     61 
     62 	if (timeout) {
     63 		evutil_timerclear(&tv);
     64 		tv.tv_sec = timeout;
     65 		ptv = &tv;
     66 	}
     67 
     68 	return (event_add(ev, ptv));
     69 }
     70 
     71 /*
     72  * This callback is executed when the size of the input buffer changes.
     73  * We use it to apply back pressure on the reading side.
     74  */
     75 
     76 void
     77 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
     78     void *arg) {
     79 	struct bufferevent *bufev = arg;
     80 	/*
     81 	 * If we are below the watermark then reschedule reading if it's
     82 	 * still enabled.
     83 	 */
     84 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
     85 		evbuffer_setcb(buf, NULL, NULL);
     86 
     87 		if (bufev->enabled & EV_READ)
     88 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
     89 	}
     90 }
     91 
     92 static void
     93 bufferevent_readcb(int fd, short event, void *arg)
     94 {
     95 	struct bufferevent *bufev = arg;
     96 	int res = 0;
     97 	short what = EVBUFFER_READ;
     98 	size_t len;
     99 	int howmuch = -1;
    100 
    101 	if (event == EV_TIMEOUT) {
    102 		what |= EVBUFFER_TIMEOUT;
    103 		goto error;
    104 	}
    105 
    106 	/*
    107 	 * If we have a high watermark configured then we don't want to
    108 	 * read more data than would make us reach the watermark.
    109 	 */
    110 	if (bufev->wm_read.high != 0) {
    111 		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
    112 		/* we might have lowered the watermark, stop reading */
    113 		if (howmuch <= 0) {
    114 			struct evbuffer *buf = bufev->input;
    115 			event_del(&bufev->ev_read);
    116 			evbuffer_setcb(buf,
    117 			    bufferevent_read_pressure_cb, bufev);
    118 			return;
    119 		}
    120 	}
    121 
    122 	res = evbuffer_read(bufev->input, fd, howmuch);
    123 	if (res == -1) {
    124 		if (errno == EAGAIN || errno == EINTR)
    125 			goto reschedule;
    126 		/* error case */
    127 		what |= EVBUFFER_ERROR;
    128 	} else if (res == 0) {
    129 		/* eof case */
    130 		what |= EVBUFFER_EOF;
    131 	}
    132 
    133 	if (res <= 0)
    134 		goto error;
    135 
    136 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
    137 
    138 	/* See if this callbacks meets the water marks */
    139 	len = EVBUFFER_LENGTH(bufev->input);
    140 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
    141 		return;
    142 	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
    143 		struct evbuffer *buf = bufev->input;
    144 		event_del(&bufev->ev_read);
    145 
    146 		/* Now schedule a callback for us when the buffer changes */
    147 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
    148 	}
    149 
    150 	/* Invoke the user callback - must always be called last */
    151 	if (bufev->readcb != NULL)
    152 		(*bufev->readcb)(bufev, bufev->cbarg);
    153 	return;
    154 
    155  reschedule:
    156 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
    157 	return;
    158 
    159  error:
    160 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
    161 }
    162 
    163 static void
    164 bufferevent_writecb(int fd, short event, void *arg)
    165 {
    166 	struct bufferevent *bufev = arg;
    167 	int res = 0;
    168 	short what = EVBUFFER_WRITE;
    169 
    170 	if (event == EV_TIMEOUT) {
    171 		what |= EVBUFFER_TIMEOUT;
    172 		goto error;
    173 	}
    174 
    175 	if (EVBUFFER_LENGTH(bufev->output)) {
    176 	    res = evbuffer_write(bufev->output, fd);
    177 	    if (res == -1) {
    178 #ifndef WIN32
    179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
    180  *set errno. thus this error checking is not portable*/
    181 		    if (errno == EAGAIN ||
    182 			errno == EINTR ||
    183 			errno == EINPROGRESS)
    184 			    goto reschedule;
    185 		    /* error case */
    186 		    what |= EVBUFFER_ERROR;
    187 
    188 #else
    189 				goto reschedule;
    190 #endif
    191 
    192 	    } else if (res == 0) {
    193 		    /* eof case */
    194 		    what |= EVBUFFER_EOF;
    195 	    }
    196 	    if (res <= 0)
    197 		    goto error;
    198 	}
    199 
    200 	if (EVBUFFER_LENGTH(bufev->output) != 0)
    201 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
    202 
    203 	/*
    204 	 * Invoke the user callback if our buffer is drained or below the
    205 	 * low watermark.
    206 	 */
    207 	if (bufev->writecb != NULL &&
    208 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
    209 		(*bufev->writecb)(bufev, bufev->cbarg);
    210 
    211 	return;
    212 
    213  reschedule:
    214 	if (EVBUFFER_LENGTH(bufev->output) != 0)
    215 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
    216 	return;
    217 
    218  error:
    219 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
    220 }
    221 
    222 /*
    223  * Create a new buffered event object.
    224  *
    225  * The read callback is invoked whenever we read new data.
    226  * The write callback is invoked whenever the output buffer is drained.
    227  * The error callback is invoked on a write/read error or on EOF.
    228  *
    229  * Both read and write callbacks maybe NULL.  The error callback is not
    230  * allowed to be NULL and have to be provided always.
    231  */
    232 
    233 struct bufferevent *
    234 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
    235     everrorcb errorcb, void *cbarg)
    236 {
    237 	struct bufferevent *bufev;
    238 
    239 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
    240 		return (NULL);
    241 
    242 	if ((bufev->input = evbuffer_new()) == NULL) {
    243 		free(bufev);
    244 		return (NULL);
    245 	}
    246 
    247 	if ((bufev->output = evbuffer_new()) == NULL) {
    248 		evbuffer_free(bufev->input);
    249 		free(bufev);
    250 		return (NULL);
    251 	}
    252 
    253 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
    254 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
    255 
    256 	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
    257 
    258 	/*
    259 	 * Set to EV_WRITE so that using bufferevent_write is going to
    260 	 * trigger a callback.  Reading needs to be explicitly enabled
    261 	 * because otherwise no data will be available.
    262 	 */
    263 	bufev->enabled = EV_WRITE;
    264 
    265 	return (bufev);
    266 }
    267 
    268 void
    269 bufferevent_setcb(struct bufferevent *bufev,
    270     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
    271 {
    272 	bufev->readcb = readcb;
    273 	bufev->writecb = writecb;
    274 	bufev->errorcb = errorcb;
    275 
    276 	bufev->cbarg = cbarg;
    277 }
    278 
    279 void
    280 bufferevent_setfd(struct bufferevent *bufev, int fd)
    281 {
    282 	event_del(&bufev->ev_read);
    283 	event_del(&bufev->ev_write);
    284 
    285 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
    286 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
    287 	if (bufev->ev_base != NULL) {
    288 		event_base_set(bufev->ev_base, &bufev->ev_read);
    289 		event_base_set(bufev->ev_base, &bufev->ev_write);
    290 	}
    291 
    292 	/* might have to manually trigger event registration */
    293 }
    294 
    295 int
    296 bufferevent_priority_set(struct bufferevent *bufev, int priority)
    297 {
    298 	if (event_priority_set(&bufev->ev_read, priority) == -1)
    299 		return (-1);
    300 	if (event_priority_set(&bufev->ev_write, priority) == -1)
    301 		return (-1);
    302 
    303 	return (0);
    304 }
    305 
    306 /* Closing the file descriptor is the responsibility of the caller */
    307 
    308 void
    309 bufferevent_free(struct bufferevent *bufev)
    310 {
    311 	event_del(&bufev->ev_read);
    312 	event_del(&bufev->ev_write);
    313 
    314 	evbuffer_free(bufev->input);
    315 	evbuffer_free(bufev->output);
    316 
    317 	free(bufev);
    318 }
    319 
    320 /*
    321  * Returns 0 on success;
    322  *        -1 on failure.
    323  */
    324 
    325 int
    326 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
    327 {
    328 	int res;
    329 
    330 	res = evbuffer_add(bufev->output, data, size);
    331 
    332 	if (res == -1)
    333 		return (res);
    334 
    335 	/* If everything is okay, we need to schedule a write */
    336 	if (size > 0 && (bufev->enabled & EV_WRITE))
    337 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
    338 
    339 	return (res);
    340 }
    341 
    342 int
    343 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    344 {
    345 	int res;
    346 
    347 	res = bufferevent_write(bufev, buf->buffer, buf->off);
    348 	if (res != -1)
    349 		evbuffer_drain(buf, buf->off);
    350 
    351 	return (res);
    352 }
    353 
    354 size_t
    355 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    356 {
    357 	struct evbuffer *buf = bufev->input;
    358 
    359 	if (buf->off < size)
    360 		size = buf->off;
    361 
    362 	/* Copy the available data to the user buffer */
    363 	memcpy(data, buf->buffer, size);
    364 
    365 	if (size)
    366 		evbuffer_drain(buf, size);
    367 
    368 	return (size);
    369 }
    370 
    371 int
    372 bufferevent_enable(struct bufferevent *bufev, short event)
    373 {
    374 	if (event & EV_READ) {
    375 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
    376 			return (-1);
    377 	}
    378 	if (event & EV_WRITE) {
    379 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
    380 			return (-1);
    381 	}
    382 
    383 	bufev->enabled |= event;
    384 	return (0);
    385 }
    386 
    387 int
    388 bufferevent_disable(struct bufferevent *bufev, short event)
    389 {
    390 	if (event & EV_READ) {
    391 		if (event_del(&bufev->ev_read) == -1)
    392 			return (-1);
    393 	}
    394 	if (event & EV_WRITE) {
    395 		if (event_del(&bufev->ev_write) == -1)
    396 			return (-1);
    397 	}
    398 
    399 	bufev->enabled &= ~event;
    400 	return (0);
    401 }
    402 
    403 /*
    404  * Sets the read and write timeout for a buffered event.
    405  */
    406 
    407 void
    408 bufferevent_settimeout(struct bufferevent *bufev,
    409     int timeout_read, int timeout_write) {
    410 	bufev->timeout_read = timeout_read;
    411 	bufev->timeout_write = timeout_write;
    412 
    413 	if (event_pending(&bufev->ev_read, EV_READ, NULL))
    414 		bufferevent_add(&bufev->ev_read, timeout_read);
    415 	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
    416 		bufferevent_add(&bufev->ev_write, timeout_write);
    417 }
    418 
    419 /*
    420  * Sets the water marks
    421  */
    422 
    423 void
    424 bufferevent_setwatermark(struct bufferevent *bufev, short events,
    425     size_t lowmark, size_t highmark)
    426 {
    427 	if (events & EV_READ) {
    428 		bufev->wm_read.low = lowmark;
    429 		bufev->wm_read.high = highmark;
    430 	}
    431 
    432 	if (events & EV_WRITE) {
    433 		bufev->wm_write.low = lowmark;
    434 		bufev->wm_write.high = highmark;
    435 	}
    436 
    437 	/* If the watermarks changed then see if we should call read again */
    438 	bufferevent_read_pressure_cb(bufev->input,
    439 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
    440 }
    441 
    442 int
    443 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
    444 {
    445 	int res;
    446 
    447 	bufev->ev_base = base;
    448 
    449 	res = event_base_set(base, &bufev->ev_read);
    450 	if (res == -1)
    451 		return (res);
    452 
    453 	res = event_base_set(base, &bufev->ev_write);
    454 	return (res);
    455 }
    456