/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
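
/* A worked example of the overflow-safe refill above (illustrative
 * numbers only, not taken from any real configuration): suppose
 * read_rate = 1000, read_maximum = 4000, and the bucket sits at
 * read_limit = -500 after n_ticks = 3 ticks of silence.  Then
 * (4000 - (-500)) / 3 = 1500, which is not less than 1000, so we take
 * the "else" branch and refill to -500 + 3*1000 = 2500.  Had n_ticks
 * been 100, (4000 - (-500)) / 100 = 45 < 1000 would have told us that
 * a plain multiply-and-add could shoot past the cap (or overflow), so
 * we clamp straight to read_maximum instead.
 */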

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
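
/* For example (hypothetical values): with msec_per_tick = 100 and
 * tv = { .tv_sec = 12, .tv_usec = 345678 }, we get
 * msec = 12*1000 + 345678/1000 = 12345, so the tick number is
 * 12345 / 100 = 123.  Two timestamps fall into the same tick exactly
 * when they land in the same msec_per_tick window.
 */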

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
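
/* A minimal usage sketch (illustrative only; 'bev' is assumed to be an
 * already-created bufferevent, and error handling is omitted):
 *
 *	struct timeval tick = { 0, 100000 };	// 100 msec per tick
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1024, 4096,	// read: 1024 B/tick, 4096 B burst
 *		1024, 4096,			// write: same limits
 *		&tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * Note that bufferevent_set_rate_limit() below stores the cfg pointer
 * rather than copying it (see the XXX about reference-counting there),
 * so the cfg must outlive every bufferevent that uses it; free it with
 * ev_token_bucket_cfg_free() only once nothing refers to it.
 */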

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
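
/* To make the group arithmetic above concrete (purely illustrative
 * numbers): if a group's read bucket holds 10000 bytes and the group
 * has 8 members, each member's share is 10000/8 = 1250 bytes, which
 * stands against a min_share of 64.  If the bucket held only 256
 * bytes, 256/8 = 32 would be bumped up to 64, deliberately letting a
 * few members overdraw the bucket a little rather than dribbling
 * useless 32-byte operations to everyone.
 */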

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = _bev_group_random_element(g);	\
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting until the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
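
/* A minimal usage sketch (illustrative only; 'base', 'bev1', and 'bev2'
 * are assumed to exist already, and error handling is omitted):
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(4096, 8192, 4096, 8192, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 *
 * Unlike the per-bufferevent case, the group memcpy()s the cfg into
 * rate_limit_cfg above, so the caller may free cfg right afterwards.
 */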

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let min_share exceed the one-tick refill; that way, at
	 * steady state, at least one connection can make progress on
	 * every tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
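
/* For example (hypothetical numbers): with read_rate = 1000 bytes per
 * tick, a requested min_share of 4000 gets clamped to 1000 here.
 * Without the clamp, _bev_group_refill_callback() above would keep a
 * suspended group suspended until its bucket climbed back to 4000,
 * which can take several ticks, so not even one member could proceed
 * per tick.
 */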

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}