Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      3  * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
      4  * All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  * 3. The name of the author may not be used to endorse or promote products
     15  *    derived from this software without specific prior written permission.
     16  *
     17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27  */
     28 #include "evconfig-private.h"
     29 
     30 #include <sys/types.h>
     31 #include <limits.h>
     32 #include <string.h>
     33 #include <stdlib.h>
     34 
     35 #include "event2/event.h"
     36 #include "event2/event_struct.h"
     37 #include "event2/util.h"
     38 #include "event2/bufferevent.h"
     39 #include "event2/bufferevent_struct.h"
     40 #include "event2/buffer.h"
     41 
     42 #include "ratelim-internal.h"
     43 
     44 #include "bufferevent-internal.h"
     45 #include "mm-internal.h"
     46 #include "util-internal.h"
     47 #include "event-internal.h"
     48 
     49 int
     50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
     51     const struct ev_token_bucket_cfg *cfg,
     52     ev_uint32_t current_tick,
     53     int reinitialize)
     54 {
     55 	if (reinitialize) {
     56 		/* on reinitialization, we only clip downwards, since we've
     57 		   already used who-knows-how-much bandwidth this tick.  We
     58 		   leave "last_updated" as it is; the next update will add the
     59 		   appropriate amount of bandwidth to the bucket.
     60 		*/
     61 		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
     62 			bucket->read_limit = cfg->read_maximum;
     63 		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
     64 			bucket->write_limit = cfg->write_maximum;
     65 	} else {
     66 		bucket->read_limit = cfg->read_rate;
     67 		bucket->write_limit = cfg->write_rate;
     68 		bucket->last_updated = current_tick;
     69 	}
     70 	return 0;
     71 }
     72 
/** Add to <b>bucket</b> the tokens accrued since it was last updated,
 * clamping at the configured read/write maxima.  Returns 0 if nothing
 * changed (no full tick elapsed, or the tick counter appears to have
 * rolled back), 1 if the bucket was refilled. */
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	/* If even the average per-tick headroom is below the rate, adding
	 * n_ticks * rate would reach (or overflow past) the maximum, so
	 * just saturate. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
    112 
/* Refill bev's private token bucket if at least one full tick has elapsed
 * since it was last updated.  Caller must hold the lock on bev, and
 * bev->rate_limiting->cfg must be non-NULL. */
static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	/* Use the event base's cached notion of "now". */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}
    125 
    126 ev_uint32_t
    127 ev_token_bucket_get_tick_(const struct timeval *tv,
    128     const struct ev_token_bucket_cfg *cfg)
    129 {
    130 	/* This computation uses two multiplies and a divide.  We could do
    131 	 * fewer if we knew that the tick length was an integer number of
    132 	 * seconds, or if we knew it divided evenly into a second.  We should
    133 	 * investigate that more.
    134 	 */
    135 
    136 	/* We cast to an ev_uint64_t first, since we don't want to overflow
    137 	 * before we do the final divide. */
    138 	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
    139 	return (unsigned)(msec / cfg->msec_per_tick);
    140 }
    141 
    142 struct ev_token_bucket_cfg *
    143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    144     size_t write_rate, size_t write_burst,
    145     const struct timeval *tick_len)
    146 {
    147 	struct ev_token_bucket_cfg *r;
    148 	struct timeval g;
    149 	if (! tick_len) {
    150 		g.tv_sec = 1;
    151 		g.tv_usec = 0;
    152 		tick_len = &g;
    153 	}
    154 	if (read_rate > read_burst || write_rate > write_burst ||
    155 	    read_rate < 1 || write_rate < 1)
    156 		return NULL;
    157 	if (read_rate > EV_RATE_LIMIT_MAX ||
    158 	    write_rate > EV_RATE_LIMIT_MAX ||
    159 	    read_burst > EV_RATE_LIMIT_MAX ||
    160 	    write_burst > EV_RATE_LIMIT_MAX)
    161 		return NULL;
    162 	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
    163 	if (!r)
    164 		return NULL;
    165 	r->read_rate = read_rate;
    166 	r->write_rate = write_rate;
    167 	r->read_maximum = read_burst;
    168 	r->write_maximum = write_burst;
    169 	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
    170 	r->msec_per_tick = (tick_len->tv_sec * 1000) +
    171 	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
    172 	return r;
    173 }
    174 
/** Release a configuration allocated by ev_token_bucket_cfg_new().
 * Does nothing else: any bufferevents still pointing at cfg are not
 * detached here. */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
    180 
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Convenience wrappers for acquiring/releasing a rate-limit group's lock. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: suspend/unsuspend I/O on every member of a group. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
    192 
/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

	/* Pick the read or write limit out of a token bucket. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

	/* Pick the read or write "suspended" flag of a group. */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* Split the group's remaining quota evenly among
			 * members, but never below min_share.
			 * XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket that has gone negative means no quota left at all. */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
    259 
/* Return how many bytes bev may read right now under its (and its
 * group's) rate limits.  Caller must hold the lock on bev. */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}
    265 
/* Return how many bytes bev may write right now under its (and its
 * group's) rate limits.  Caller must hold the lock on bev. */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}
    271 
/** Deduct <b>bytes</b> from the read buckets of <b>bev</b> and of its
 * rate-limit group (if any), suspending or unsuspending reads as the
 * buckets empty or recover.  Returns 0 on success, -1 if the refill
 * timer could not be scheduled.  Caller must hold the lock on bev. */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading until the refill
			 * timer fires one tick from now. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket is positive again (bytes may be negative,
			 * returning quota): resume reading, and drop the
			 * refill timer unless the write side still needs
			 * it. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
    309 
/** Deduct <b>bytes</b> from the write buckets of <b>bev</b> and of its
 * rate-limit group (if any), suspending or unsuspending writes as the
 * buckets empty or recover.  Returns 0 on success, -1 if the refill
 * timer could not be scheduled.  Caller must hold the lock on bev. */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket exhausted: stop writing until the refill
			 * timer fires one tick from now. */
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Bucket is positive again (bytes may be negative,
			 * returning quota): resume writing, and drop the
			 * refill timer unless the read side still needs
			 * it. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
    347 
/** Stop reading on every bufferevent in <b>g</b>.  Always returns 0.
    Needs the group lock. */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	/* Cancel any pending "retry unsuspend" from a previous tick. */
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
    372 
/** Stop writing on every bufferevent in <b>g</b>.  Always returns 0.
    Needs the group lock. */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	/* As in bev_group_suspend_reading_, use EVLOCK_TRY_LOCK_ rather
	 * than BEV_LOCK to avoid a lock-order deadlock; members we fail to
	 * lock will notice the group suspension when they next consult
	 * their limit. */
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
    390 
/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* The limit may have been removed between scheduling this callback
	 * and running it; nothing to refill in that case. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
    440 
    441 /** Helper: grab a random element from a bufferevent group.
    442  *
    443  * Requires that we hold the lock on the group.
    444  */
    445 static struct bufferevent_private *
    446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
    447 {
    448 	int which;
    449 	struct bufferevent_private *bev;
    450 
    451 	/* requires group lock */
    452 
    453 	if (!group->n_members)
    454 		return NULL;
    455 
    456 	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
    457 
    458 	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
    459 
    460 	bev = LIST_FIRST(&group->members);
    461 	while (which--)
    462 		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
    463 
    464 	return bev;
    465 }
    466 
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    Requires the group lock, plus locals 'bev' and 'first' of type
    struct bufferevent_private * in the calling scope.  Walks from the
    randomly chosen member to the end of the list, then wraps from the
    head back up to (but not including) the starting member.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
    486 
/** Resume reading on every member of <b>g</b> that we can lock.  Members
 * we fail to try-lock are recorded in pending_unsuspend_read so the next
 * group refill tick retries them.  Needs the group lock. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; retry on a later
			 * tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
    505 
/** Resume writing on every member of <b>g</b> that we can lock.  Members
 * we fail to try-lock are recorded in pending_unsuspend_write so the next
 * group refill tick retries them.  Needs the group lock. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; retry on a later
			 * tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
    524 
/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	/* Top up the shared group bucket for however many ticks elapsed. */
	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend if we have members left over from a failed try-lock
	 * last tick, or if the bucket recovered to at least one minimum
	 * share. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
    558 
/** Set the per-bufferevent rate limit of <b>bev</b> to <b>cfg</b>, or
 * remove the limit if cfg is NULL.  Returns 0 on success, -1 on
 * allocation failure. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: lift any bandwidth suspensions and
		 * cancel the refill timer.  The rlim struct itself is kept
		 * around for reuse. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	/* If a cfg was already in place, reinitialize (clip only downward)
	 * so bandwidth already spent this tick still counts. */
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Apply the initial bucket state: suspend each direction whose
	 * bucket is empty, unsuspend the others. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	/* NOTE(review): event_add failure here is ignored; a suspended
	 * direction would then never be refilled. */
	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
    637 
/** Create a new rate-limit group on <b>base</b>, initialized from
 * <b>cfg</b>.  The cfg is copied into the group, so the caller keeps
 * ownership of it.  Returns NULL on allocation failure. */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	/* The persistent master event tops up the shared bucket (and wakes
	 * suspended members) once per tick. */
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Default minimum per-member share: 64 bytes per tick. */
	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
    671 
/** Replace the configuration of group <b>g</b> with <b>cfg</b> (copied).
 * Returns 0 on success, -1 if either argument is NULL. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	/* Clip the live bucket down to the new maxima; never clip up. */
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
    702 
    703 int
    704 bufferevent_rate_limit_group_set_min_share(
    705 	struct bufferevent_rate_limit_group *g,
    706 	size_t share)
    707 {
    708 	if (share > EV_SSIZE_MAX)
    709 		return -1;
    710 
    711 	g->configured_min_share = share;
    712 
    713 	/* Can't set share to less than the one-tick maximum.  IOW, at steady
    714 	 * state, at least one connection can go per tick. */
    715 	if (share > g->rate_limit_cfg.read_rate)
    716 		share = g->rate_limit_cfg.read_rate;
    717 	if (share > g->rate_limit_cfg.write_rate)
    718 		share = g->rate_limit_cfg.write_rate;
    719 
    720 	g->min_share = share;
    721 	return 0;
    722 }
    723 
/** Destroy group <b>g</b>.  All members must already have been removed
 * (asserted below). */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
    734 
/** Add <b>bev</b> to the rate-limit group <b>g</b>, removing it from any
 * previous group first.  Returns 0 on success (including the no-op case
 * where bev is already in g), -1 on allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	/* Lazily allocate the rate_limiting struct; group membership does
	 * not require a per-bufferevent cfg. */
	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the group's suspension state while holding its lock;
	 * apply it to bev afterwards under bev's own lock only. */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
    781 
/** Remove <b>bev</b> from its rate-limit group (if any), lifting any
 * group-imposed suspensions.  Returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
    787 
/** Detach <b>bev</b> from its group, if it has one.  If <b>unsuspend</b>
 * is true, also lift any BEV_SUSPEND_BW_GROUP suspensions on bev.
 * Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
    811 
    812 /* ===
    813  * API functions to expose rate limits.
    814  *
    815  * Don't use these from inside Libevent; they're meant to be for use by
    816  * the program.
    817  * === */
    818 
    819 /* Mostly you don't want to use this function from inside libevent;
    820  * bufferevent_get_read_max_() is more likely what you want*/
    821 ev_ssize_t
    822 bufferevent_get_read_limit(struct bufferevent *bev)
    823 {
    824 	ev_ssize_t r;
    825 	struct bufferevent_private *bevp;
    826 	BEV_LOCK(bev);
    827 	bevp = BEV_UPCAST(bev);
    828 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    829 		bufferevent_update_buckets(bevp);
    830 		r = bevp->rate_limiting->limit.read_limit;
    831 	} else {
    832 		r = EV_SSIZE_MAX;
    833 	}
    834 	BEV_UNLOCK(bev);
    835 	return r;
    836 }
    837 
    838 /* Mostly you don't want to use this function from inside libevent;
    839  * bufferevent_get_write_max_() is more likely what you want*/
    840 ev_ssize_t
    841 bufferevent_get_write_limit(struct bufferevent *bev)
    842 {
    843 	ev_ssize_t r;
    844 	struct bufferevent_private *bevp;
    845 	BEV_LOCK(bev);
    846 	bevp = BEV_UPCAST(bev);
    847 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    848 		bufferevent_update_buckets(bevp);
    849 		r = bevp->rate_limiting->limit.write_limit;
    850 	} else {
    851 		r = EV_SSIZE_MAX;
    852 	}
    853 	BEV_UNLOCK(bev);
    854 	return r;
    855 }
    856 
    857 int
    858 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
    859 {
    860 	struct bufferevent_private *bevp;
    861 	BEV_LOCK(bev);
    862 	bevp = BEV_UPCAST(bev);
    863 	if (size == 0 || size > EV_SSIZE_MAX)
    864 		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
    865 	else
    866 		bevp->max_single_read = size;
    867 	BEV_UNLOCK(bev);
    868 	return 0;
    869 }
    870 
    871 int
    872 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
    873 {
    874 	struct bufferevent_private *bevp;
    875 	BEV_LOCK(bev);
    876 	bevp = BEV_UPCAST(bev);
    877 	if (size == 0 || size > EV_SSIZE_MAX)
    878 		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
    879 	else
    880 		bevp->max_single_write = size;
    881 	BEV_UNLOCK(bev);
    882 	return 0;
    883 }
    884 
    885 ev_ssize_t
    886 bufferevent_get_max_single_read(struct bufferevent *bev)
    887 {
    888 	ev_ssize_t r;
    889 
    890 	BEV_LOCK(bev);
    891 	r = BEV_UPCAST(bev)->max_single_read;
    892 	BEV_UNLOCK(bev);
    893 	return r;
    894 }
    895 
    896 ev_ssize_t
    897 bufferevent_get_max_single_write(struct bufferevent *bev)
    898 {
    899 	ev_ssize_t r;
    900 
    901 	BEV_LOCK(bev);
    902 	r = BEV_UPCAST(bev)->max_single_write;
    903 	BEV_UNLOCK(bev);
    904 	return r;
    905 }
    906 
    907 ev_ssize_t
    908 bufferevent_get_max_to_read(struct bufferevent *bev)
    909 {
    910 	ev_ssize_t r;
    911 	BEV_LOCK(bev);
    912 	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
    913 	BEV_UNLOCK(bev);
    914 	return r;
    915 }
    916 
    917 ev_ssize_t
    918 bufferevent_get_max_to_write(struct bufferevent *bev)
    919 {
    920 	ev_ssize_t r;
    921 	BEV_LOCK(bev);
    922 	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
    923 	BEV_UNLOCK(bev);
    924 	return r;
    925 }
    926 
    927 const struct ev_token_bucket_cfg *
    928 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
    929 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
    930 	struct ev_token_bucket_cfg *cfg;
    931 
    932 	BEV_LOCK(bev);
    933 
    934 	if (bufev_private->rate_limiting) {
    935 		cfg = bufev_private->rate_limiting->cfg;
    936 	} else {
    937 		cfg = NULL;
    938 	}
    939 
    940 	BEV_UNLOCK(bev);
    941 
    942 	return cfg;
    943 }
    944 
    945 /* Mostly you don't want to use this function from inside libevent;
    946  * bufferevent_get_read_max_() is more likely what you want*/
    947 ev_ssize_t
    948 bufferevent_rate_limit_group_get_read_limit(
    949 	struct bufferevent_rate_limit_group *grp)
    950 {
    951 	ev_ssize_t r;
    952 	LOCK_GROUP(grp);
    953 	r = grp->rate_limit.read_limit;
    954 	UNLOCK_GROUP(grp);
    955 	return r;
    956 }
    957 
    958 /* Mostly you don't want to use this function from inside libevent;
    959  * bufferevent_get_write_max_() is more likely what you want. */
    960 ev_ssize_t
    961 bufferevent_rate_limit_group_get_write_limit(
    962 	struct bufferevent_rate_limit_group *grp)
    963 {
    964 	ev_ssize_t r;
    965 	LOCK_GROUP(grp);
    966 	r = grp->rate_limit.write_limit;
    967 	UNLOCK_GROUP(grp);
    968 	return r;
    969 }
    970 
/* Manually drain "decr" bytes from the read-side token bucket of "bev"
 * (a negative decr refills it).  Suspends reads when the bucket crosses
 * into deficit and resumes them when it climbs back out.  The caller
 * must have configured per-bufferevent rate limiting (asserted below).
 * Returns 0 on success, -1 if re-arming the refill timer fails. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop reading and make sure the refill
		 * timer is pending so the bucket gets topped up each tick. */
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket came back above zero.  The refill event is shared by
		 * the read and write directions, so checking write_suspended
		 * here is deliberate: only cancel the timer if the write side
		 * is not still waiting on it for bandwidth. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
    997 
/* Manually drain "decr" bytes from the write-side token bucket of "bev"
 * (a negative decr refills it).  Suspends writes when the bucket crosses
 * into deficit and resumes them when it climbs back out.  The caller
 * must have configured per-bufferevent rate limiting (asserted below).
 * Returns 0 on success, -1 if re-arming the refill timer fails. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop writing and make sure the refill
		 * timer is pending so the bucket gets topped up each tick. */
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket came back above zero.  The refill event is shared by
		 * the read and write directions, so checking read_suspended
		 * here is deliberate: only cancel the timer if the read side
		 * is not still waiting on it for bandwidth. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
   1026 
   1027 int
   1028 bufferevent_rate_limit_group_decrement_read(
   1029 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1030 {
   1031 	int r = 0;
   1032 	ev_ssize_t old_limit, new_limit;
   1033 	LOCK_GROUP(grp);
   1034 	old_limit = grp->rate_limit.read_limit;
   1035 	new_limit = (grp->rate_limit.read_limit -= decr);
   1036 
   1037 	if (old_limit > 0 && new_limit <= 0) {
   1038 		bev_group_suspend_reading_(grp);
   1039 	} else if (old_limit <= 0 && new_limit > 0) {
   1040 		bev_group_unsuspend_reading_(grp);
   1041 	}
   1042 
   1043 	UNLOCK_GROUP(grp);
   1044 	return r;
   1045 }
   1046 
   1047 int
   1048 bufferevent_rate_limit_group_decrement_write(
   1049 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1050 {
   1051 	int r = 0;
   1052 	ev_ssize_t old_limit, new_limit;
   1053 	LOCK_GROUP(grp);
   1054 	old_limit = grp->rate_limit.write_limit;
   1055 	new_limit = (grp->rate_limit.write_limit -= decr);
   1056 
   1057 	if (old_limit > 0 && new_limit <= 0) {
   1058 		bev_group_suspend_writing_(grp);
   1059 	} else if (old_limit <= 0 && new_limit > 0) {
   1060 		bev_group_unsuspend_writing_(grp);
   1061 	}
   1062 
   1063 	UNLOCK_GROUP(grp);
   1064 	return r;
   1065 }
   1066 
   1067 void
   1068 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
   1069     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
   1070 {
   1071 	EVUTIL_ASSERT(grp != NULL);
   1072 	if (total_read_out)
   1073 		*total_read_out = grp->total_read;
   1074 	if (total_written_out)
   1075 		*total_written_out = grp->total_written;
   1076 }
   1077 
   1078 void
   1079 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
   1080 {
   1081 	grp->total_read = grp->total_written = 0;
   1082 }
   1083 
   1084 int
   1085 bufferevent_ratelim_init_(struct bufferevent_private *bev)
   1086 {
   1087 	bev->rate_limiting = NULL;
   1088 	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
   1089 	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
   1090 
   1091 	return 0;
   1092 }
   1093