1 /* 2 * Copyright (c) 2000-2007 Niels Provos <provos (at) citi.umich.edu> 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "event2/event-config.h" 28 29 #ifdef WIN32 30 #include <winsock2.h> 31 #define WIN32_LEAN_AND_MEAN 32 #include <windows.h> 33 #undef WIN32_LEAN_AND_MEAN 34 #endif 35 #include <sys/types.h> 36 #if !defined(WIN32) && defined(_EVENT_HAVE_SYS_TIME_H) 37 #include <sys/time.h> 38 #endif 39 #include <sys/queue.h> 40 #ifdef _EVENT_HAVE_SYS_SOCKET_H 41 #include <sys/socket.h> 42 #endif 43 #include <stdio.h> 44 #include <stdlib.h> 45 #ifdef _EVENT_HAVE_UNISTD_H 46 #include <unistd.h> 47 #endif 48 #ifdef _EVENT_HAVE_SYS_EVENTFD_H 49 #include <sys/eventfd.h> 50 #endif 51 #include <ctype.h> 52 #include <errno.h> 53 #include <signal.h> 54 #include <string.h> 55 #include <time.h> 56 57 #include "event2/event.h" 58 #include "event2/event_struct.h" 59 #include "event2/event_compat.h" 60 #include "event-internal.h" 61 #include "defer-internal.h" 62 #include "evthread-internal.h" 63 #include "event2/thread.h" 64 #include "event2/util.h" 65 #include "log-internal.h" 66 #include "evmap-internal.h" 67 #include "iocp-internal.h" 68 #include "changelist-internal.h" 69 #include "ht-internal.h" 70 #include "util-internal.h" 71 72 #ifdef _EVENT_HAVE_EVENT_PORTS 73 extern const struct eventop evportops; 74 #endif 75 #ifdef _EVENT_HAVE_SELECT 76 extern const struct eventop selectops; 77 #endif 78 #ifdef _EVENT_HAVE_POLL 79 extern const struct eventop pollops; 80 #endif 81 #ifdef _EVENT_HAVE_EPOLL 82 extern const struct eventop epollops; 83 #endif 84 #ifdef _EVENT_HAVE_WORKING_KQUEUE 85 extern const struct eventop kqops; 86 #endif 87 #ifdef _EVENT_HAVE_DEVPOLL 88 extern const struct eventop devpollops; 89 #endif 90 #ifdef WIN32 91 extern const struct eventop win32ops; 92 #endif 93 94 /* Array of backends in order of preference. */ 95 static const struct eventop *eventops[] = { 96 #ifdef _EVENT_HAVE_EVENT_PORTS 97 &evportops, 98 #endif 99 #ifdef _EVENT_HAVE_WORKING_KQUEUE 100 &kqops, 101 #endif 102 #ifdef _EVENT_HAVE_EPOLL 103 &epollops, 104 #endif 105 #ifdef _EVENT_HAVE_DEVPOLL 106 &devpollops, 107 #endif 108 #ifdef _EVENT_HAVE_POLL 109 &pollops, 110 #endif 111 #ifdef _EVENT_HAVE_SELECT 112 &selectops, 113 #endif 114 #ifdef WIN32 115 &win32ops, 116 #endif 117 NULL 118 }; 119 120 /* Global state; deprecated */ 121 struct event_base *event_global_current_base_ = NULL; 122 #define current_base event_global_current_base_ 123 124 /* Global state */ 125 126 static int use_monotonic; 127 128 /* Prototypes */ 129 static inline int event_add_internal(struct event *ev, 130 const struct timeval *tv, int tv_is_absolute); 131 static inline int event_del_internal(struct event *ev); 132 133 static void event_queue_insert(struct event_base *, struct event *, int); 134 static void event_queue_remove(struct event_base *, struct event *, int); 135 static int event_haveevents(struct event_base *); 136 137 static int event_process_active(struct event_base *); 138 139 static int timeout_next(struct event_base *, struct timeval **); 140 static void timeout_process(struct event_base *); 141 static void timeout_correct(struct event_base *, struct timeval *); 142 143 static inline void event_signal_closure(struct event_base *, struct event *ev); 144 static inline void event_persist_closure(struct event_base *, struct event *ev); 145 146 static int evthread_notify_base(struct event_base *base); 147 148 #ifndef _EVENT_DISABLE_DEBUG_MODE 149 /* These functions implement a hashtable of which 'struct event *' structures 150 * have been setup or added. We don't want to trust the content of the struct 151 * event itself, since we're trying to work through cases where an event gets 152 * clobbered or freed. Instead, we keep a hashtable indexed by the pointer. 153 */ 154 155 struct event_debug_entry { 156 HT_ENTRY(event_debug_entry) node; 157 const struct event *ptr; 158 unsigned added : 1; 159 }; 160 161 static inline unsigned 162 hash_debug_entry(const struct event_debug_entry *e) 163 { 164 /* We need to do this silliness to convince compilers that we 165 * honestly mean to cast e->ptr to an integer, and discard any 166 * part of it that doesn't fit in an unsigned. 167 */ 168 unsigned u = (unsigned) ((ev_uintptr_t) e->ptr); 169 /* Our hashtable implementation is pretty sensitive to low bits, 170 * and every struct event is over 64 bytes in size, so we can 171 * just say >>6. */ 172 return (u >> 6); 173 } 174 175 static inline int 176 eq_debug_entry(const struct event_debug_entry *a, 177 const struct event_debug_entry *b) 178 { 179 return a->ptr == b->ptr; 180 } 181 182 int _event_debug_mode_on = 0; 183 /* Set if it's too late to enable event_debug_mode. */ 184 static int event_debug_mode_too_late = 0; 185 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 186 static void *_event_debug_map_lock = NULL; 187 #endif 188 static HT_HEAD(event_debug_map, event_debug_entry) global_debug_map = 189 HT_INITIALIZER(); 190 191 HT_PROTOTYPE(event_debug_map, event_debug_entry, node, hash_debug_entry, 192 eq_debug_entry) 193 HT_GENERATE(event_debug_map, event_debug_entry, node, hash_debug_entry, 194 eq_debug_entry, 0.5, mm_malloc, mm_realloc, mm_free) 195 196 /* Macro: record that ev is now setup (that is, ready for an add) */ 197 #define _event_debug_note_setup(ev) do { \ 198 if (_event_debug_mode_on) { \ 199 struct event_debug_entry *dent,find; \ 200 find.ptr = (ev); \ 201 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 202 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 203 if (dent) { \ 204 dent->added = 0; \ 205 } else { \ 206 dent = mm_malloc(sizeof(*dent)); \ 207 if (!dent) \ 208 event_err(1, \ 209 "Out of memory in debugging code"); \ 210 dent->ptr = (ev); \ 211 dent->added = 0; \ 212 HT_INSERT(event_debug_map, &global_debug_map, dent); \ 213 } \ 214 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 215 } \ 216 event_debug_mode_too_late = 1; \ 217 } while (0) 218 /* Macro: record that ev is no longer setup */ 219 #define _event_debug_note_teardown(ev) do { \ 220 if (_event_debug_mode_on) { \ 221 struct event_debug_entry *dent,find; \ 222 find.ptr = (ev); \ 223 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 224 dent = HT_REMOVE(event_debug_map, &global_debug_map, &find); \ 225 if (dent) \ 226 mm_free(dent); \ 227 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 228 } \ 229 event_debug_mode_too_late = 1; \ 230 } while (0) 231 /* Macro: record that ev is now added */ 232 #define _event_debug_note_add(ev) do { \ 233 if (_event_debug_mode_on) { \ 234 struct event_debug_entry *dent,find; \ 235 find.ptr = (ev); \ 236 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 237 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 238 if (dent) { \ 239 dent->added = 1; \ 240 } else { \ 241 event_errx(_EVENT_ERR_ABORT, \ 242 "%s: noting an add on a non-setup event %p" \ 243 " (events: 0x%x, fd: "EV_SOCK_FMT \ 244 ", flags: 0x%x)", \ 245 __func__, (ev), (ev)->ev_events, \ 246 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 247 } \ 248 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 249 } \ 250 event_debug_mode_too_late = 1; \ 251 } while (0) 252 /* Macro: record that ev is no longer added */ 253 #define _event_debug_note_del(ev) do { \ 254 if (_event_debug_mode_on) { \ 255 struct event_debug_entry *dent,find; \ 256 find.ptr = (ev); \ 257 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 258 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 259 if (dent) { \ 260 dent->added = 0; \ 261 } else { \ 262 event_errx(_EVENT_ERR_ABORT, \ 263 "%s: noting a del on a non-setup event %p" \ 264 " (events: 0x%x, fd: "EV_SOCK_FMT \ 265 ", flags: 0x%x)", \ 266 __func__, (ev), (ev)->ev_events, \ 267 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 268 } \ 269 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 270 } \ 271 event_debug_mode_too_late = 1; \ 272 } while (0) 273 /* Macro: assert that ev is setup (i.e., okay to add or inspect) */ 274 #define _event_debug_assert_is_setup(ev) do { \ 275 if (_event_debug_mode_on) { \ 276 struct event_debug_entry *dent,find; \ 277 find.ptr = (ev); \ 278 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 279 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 280 if (!dent) { \ 281 event_errx(_EVENT_ERR_ABORT, \ 282 "%s called on a non-initialized event %p" \ 283 " (events: 0x%x, fd: "EV_SOCK_FMT\ 284 ", flags: 0x%x)", \ 285 __func__, (ev), (ev)->ev_events, \ 286 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 287 } \ 288 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 289 } \ 290 } while (0) 291 /* Macro: assert that ev is not added (i.e., okay to tear down or set 292 * up again) */ 293 #define _event_debug_assert_not_added(ev) do { \ 294 if (_event_debug_mode_on) { \ 295 struct event_debug_entry *dent,find; \ 296 find.ptr = (ev); \ 297 EVLOCK_LOCK(_event_debug_map_lock, 0); \ 298 dent = HT_FIND(event_debug_map, &global_debug_map, &find); \ 299 if (dent && dent->added) { \ 300 event_errx(_EVENT_ERR_ABORT, \ 301 "%s called on an already added event %p" \ 302 " (events: 0x%x, fd: "EV_SOCK_FMT", " \ 303 "flags: 0x%x)", \ 304 __func__, (ev), (ev)->ev_events, \ 305 EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \ 306 } \ 307 EVLOCK_UNLOCK(_event_debug_map_lock, 0); \ 308 } \ 309 } while (0) 310 #else 311 #define _event_debug_note_setup(ev) \ 312 ((void)0) 313 #define _event_debug_note_teardown(ev) \ 314 ((void)0) 315 #define _event_debug_note_add(ev) \ 316 ((void)0) 317 #define _event_debug_note_del(ev) \ 318 ((void)0) 319 #define _event_debug_assert_is_setup(ev) \ 320 ((void)0) 321 #define _event_debug_assert_not_added(ev) \ 322 ((void)0) 323 #endif 324 325 #define EVENT_BASE_ASSERT_LOCKED(base) \ 326 EVLOCK_ASSERT_LOCKED((base)->th_base_lock) 327 328 /* The first time this function is called, it sets use_monotonic to 1 329 * if we have a clock function that supports monotonic time */ 330 static void 331 detect_monotonic(void) 332 { 333 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 334 struct timespec ts; 335 static int use_monotonic_initialized = 0; 336 337 if (use_monotonic_initialized) 338 return; 339 340 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) 341 use_monotonic = 1; 342 343 use_monotonic_initialized = 1; 344 #endif 345 } 346 347 /* How often (in seconds) do we check for changes in wall clock time relative 348 * to monotonic time? Set this to -1 for 'never.' */ 349 #define CLOCK_SYNC_INTERVAL -1 350 351 /** Set 'tp' to the current time according to 'base'. We must hold the lock 352 * on 'base'. If there is a cached time, return it. Otherwise, use 353 * clock_gettime or gettimeofday as appropriate to find out the right time. 354 * Return 0 on success, -1 on failure. 355 */ 356 static int 357 gettime(struct event_base *base, struct timeval *tp) 358 { 359 EVENT_BASE_ASSERT_LOCKED(base); 360 361 if (base->tv_cache.tv_sec) { 362 *tp = base->tv_cache; 363 return (0); 364 } 365 366 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 367 if (use_monotonic) { 368 struct timespec ts; 369 370 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) 371 return (-1); 372 373 tp->tv_sec = ts.tv_sec; 374 tp->tv_usec = ts.tv_nsec / 1000; 375 if (base->last_updated_clock_diff + CLOCK_SYNC_INTERVAL 376 < ts.tv_sec) { 377 struct timeval tv; 378 evutil_gettimeofday(&tv,NULL); 379 evutil_timersub(&tv, tp, &base->tv_clock_diff); 380 base->last_updated_clock_diff = ts.tv_sec; 381 } 382 383 return (0); 384 } 385 #endif 386 387 return (evutil_gettimeofday(tp, NULL)); 388 } 389 390 int 391 event_base_gettimeofday_cached(struct event_base *base, struct timeval *tv) 392 { 393 int r; 394 if (!base) { 395 base = current_base; 396 if (!current_base) 397 return evutil_gettimeofday(tv, NULL); 398 } 399 400 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 401 if (base->tv_cache.tv_sec == 0) { 402 r = evutil_gettimeofday(tv, NULL); 403 } else { 404 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 405 evutil_timeradd(&base->tv_cache, &base->tv_clock_diff, tv); 406 #else 407 *tv = base->tv_cache; 408 #endif 409 r = 0; 410 } 411 EVBASE_RELEASE_LOCK(base, th_base_lock); 412 return r; 413 } 414 415 /** Make 'base' have no current cached time. */ 416 static inline void 417 clear_time_cache(struct event_base *base) 418 { 419 base->tv_cache.tv_sec = 0; 420 } 421 422 /** Replace the cached time in 'base' with the current time. */ 423 static inline void 424 update_time_cache(struct event_base *base) 425 { 426 base->tv_cache.tv_sec = 0; 427 if (!(base->flags & EVENT_BASE_FLAG_NO_CACHE_TIME)) 428 gettime(base, &base->tv_cache); 429 } 430 431 struct event_base * 432 event_init(void) 433 { 434 struct event_base *base = event_base_new_with_config(NULL); 435 436 if (base == NULL) { 437 event_errx(1, "%s: Unable to construct event_base", __func__); 438 return NULL; 439 } 440 441 current_base = base; 442 443 return (base); 444 } 445 446 struct event_base * 447 event_base_new(void) 448 { 449 struct event_base *base = NULL; 450 struct event_config *cfg = event_config_new(); 451 if (cfg) { 452 base = event_base_new_with_config(cfg); 453 event_config_free(cfg); 454 } 455 return base; 456 } 457 458 /** Return true iff 'method' is the name of a method that 'cfg' tells us to 459 * avoid. */ 460 static int 461 event_config_is_avoided_method(const struct event_config *cfg, 462 const char *method) 463 { 464 struct event_config_entry *entry; 465 466 TAILQ_FOREACH(entry, &cfg->entries, next) { 467 if (entry->avoid_method != NULL && 468 strcmp(entry->avoid_method, method) == 0) 469 return (1); 470 } 471 472 return (0); 473 } 474 475 /** Return true iff 'method' is disabled according to the environment. */ 476 static int 477 event_is_method_disabled(const char *name) 478 { 479 char environment[64]; 480 int i; 481 482 evutil_snprintf(environment, sizeof(environment), "EVENT_NO%s", name); 483 for (i = 8; environment[i] != '\0'; ++i) 484 environment[i] = EVUTIL_TOUPPER(environment[i]); 485 /* Note that evutil_getenv() ignores the environment entirely if 486 * we're setuid */ 487 return (evutil_getenv(environment) != NULL); 488 } 489 490 int 491 event_base_get_features(const struct event_base *base) 492 { 493 return base->evsel->features; 494 } 495 496 void 497 event_deferred_cb_queue_init(struct deferred_cb_queue *cb) 498 { 499 memset(cb, 0, sizeof(struct deferred_cb_queue)); 500 TAILQ_INIT(&cb->deferred_cb_list); 501 } 502 503 /** Helper for the deferred_cb queue: wake up the event base. */ 504 static void 505 notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) 506 { 507 struct event_base *base = baseptr; 508 if (EVBASE_NEED_NOTIFY(base)) 509 evthread_notify_base(base); 510 } 511 512 struct deferred_cb_queue * 513 event_base_get_deferred_cb_queue(struct event_base *base) 514 { 515 return base ? &base->defer_queue : NULL; 516 } 517 518 void 519 event_enable_debug_mode(void) 520 { 521 #ifndef _EVENT_DISABLE_DEBUG_MODE 522 if (_event_debug_mode_on) 523 event_errx(1, "%s was called twice!", __func__); 524 if (event_debug_mode_too_late) 525 event_errx(1, "%s must be called *before* creating any events " 526 "or event_bases",__func__); 527 528 _event_debug_mode_on = 1; 529 530 HT_INIT(event_debug_map, &global_debug_map); 531 #endif 532 } 533 534 #if 0 535 void 536 event_disable_debug_mode(void) 537 { 538 struct event_debug_entry **ent, *victim; 539 540 EVLOCK_LOCK(_event_debug_map_lock, 0); 541 for (ent = HT_START(event_debug_map, &global_debug_map); ent; ) { 542 victim = *ent; 543 ent = HT_NEXT_RMV(event_debug_map,&global_debug_map, ent); 544 mm_free(victim); 545 } 546 HT_CLEAR(event_debug_map, &global_debug_map); 547 EVLOCK_UNLOCK(_event_debug_map_lock , 0); 548 } 549 #endif 550 551 struct event_base * 552 event_base_new_with_config(const struct event_config *cfg) 553 { 554 int i; 555 struct event_base *base; 556 int should_check_environment; 557 558 #ifndef _EVENT_DISABLE_DEBUG_MODE 559 event_debug_mode_too_late = 1; 560 #endif 561 562 if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) { 563 event_warn("%s: calloc", __func__); 564 return NULL; 565 } 566 detect_monotonic(); 567 gettime(base, &base->event_tv); 568 569 min_heap_ctor(&base->timeheap); 570 TAILQ_INIT(&base->eventqueue); 571 base->sig.ev_signal_pair[0] = -1; 572 base->sig.ev_signal_pair[1] = -1; 573 base->th_notify_fd[0] = -1; 574 base->th_notify_fd[1] = -1; 575 576 event_deferred_cb_queue_init(&base->defer_queue); 577 base->defer_queue.notify_fn = notify_base_cbq_callback; 578 base->defer_queue.notify_arg = base; 579 if (cfg) 580 base->flags = cfg->flags; 581 582 evmap_io_initmap(&base->io); 583 evmap_signal_initmap(&base->sigmap); 584 event_changelist_init(&base->changelist); 585 586 base->evbase = NULL; 587 588 should_check_environment = 589 !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV)); 590 591 for (i = 0; eventops[i] && !base->evbase; i++) { 592 if (cfg != NULL) { 593 /* determine if this backend should be avoided */ 594 if (event_config_is_avoided_method(cfg, 595 eventops[i]->name)) 596 continue; 597 if ((eventops[i]->features & cfg->require_features) 598 != cfg->require_features) 599 continue; 600 } 601 602 /* also obey the environment variables */ 603 if (should_check_environment && 604 event_is_method_disabled(eventops[i]->name)) 605 continue; 606 607 base->evsel = eventops[i]; 608 609 base->evbase = base->evsel->init(base); 610 } 611 612 if (base->evbase == NULL) { 613 event_warnx("%s: no event mechanism available", 614 __func__); 615 base->evsel = NULL; 616 event_base_free(base); 617 return NULL; 618 } 619 620 if (evutil_getenv("EVENT_SHOW_METHOD")) 621 event_msgx("libevent using: %s", base->evsel->name); 622 623 /* allocate a single active event queue */ 624 if (event_base_priority_init(base, 1) < 0) { 625 event_base_free(base); 626 return NULL; 627 } 628 629 /* prepare for threading */ 630 631 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 632 if (EVTHREAD_LOCKING_ENABLED() && 633 (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) { 634 int r; 635 EVTHREAD_ALLOC_LOCK(base->th_base_lock, 636 EVTHREAD_LOCKTYPE_RECURSIVE); 637 base->defer_queue.lock = base->th_base_lock; 638 EVTHREAD_ALLOC_COND(base->current_event_cond); 639 r = evthread_make_base_notifiable(base); 640 if (r<0) { 641 event_warnx("%s: Unable to make base notifiable.", __func__); 642 event_base_free(base); 643 return NULL; 644 } 645 } 646 #endif 647 648 #ifdef WIN32 649 if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP)) 650 event_base_start_iocp(base, cfg->n_cpus_hint); 651 #endif 652 653 return (base); 654 } 655 656 int 657 event_base_start_iocp(struct event_base *base, int n_cpus) 658 { 659 #ifdef WIN32 660 if (base->iocp) 661 return 0; 662 base->iocp = event_iocp_port_launch(n_cpus); 663 if (!base->iocp) { 664 event_warnx("%s: Couldn't launch IOCP", __func__); 665 return -1; 666 } 667 return 0; 668 #else 669 return -1; 670 #endif 671 } 672 673 void 674 event_base_stop_iocp(struct event_base *base) 675 { 676 #ifdef WIN32 677 int rv; 678 679 if (!base->iocp) 680 return; 681 rv = event_iocp_shutdown(base->iocp, -1); 682 EVUTIL_ASSERT(rv >= 0); 683 base->iocp = NULL; 684 #endif 685 } 686 687 void 688 event_base_free(struct event_base *base) 689 { 690 int i, n_deleted=0; 691 struct event *ev; 692 /* XXXX grab the lock? If there is contention when one thread frees 693 * the base, then the contending thread will be very sad soon. */ 694 695 /* event_base_free(NULL) is how to free the current_base if we 696 * made it with event_init and forgot to hold a reference to it. */ 697 if (base == NULL && current_base) 698 base = current_base; 699 /* If we're freeing current_base, there won't be a current_base. */ 700 if (base == current_base) 701 current_base = NULL; 702 /* Don't actually free NULL. */ 703 if (base == NULL) { 704 event_warnx("%s: no base to free", __func__); 705 return; 706 } 707 /* XXX(niels) - check for internal events first */ 708 709 #ifdef WIN32 710 event_base_stop_iocp(base); 711 #endif 712 713 /* threading fds if we have them */ 714 if (base->th_notify_fd[0] != -1) { 715 event_del(&base->th_notify); 716 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]); 717 if (base->th_notify_fd[1] != -1) 718 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]); 719 base->th_notify_fd[0] = -1; 720 base->th_notify_fd[1] = -1; 721 event_debug_unassign(&base->th_notify); 722 } 723 724 /* Delete all non-internal events. */ 725 for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) { 726 struct event *next = TAILQ_NEXT(ev, ev_next); 727 if (!(ev->ev_flags & EVLIST_INTERNAL)) { 728 event_del(ev); 729 ++n_deleted; 730 } 731 ev = next; 732 } 733 while ((ev = min_heap_top(&base->timeheap)) != NULL) { 734 event_del(ev); 735 ++n_deleted; 736 } 737 for (i = 0; i < base->n_common_timeouts; ++i) { 738 struct common_timeout_list *ctl = 739 base->common_timeout_queues[i]; 740 event_del(&ctl->timeout_event); /* Internal; doesn't count */ 741 event_debug_unassign(&ctl->timeout_event); 742 for (ev = TAILQ_FIRST(&ctl->events); ev; ) { 743 struct event *next = TAILQ_NEXT(ev, 744 ev_timeout_pos.ev_next_with_common_timeout); 745 if (!(ev->ev_flags & EVLIST_INTERNAL)) { 746 event_del(ev); 747 ++n_deleted; 748 } 749 ev = next; 750 } 751 mm_free(ctl); 752 } 753 if (base->common_timeout_queues) 754 mm_free(base->common_timeout_queues); 755 756 for (i = 0; i < base->nactivequeues; ++i) { 757 for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) { 758 struct event *next = TAILQ_NEXT(ev, ev_active_next); 759 if (!(ev->ev_flags & EVLIST_INTERNAL)) { 760 event_del(ev); 761 ++n_deleted; 762 } 763 ev = next; 764 } 765 } 766 767 if (n_deleted) 768 event_debug(("%s: %d events were still set in base", 769 __func__, n_deleted)); 770 771 if (base->evsel != NULL && base->evsel->dealloc != NULL) 772 base->evsel->dealloc(base); 773 774 for (i = 0; i < base->nactivequeues; ++i) 775 EVUTIL_ASSERT(TAILQ_EMPTY(&base->activequeues[i])); 776 777 EVUTIL_ASSERT(min_heap_empty(&base->timeheap)); 778 min_heap_dtor(&base->timeheap); 779 780 mm_free(base->activequeues); 781 782 EVUTIL_ASSERT(TAILQ_EMPTY(&base->eventqueue)); 783 784 evmap_io_clear(&base->io); 785 evmap_signal_clear(&base->sigmap); 786 event_changelist_freemem(&base->changelist); 787 788 EVTHREAD_FREE_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); 789 EVTHREAD_FREE_COND(base->current_event_cond); 790 791 mm_free(base); 792 } 793 794 /* reinitialize the event base after a fork */ 795 int 796 event_reinit(struct event_base *base) 797 { 798 const struct eventop *evsel; 799 int res = 0; 800 struct event *ev; 801 int was_notifiable = 0; 802 803 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 804 805 evsel = base->evsel; 806 807 #if 0 808 /* Right now, reinit always takes effect, since even if the 809 backend doesn't require it, the signal socketpair code does. 810 811 XXX 812 */ 813 /* check if this event mechanism requires reinit */ 814 if (!evsel->need_reinit) 815 goto done; 816 #endif 817 818 /* prevent internal delete */ 819 if (base->sig.ev_signal_added) { 820 /* we cannot call event_del here because the base has 821 * not been reinitialized yet. */ 822 event_queue_remove(base, &base->sig.ev_signal, 823 EVLIST_INSERTED); 824 if (base->sig.ev_signal.ev_flags & EVLIST_ACTIVE) 825 event_queue_remove(base, &base->sig.ev_signal, 826 EVLIST_ACTIVE); 827 if (base->sig.ev_signal_pair[0] != -1) 828 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[0]); 829 if (base->sig.ev_signal_pair[1] != -1) 830 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[1]); 831 base->sig.ev_signal_added = 0; 832 } 833 if (base->th_notify_fd[0] != -1) { 834 /* we cannot call event_del here because the base has 835 * not been reinitialized yet. */ 836 was_notifiable = 1; 837 event_queue_remove(base, &base->th_notify, 838 EVLIST_INSERTED); 839 if (base->th_notify.ev_flags & EVLIST_ACTIVE) 840 event_queue_remove(base, &base->th_notify, 841 EVLIST_ACTIVE); 842 base->sig.ev_signal_added = 0; 843 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]); 844 if (base->th_notify_fd[1] != -1) 845 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]); 846 base->th_notify_fd[0] = -1; 847 base->th_notify_fd[1] = -1; 848 event_debug_unassign(&base->th_notify); 849 } 850 851 if (base->evsel->dealloc != NULL) 852 base->evsel->dealloc(base); 853 base->evbase = evsel->init(base); 854 if (base->evbase == NULL) { 855 event_errx(1, "%s: could not reinitialize event mechanism", 856 __func__); 857 res = -1; 858 goto done; 859 } 860 861 event_changelist_freemem(&base->changelist); /* XXX */ 862 evmap_io_clear(&base->io); 863 evmap_signal_clear(&base->sigmap); 864 865 TAILQ_FOREACH(ev, &base->eventqueue, ev_next) { 866 if (ev->ev_events & (EV_READ|EV_WRITE)) { 867 if (ev == &base->sig.ev_signal) { 868 /* If we run into the ev_signal event, it's only 869 * in eventqueue because some signal event was 870 * added, which made evsig_add re-add ev_signal. 871 * So don't double-add it. */ 872 continue; 873 } 874 if (evmap_io_add(base, ev->ev_fd, ev) == -1) 875 res = -1; 876 } else if (ev->ev_events & EV_SIGNAL) { 877 if (evmap_signal_add(base, (int)ev->ev_fd, ev) == -1) 878 res = -1; 879 } 880 } 881 882 if (was_notifiable && res == 0) 883 res = evthread_make_base_notifiable(base); 884 885 done: 886 EVBASE_RELEASE_LOCK(base, th_base_lock); 887 return (res); 888 } 889 890 const char ** 891 event_get_supported_methods(void) 892 { 893 static const char **methods = NULL; 894 const struct eventop **method; 895 const char **tmp; 896 int i = 0, k; 897 898 /* count all methods */ 899 for (method = &eventops[0]; *method != NULL; ++method) { 900 ++i; 901 } 902 903 /* allocate one more than we need for the NULL pointer */ 904 tmp = mm_calloc((i + 1), sizeof(char *)); 905 if (tmp == NULL) 906 return (NULL); 907 908 /* populate the array with the supported methods */ 909 for (k = 0, i = 0; eventops[k] != NULL; ++k) { 910 tmp[i++] = eventops[k]->name; 911 } 912 tmp[i] = NULL; 913 914 if (methods != NULL) 915 mm_free((char**)methods); 916 917 methods = tmp; 918 919 return (methods); 920 } 921 922 struct event_config * 923 event_config_new(void) 924 { 925 struct event_config *cfg = mm_calloc(1, sizeof(*cfg)); 926 927 if (cfg == NULL) 928 return (NULL); 929 930 TAILQ_INIT(&cfg->entries); 931 932 return (cfg); 933 } 934 935 static void 936 event_config_entry_free(struct event_config_entry *entry) 937 { 938 if (entry->avoid_method != NULL) 939 mm_free((char *)entry->avoid_method); 940 mm_free(entry); 941 } 942 943 void 944 event_config_free(struct event_config *cfg) 945 { 946 struct event_config_entry *entry; 947 948 while ((entry = TAILQ_FIRST(&cfg->entries)) != NULL) { 949 TAILQ_REMOVE(&cfg->entries, entry, next); 950 event_config_entry_free(entry); 951 } 952 mm_free(cfg); 953 } 954 955 int 956 event_config_set_flag(struct event_config *cfg, int flag) 957 { 958 if (!cfg) 959 return -1; 960 cfg->flags |= flag; 961 return 0; 962 } 963 964 int 965 event_config_avoid_method(struct event_config *cfg, const char *method) 966 { 967 struct event_config_entry *entry = mm_malloc(sizeof(*entry)); 968 if (entry == NULL) 969 return (-1); 970 971 if ((entry->avoid_method = mm_strdup(method)) == NULL) { 972 mm_free(entry); 973 return (-1); 974 } 975 976 TAILQ_INSERT_TAIL(&cfg->entries, entry, next); 977 978 return (0); 979 } 980 981 int 982 event_config_require_features(struct event_config *cfg, 983 int features) 984 { 985 if (!cfg) 986 return (-1); 987 cfg->require_features = features; 988 return (0); 989 } 990 991 int 992 event_config_set_num_cpus_hint(struct event_config *cfg, int cpus) 993 { 994 if (!cfg) 995 return (-1); 996 cfg->n_cpus_hint = cpus; 997 return (0); 998 } 999 1000 int 1001 event_priority_init(int npriorities) 1002 { 1003 return event_base_priority_init(current_base, npriorities); 1004 } 1005 1006 int 1007 event_base_priority_init(struct event_base *base, int npriorities) 1008 { 1009 int i; 1010 1011 if (N_ACTIVE_CALLBACKS(base) || npriorities < 1 1012 || npriorities >= EVENT_MAX_PRIORITIES) 1013 return (-1); 1014 1015 if (npriorities == base->nactivequeues) 1016 return (0); 1017 1018 if (base->nactivequeues) { 1019 mm_free(base->activequeues); 1020 base->nactivequeues = 0; 1021 } 1022 1023 /* Allocate our priority queues */ 1024 base->activequeues = (struct event_list *) 1025 mm_calloc(npriorities, sizeof(struct event_list)); 1026 if (base->activequeues == NULL) { 1027 event_warn("%s: calloc", __func__); 1028 return (-1); 1029 } 1030 base->nactivequeues = npriorities; 1031 1032 for (i = 0; i < base->nactivequeues; ++i) { 1033 TAILQ_INIT(&base->activequeues[i]); 1034 } 1035 1036 return (0); 1037 } 1038 1039 /* Returns true iff we're currently watching any events. */ 1040 static int 1041 event_haveevents(struct event_base *base) 1042 { 1043 /* Caller must hold th_base_lock */ 1044 return (base->virtual_event_count > 0 || base->event_count > 0); 1045 } 1046 1047 /* "closure" function called when processing active signal events */ 1048 static inline void 1049 event_signal_closure(struct event_base *base, struct event *ev) 1050 { 1051 short ncalls; 1052 int should_break; 1053 1054 /* Allows deletes to work */ 1055 ncalls = ev->ev_ncalls; 1056 if (ncalls != 0) 1057 ev->ev_pncalls = &ncalls; 1058 EVBASE_RELEASE_LOCK(base, th_base_lock); 1059 while (ncalls) { 1060 ncalls--; 1061 ev->ev_ncalls = ncalls; 1062 if (ncalls == 0) 1063 ev->ev_pncalls = NULL; 1064 (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg); 1065 1066 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1067 should_break = base->event_break; 1068 EVBASE_RELEASE_LOCK(base, th_base_lock); 1069 1070 if (should_break) { 1071 if (ncalls != 0) 1072 ev->ev_pncalls = NULL; 1073 return; 1074 } 1075 } 1076 } 1077 1078 /* Common timeouts are special timeouts that are handled as queues rather than 1079 * in the minheap. This is more efficient than the minheap if we happen to 1080 * know that we're going to get several thousands of timeout events all with 1081 * the same timeout value. 1082 * 1083 * Since all our timeout handling code assumes timevals can be copied, 1084 * assigned, etc, we can't use "magic pointer" to encode these common 1085 * timeouts. Searching through a list to see if every timeout is common could 1086 * also get inefficient. Instead, we take advantage of the fact that tv_usec 1087 * is 32 bits long, but only uses 20 of those bits (since it can never be over 1088 * 999999.) We use the top bits to encode 4 bites of magic number, and 8 bits 1089 * of index into the event_base's aray of common timeouts. 1090 */ 1091 1092 #define MICROSECONDS_MASK COMMON_TIMEOUT_MICROSECONDS_MASK 1093 #define COMMON_TIMEOUT_IDX_MASK 0x0ff00000 1094 #define COMMON_TIMEOUT_IDX_SHIFT 20 1095 #define COMMON_TIMEOUT_MASK 0xf0000000 1096 #define COMMON_TIMEOUT_MAGIC 0x50000000 1097 1098 #define COMMON_TIMEOUT_IDX(tv) \ 1099 (((tv)->tv_usec & COMMON_TIMEOUT_IDX_MASK)>>COMMON_TIMEOUT_IDX_SHIFT) 1100 1101 /** Return true iff if 'tv' is a common timeout in 'base' */ 1102 static inline int 1103 is_common_timeout(const struct timeval *tv, 1104 const struct event_base *base) 1105 { 1106 int idx; 1107 if ((tv->tv_usec & COMMON_TIMEOUT_MASK) != COMMON_TIMEOUT_MAGIC) 1108 return 0; 1109 idx = COMMON_TIMEOUT_IDX(tv); 1110 return idx < base->n_common_timeouts; 1111 } 1112 1113 /* True iff tv1 and tv2 have the same common-timeout index, or if neither 1114 * one is a common timeout. */ 1115 static inline int 1116 is_same_common_timeout(const struct timeval *tv1, const struct timeval *tv2) 1117 { 1118 return (tv1->tv_usec & ~MICROSECONDS_MASK) == 1119 (tv2->tv_usec & ~MICROSECONDS_MASK); 1120 } 1121 1122 /** Requires that 'tv' is a common timeout. Return the corresponding 1123 * common_timeout_list. */ 1124 static inline struct common_timeout_list * 1125 get_common_timeout_list(struct event_base *base, const struct timeval *tv) 1126 { 1127 return base->common_timeout_queues[COMMON_TIMEOUT_IDX(tv)]; 1128 } 1129 1130 #if 0 1131 static inline int 1132 common_timeout_ok(const struct timeval *tv, 1133 struct event_base *base) 1134 { 1135 const struct timeval *expect = 1136 &get_common_timeout_list(base, tv)->duration; 1137 return tv->tv_sec == expect->tv_sec && 1138 tv->tv_usec == expect->tv_usec; 1139 } 1140 #endif 1141 1142 /* Add the timeout for the first event in given common timeout list to the 1143 * event_base's minheap. */ 1144 static void 1145 common_timeout_schedule(struct common_timeout_list *ctl, 1146 const struct timeval *now, struct event *head) 1147 { 1148 struct timeval timeout = head->ev_timeout; 1149 timeout.tv_usec &= MICROSECONDS_MASK; 1150 event_add_internal(&ctl->timeout_event, &timeout, 1); 1151 } 1152 1153 /* Callback: invoked when the timeout for a common timeout queue triggers. 1154 * This means that (at least) the first event in that queue should be run, 1155 * and the timeout should be rescheduled if there are more events. */ 1156 static void 1157 common_timeout_callback(evutil_socket_t fd, short what, void *arg) 1158 { 1159 struct timeval now; 1160 struct common_timeout_list *ctl = arg; 1161 struct event_base *base = ctl->base; 1162 struct event *ev = NULL; 1163 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1164 gettime(base, &now); 1165 while (1) { 1166 ev = TAILQ_FIRST(&ctl->events); 1167 if (!ev || ev->ev_timeout.tv_sec > now.tv_sec || 1168 (ev->ev_timeout.tv_sec == now.tv_sec && 1169 (ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec)) 1170 break; 1171 event_del_internal(ev); 1172 event_active_nolock(ev, EV_TIMEOUT, 1); 1173 } 1174 if (ev) 1175 common_timeout_schedule(ctl, &now, ev); 1176 EVBASE_RELEASE_LOCK(base, th_base_lock); 1177 } 1178 1179 #define MAX_COMMON_TIMEOUTS 256 1180 1181 const struct timeval * 1182 event_base_init_common_timeout(struct event_base *base, 1183 const struct timeval *duration) 1184 { 1185 int i; 1186 struct timeval tv; 1187 const struct timeval *result=NULL; 1188 struct common_timeout_list *new_ctl; 1189 1190 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1191 if (duration->tv_usec > 1000000) { 1192 memcpy(&tv, duration, sizeof(struct timeval)); 1193 if (is_common_timeout(duration, base)) 1194 tv.tv_usec &= MICROSECONDS_MASK; 1195 tv.tv_sec += tv.tv_usec / 1000000; 1196 tv.tv_usec %= 1000000; 1197 duration = &tv; 1198 } 1199 for (i = 0; i < base->n_common_timeouts; ++i) { 1200 const struct common_timeout_list *ctl = 1201 base->common_timeout_queues[i]; 1202 if (duration->tv_sec == ctl->duration.tv_sec && 1203 duration->tv_usec == 1204 (ctl->duration.tv_usec & MICROSECONDS_MASK)) { 1205 EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base)); 1206 result = &ctl->duration; 1207 goto done; 1208 } 1209 } 1210 if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) { 1211 event_warnx("%s: Too many common timeouts already in use; " 1212 "we only support %d per event_base", __func__, 1213 MAX_COMMON_TIMEOUTS); 1214 goto done; 1215 } 1216 if (base->n_common_timeouts_allocated == base->n_common_timeouts) { 1217 int n = base->n_common_timeouts < 16 ? 16 : 1218 base->n_common_timeouts*2; 1219 struct common_timeout_list **newqueues = 1220 mm_realloc(base->common_timeout_queues, 1221 n*sizeof(struct common_timeout_queue *)); 1222 if (!newqueues) { 1223 event_warn("%s: realloc",__func__); 1224 goto done; 1225 } 1226 base->n_common_timeouts_allocated = n; 1227 base->common_timeout_queues = newqueues; 1228 } 1229 new_ctl = mm_calloc(1, sizeof(struct common_timeout_list)); 1230 if (!new_ctl) { 1231 event_warn("%s: calloc",__func__); 1232 goto done; 1233 } 1234 TAILQ_INIT(&new_ctl->events); 1235 new_ctl->duration.tv_sec = duration->tv_sec; 1236 new_ctl->duration.tv_usec = 1237 duration->tv_usec | COMMON_TIMEOUT_MAGIC | 1238 (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT); 1239 evtimer_assign(&new_ctl->timeout_event, base, 1240 common_timeout_callback, new_ctl); 1241 new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL; 1242 event_priority_set(&new_ctl->timeout_event, 0); 1243 new_ctl->base = base; 1244 base->common_timeout_queues[base->n_common_timeouts++] = new_ctl; 1245 result = &new_ctl->duration; 1246 1247 done: 1248 if (result) 1249 EVUTIL_ASSERT(is_common_timeout(result, base)); 1250 1251 EVBASE_RELEASE_LOCK(base, th_base_lock); 1252 return result; 1253 } 1254 1255 /* Closure function invoked when we're activating a persistent event. */ 1256 static inline void 1257 event_persist_closure(struct event_base *base, struct event *ev) 1258 { 1259 // Define our callback, we use this to store our callback before it's executed 1260 void (*evcb_callback)(evutil_socket_t, short, void *); 1261 1262 // Other fields of *ev that must be stored before executing 1263 evutil_socket_t evcb_fd; 1264 short evcb_res; 1265 void *evcb_arg; 1266 1267 /* reschedule the persistent event if we have a timeout. */ 1268 if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) { 1269 /* If there was a timeout, we want it to run at an interval of 1270 * ev_io_timeout after the last time it was _scheduled_ for, 1271 * not ev_io_timeout after _now_. If it fired for another 1272 * reason, though, the timeout ought to start ticking _now_. */ 1273 struct timeval run_at, relative_to, delay, now; 1274 ev_uint32_t usec_mask = 0; 1275 EVUTIL_ASSERT(is_same_common_timeout(&ev->ev_timeout, 1276 &ev->ev_io_timeout)); 1277 gettime(base, &now); 1278 if (is_common_timeout(&ev->ev_timeout, base)) { 1279 delay = ev->ev_io_timeout; 1280 usec_mask = delay.tv_usec & ~MICROSECONDS_MASK; 1281 delay.tv_usec &= MICROSECONDS_MASK; 1282 if (ev->ev_res & EV_TIMEOUT) { 1283 relative_to = ev->ev_timeout; 1284 relative_to.tv_usec &= MICROSECONDS_MASK; 1285 } else { 1286 relative_to = now; 1287 } 1288 } else { 1289 delay = ev->ev_io_timeout; 1290 if (ev->ev_res & EV_TIMEOUT) { 1291 relative_to = ev->ev_timeout; 1292 } else { 1293 relative_to = now; 1294 } 1295 } 1296 evutil_timeradd(&relative_to, &delay, &run_at); 1297 if (evutil_timercmp(&run_at, &now, <)) { 1298 /* Looks like we missed at least one invocation due to 1299 * a clock jump, not running the event loop for a 1300 * while, really slow callbacks, or 1301 * something. Reschedule relative to now. 1302 */ 1303 evutil_timeradd(&now, &delay, &run_at); 1304 } 1305 run_at.tv_usec |= usec_mask; 1306 event_add_internal(ev, &run_at, 1); 1307 } 1308 1309 // Save our callback before we release the lock 1310 evcb_callback = ev->ev_callback; 1311 evcb_fd = ev->ev_fd; 1312 evcb_res = ev->ev_res; 1313 evcb_arg = ev->ev_arg; 1314 1315 // Release the lock 1316 EVBASE_RELEASE_LOCK(base, th_base_lock); 1317 1318 // Execute the callback 1319 (evcb_callback)(evcb_fd, evcb_res, evcb_arg); 1320 } 1321 1322 /* 1323 Helper for event_process_active to process all the events in a single queue, 1324 releasing the lock as we go. This function requires that the lock be held 1325 when it's invoked. Returns -1 if we get a signal or an event_break that 1326 means we should stop processing any active events now. Otherwise returns 1327 the number of non-internal events that we processed. 1328 */ 1329 static int 1330 event_process_active_single_queue(struct event_base *base, 1331 struct event_list *activeq) 1332 { 1333 struct event *ev; 1334 int count = 0; 1335 1336 EVUTIL_ASSERT(activeq != NULL); 1337 1338 for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { 1339 if (ev->ev_events & EV_PERSIST) 1340 event_queue_remove(base, ev, EVLIST_ACTIVE); 1341 else 1342 event_del_internal(ev); 1343 if (!(ev->ev_flags & EVLIST_INTERNAL)) 1344 ++count; 1345 1346 event_debug(( 1347 "event_process_active: event: %p, %s%scall %p", 1348 ev, 1349 ev->ev_res & EV_READ ? "EV_READ " : " ", 1350 ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", 1351 ev->ev_callback)); 1352 1353 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 1354 base->current_event = ev; 1355 base->current_event_waiters = 0; 1356 #endif 1357 1358 switch (ev->ev_closure) { 1359 case EV_CLOSURE_SIGNAL: 1360 event_signal_closure(base, ev); 1361 break; 1362 case EV_CLOSURE_PERSIST: 1363 event_persist_closure(base, ev); 1364 break; 1365 default: 1366 case EV_CLOSURE_NONE: 1367 EVBASE_RELEASE_LOCK(base, th_base_lock); 1368 (*ev->ev_callback)( 1369 ev->ev_fd, ev->ev_res, ev->ev_arg); 1370 break; 1371 } 1372 1373 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1374 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 1375 base->current_event = NULL; 1376 if (base->current_event_waiters) { 1377 base->current_event_waiters = 0; 1378 EVTHREAD_COND_BROADCAST(base->current_event_cond); 1379 } 1380 #endif 1381 1382 if (base->event_break) 1383 return -1; 1384 if (base->event_continue) 1385 break; 1386 } 1387 return count; 1388 } 1389 1390 /* 1391 Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If 1392 *breakptr becomes set to 1, stop. Requires that we start out holding 1393 the lock on 'queue'; releases the lock around 'queue' for each deferred_cb 1394 we process. 1395 */ 1396 static int 1397 event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) 1398 { 1399 int count = 0; 1400 struct deferred_cb *cb; 1401 1402 #define MAX_DEFERRED 16 1403 while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { 1404 cb->queued = 0; 1405 TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); 1406 --queue->active_count; 1407 UNLOCK_DEFERRED_QUEUE(queue); 1408 1409 cb->cb(cb, cb->arg); 1410 1411 LOCK_DEFERRED_QUEUE(queue); 1412 if (*breakptr) 1413 return -1; 1414 if (++count == MAX_DEFERRED) 1415 break; 1416 } 1417 #undef MAX_DEFERRED 1418 return count; 1419 } 1420 1421 /* 1422 * Active events are stored in priority queues. Lower priorities are always 1423 * process before higher priorities. Low priority events can starve high 1424 * priority ones. 1425 */ 1426 1427 static int 1428 event_process_active(struct event_base *base) 1429 { 1430 /* Caller must hold th_base_lock */ 1431 struct event_list *activeq = NULL; 1432 int i, c = 0; 1433 1434 for (i = 0; i < base->nactivequeues; ++i) { 1435 if (TAILQ_FIRST(&base->activequeues[i]) != NULL) { 1436 base->event_running_priority = i; 1437 activeq = &base->activequeues[i]; 1438 c = event_process_active_single_queue(base, activeq); 1439 if (c < 0) { 1440 base->event_running_priority = -1; 1441 return -1; 1442 } else if (c > 0) 1443 break; /* Processed a real event; do not 1444 * consider lower-priority events */ 1445 /* If we get here, all of the events we processed 1446 * were internal. Continue. */ 1447 } 1448 } 1449 1450 event_process_deferred_callbacks(&base->defer_queue,&base->event_break); 1451 base->event_running_priority = -1; 1452 return c; 1453 } 1454 1455 /* 1456 * Wait continuously for events. We exit only if no events are left. 1457 */ 1458 1459 int 1460 event_dispatch(void) 1461 { 1462 return (event_loop(0)); 1463 } 1464 1465 int 1466 event_base_dispatch(struct event_base *event_base) 1467 { 1468 return (event_base_loop(event_base, 0)); 1469 } 1470 1471 const char * 1472 event_base_get_method(const struct event_base *base) 1473 { 1474 EVUTIL_ASSERT(base); 1475 return (base->evsel->name); 1476 } 1477 1478 /** Callback: used to implement event_base_loopexit by telling the event_base 1479 * that it's time to exit its loop. */ 1480 static void 1481 event_loopexit_cb(evutil_socket_t fd, short what, void *arg) 1482 { 1483 struct event_base *base = arg; 1484 base->event_gotterm = 1; 1485 } 1486 1487 int 1488 event_loopexit(const struct timeval *tv) 1489 { 1490 return (event_once(-1, EV_TIMEOUT, event_loopexit_cb, 1491 current_base, tv)); 1492 } 1493 1494 int 1495 event_base_loopexit(struct event_base *event_base, const struct timeval *tv) 1496 { 1497 return (event_base_once(event_base, -1, EV_TIMEOUT, event_loopexit_cb, 1498 event_base, tv)); 1499 } 1500 1501 int 1502 event_loopbreak(void) 1503 { 1504 return (event_base_loopbreak(current_base)); 1505 } 1506 1507 int 1508 event_base_loopbreak(struct event_base *event_base) 1509 { 1510 int r = 0; 1511 if (event_base == NULL) 1512 return (-1); 1513 1514 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); 1515 event_base->event_break = 1; 1516 1517 if (EVBASE_NEED_NOTIFY(event_base)) { 1518 r = evthread_notify_base(event_base); 1519 } else { 1520 r = (0); 1521 } 1522 EVBASE_RELEASE_LOCK(event_base, th_base_lock); 1523 return r; 1524 } 1525 1526 int 1527 event_base_got_break(struct event_base *event_base) 1528 { 1529 int res; 1530 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); 1531 res = event_base->event_break; 1532 EVBASE_RELEASE_LOCK(event_base, th_base_lock); 1533 return res; 1534 } 1535 1536 int 1537 event_base_got_exit(struct event_base *event_base) 1538 { 1539 int res; 1540 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock); 1541 res = event_base->event_gotterm; 1542 EVBASE_RELEASE_LOCK(event_base, th_base_lock); 1543 return res; 1544 } 1545 1546 /* not thread safe */ 1547 1548 int 1549 event_loop(int flags) 1550 { 1551 return event_base_loop(current_base, flags); 1552 } 1553 1554 int 1555 event_base_loop(struct event_base *base, int flags) 1556 { 1557 const struct eventop *evsel = base->evsel; 1558 struct timeval tv; 1559 struct timeval *tv_p; 1560 int res, done, retval = 0; 1561 1562 /* Grab the lock. We will release it inside evsel.dispatch, and again 1563 * as we invoke user callbacks. */ 1564 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 1565 1566 if (base->running_loop) { 1567 event_warnx("%s: reentrant invocation. Only one event_base_loop" 1568 " can run on each event_base at once.", __func__); 1569 EVBASE_RELEASE_LOCK(base, th_base_lock); 1570 return -1; 1571 } 1572 1573 base->running_loop = 1; 1574 1575 clear_time_cache(base); 1576 1577 if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) 1578 evsig_set_base(base); 1579 1580 done = 0; 1581 1582 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 1583 base->th_owner_id = EVTHREAD_GET_ID(); 1584 #endif 1585 1586 base->event_gotterm = base->event_break = 0; 1587 1588 while (!done) { 1589 base->event_continue = 0; 1590 1591 /* Terminate the loop if we have been asked to */ 1592 if (base->event_gotterm) { 1593 break; 1594 } 1595 1596 if (base->event_break) { 1597 break; 1598 } 1599 1600 timeout_correct(base, &tv); 1601 1602 tv_p = &tv; 1603 if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) { 1604 timeout_next(base, &tv_p); 1605 } else { 1606 /* 1607 * if we have active events, we just poll new events 1608 * without waiting. 1609 */ 1610 evutil_timerclear(&tv); 1611 } 1612 1613 /* If we have no events, we just exit */ 1614 if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { 1615 event_debug(("%s: no events registered.", __func__)); 1616 retval = 1; 1617 goto done; 1618 } 1619 1620 /* update last old time */ 1621 gettime(base, &base->event_tv); 1622 1623 clear_time_cache(base); 1624 1625 res = evsel->dispatch(base, tv_p); 1626 1627 if (res == -1) { 1628 event_debug(("%s: dispatch returned unsuccessfully.", 1629 __func__)); 1630 retval = -1; 1631 goto done; 1632 } 1633 1634 update_time_cache(base); 1635 1636 timeout_process(base); 1637 1638 if (N_ACTIVE_CALLBACKS(base)) { 1639 int n = event_process_active(base); 1640 if ((flags & EVLOOP_ONCE) 1641 && N_ACTIVE_CALLBACKS(base) == 0 1642 && n != 0) 1643 done = 1; 1644 } else if (flags & EVLOOP_NONBLOCK) 1645 done = 1; 1646 } 1647 event_debug(("%s: asked to terminate loop.", __func__)); 1648 1649 done: 1650 clear_time_cache(base); 1651 base->running_loop = 0; 1652 1653 EVBASE_RELEASE_LOCK(base, th_base_lock); 1654 1655 return (retval); 1656 } 1657 1658 /* Sets up an event for processing once */ 1659 struct event_once { 1660 struct event ev; 1661 1662 void (*cb)(evutil_socket_t, short, void *); 1663 void *arg; 1664 }; 1665 1666 /* One-time callback to implement event_base_once: invokes the user callback, 1667 * then deletes the allocated storage */ 1668 static void 1669 event_once_cb(evutil_socket_t fd, short events, void *arg) 1670 { 1671 struct event_once *eonce = arg; 1672 1673 (*eonce->cb)(fd, events, eonce->arg); 1674 event_debug_unassign(&eonce->ev); 1675 mm_free(eonce); 1676 } 1677 1678 /* not threadsafe, event scheduled once. */ 1679 int 1680 event_once(evutil_socket_t fd, short events, 1681 void (*callback)(evutil_socket_t, short, void *), 1682 void *arg, const struct timeval *tv) 1683 { 1684 return event_base_once(current_base, fd, events, callback, arg, tv); 1685 } 1686 1687 /* Schedules an event once */ 1688 int 1689 event_base_once(struct event_base *base, evutil_socket_t fd, short events, 1690 void (*callback)(evutil_socket_t, short, void *), 1691 void *arg, const struct timeval *tv) 1692 { 1693 struct event_once *eonce; 1694 struct timeval etv; 1695 int res = 0; 1696 1697 /* We cannot support signals that just fire once, or persistent 1698 * events. */ 1699 if (events & (EV_SIGNAL|EV_PERSIST)) 1700 return (-1); 1701 1702 if ((eonce = mm_calloc(1, sizeof(struct event_once))) == NULL) 1703 return (-1); 1704 1705 eonce->cb = callback; 1706 eonce->arg = arg; 1707 1708 if (events == EV_TIMEOUT) { 1709 if (tv == NULL) { 1710 evutil_timerclear(&etv); 1711 tv = &etv; 1712 } 1713 1714 evtimer_assign(&eonce->ev, base, event_once_cb, eonce); 1715 } else if (events & (EV_READ|EV_WRITE)) { 1716 events &= EV_READ|EV_WRITE; 1717 1718 event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce); 1719 } else { 1720 /* Bad event combination */ 1721 mm_free(eonce); 1722 return (-1); 1723 } 1724 1725 if (res == 0) 1726 res = event_add(&eonce->ev, tv); 1727 if (res != 0) { 1728 mm_free(eonce); 1729 return (res); 1730 } 1731 1732 return (0); 1733 } 1734 1735 int 1736 event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg) 1737 { 1738 if (!base) 1739 base = current_base; 1740 1741 _event_debug_assert_not_added(ev); 1742 1743 ev->ev_base = base; 1744 1745 ev->ev_callback = callback; 1746 ev->ev_arg = arg; 1747 ev->ev_fd = fd; 1748 ev->ev_events = events; 1749 ev->ev_res = 0; 1750 ev->ev_flags = EVLIST_INIT; 1751 ev->ev_ncalls = 0; 1752 ev->ev_pncalls = NULL; 1753 1754 if (events & EV_SIGNAL) { 1755 if ((events & (EV_READ|EV_WRITE)) != 0) { 1756 event_warnx("%s: EV_SIGNAL is not compatible with " 1757 "EV_READ or EV_WRITE", __func__); 1758 return -1; 1759 } 1760 ev->ev_closure = EV_CLOSURE_SIGNAL; 1761 } else { 1762 if (events & EV_PERSIST) { 1763 evutil_timerclear(&ev->ev_io_timeout); 1764 ev->ev_closure = EV_CLOSURE_PERSIST; 1765 } else { 1766 ev->ev_closure = EV_CLOSURE_NONE; 1767 } 1768 } 1769 1770 min_heap_elem_init(ev); 1771 1772 if (base != NULL) { 1773 /* by default, we put new events into the middle priority */ 1774 ev->ev_pri = base->nactivequeues / 2; 1775 } 1776 1777 _event_debug_note_setup(ev); 1778 1779 return 0; 1780 } 1781 1782 int 1783 event_base_set(struct event_base *base, struct event *ev) 1784 { 1785 /* Only innocent events may be assigned to a different base */ 1786 if (ev->ev_flags != EVLIST_INIT) 1787 return (-1); 1788 1789 _event_debug_assert_is_setup(ev); 1790 1791 ev->ev_base = base; 1792 ev->ev_pri = base->nactivequeues/2; 1793 1794 return (0); 1795 } 1796 1797 void 1798 event_set(struct event *ev, evutil_socket_t fd, short events, 1799 void (*callback)(evutil_socket_t, short, void *), void *arg) 1800 { 1801 int r; 1802 r = event_assign(ev, current_base, fd, events, callback, arg); 1803 EVUTIL_ASSERT(r == 0); 1804 } 1805 1806 struct event * 1807 event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg) 1808 { 1809 struct event *ev; 1810 ev = mm_malloc(sizeof(struct event)); 1811 if (ev == NULL) 1812 return (NULL); 1813 if (event_assign(ev, base, fd, events, cb, arg) < 0) { 1814 mm_free(ev); 1815 return (NULL); 1816 } 1817 1818 return (ev); 1819 } 1820 1821 void 1822 event_free(struct event *ev) 1823 { 1824 _event_debug_assert_is_setup(ev); 1825 1826 /* make sure that this event won't be coming back to haunt us. */ 1827 event_del(ev); 1828 _event_debug_note_teardown(ev); 1829 mm_free(ev); 1830 1831 } 1832 1833 void 1834 event_debug_unassign(struct event *ev) 1835 { 1836 _event_debug_assert_not_added(ev); 1837 _event_debug_note_teardown(ev); 1838 1839 ev->ev_flags &= ~EVLIST_INIT; 1840 } 1841 1842 /* 1843 * Set's the priority of an event - if an event is already scheduled 1844 * changing the priority is going to fail. 1845 */ 1846 1847 int 1848 event_priority_set(struct event *ev, int pri) 1849 { 1850 _event_debug_assert_is_setup(ev); 1851 1852 if (ev->ev_flags & EVLIST_ACTIVE) 1853 return (-1); 1854 if (pri < 0 || pri >= ev->ev_base->nactivequeues) 1855 return (-1); 1856 1857 ev->ev_pri = pri; 1858 1859 return (0); 1860 } 1861 1862 /* 1863 * Checks if a specific event is pending or scheduled. 1864 */ 1865 1866 int 1867 event_pending(const struct event *ev, short event, struct timeval *tv) 1868 { 1869 int flags = 0; 1870 1871 if (EVUTIL_FAILURE_CHECK(ev->ev_base == NULL)) { 1872 event_warnx("%s: event has no event_base set.", __func__); 1873 return 0; 1874 } 1875 1876 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 1877 _event_debug_assert_is_setup(ev); 1878 1879 if (ev->ev_flags & EVLIST_INSERTED) 1880 flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)); 1881 if (ev->ev_flags & EVLIST_ACTIVE) 1882 flags |= ev->ev_res; 1883 if (ev->ev_flags & EVLIST_TIMEOUT) 1884 flags |= EV_TIMEOUT; 1885 1886 event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL); 1887 1888 /* See if there is a timeout that we should report */ 1889 if (tv != NULL && (flags & event & EV_TIMEOUT)) { 1890 struct timeval tmp = ev->ev_timeout; 1891 tmp.tv_usec &= MICROSECONDS_MASK; 1892 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 1893 /* correctly remamp to real time */ 1894 evutil_timeradd(&ev->ev_base->tv_clock_diff, &tmp, tv); 1895 #else 1896 *tv = tmp; 1897 #endif 1898 } 1899 1900 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 1901 1902 return (flags & event); 1903 } 1904 1905 int 1906 event_initialized(const struct event *ev) 1907 { 1908 if (!(ev->ev_flags & EVLIST_INIT)) 1909 return 0; 1910 1911 return 1; 1912 } 1913 1914 void 1915 event_get_assignment(const struct event *event, struct event_base **base_out, evutil_socket_t *fd_out, short *events_out, event_callback_fn *callback_out, void **arg_out) 1916 { 1917 _event_debug_assert_is_setup(event); 1918 1919 if (base_out) 1920 *base_out = event->ev_base; 1921 if (fd_out) 1922 *fd_out = event->ev_fd; 1923 if (events_out) 1924 *events_out = event->ev_events; 1925 if (callback_out) 1926 *callback_out = event->ev_callback; 1927 if (arg_out) 1928 *arg_out = event->ev_arg; 1929 } 1930 1931 size_t 1932 event_get_struct_event_size(void) 1933 { 1934 return sizeof(struct event); 1935 } 1936 1937 evutil_socket_t 1938 event_get_fd(const struct event *ev) 1939 { 1940 _event_debug_assert_is_setup(ev); 1941 return ev->ev_fd; 1942 } 1943 1944 struct event_base * 1945 event_get_base(const struct event *ev) 1946 { 1947 _event_debug_assert_is_setup(ev); 1948 return ev->ev_base; 1949 } 1950 1951 short 1952 event_get_events(const struct event *ev) 1953 { 1954 _event_debug_assert_is_setup(ev); 1955 return ev->ev_events; 1956 } 1957 1958 event_callback_fn 1959 event_get_callback(const struct event *ev) 1960 { 1961 _event_debug_assert_is_setup(ev); 1962 return ev->ev_callback; 1963 } 1964 1965 void * 1966 event_get_callback_arg(const struct event *ev) 1967 { 1968 _event_debug_assert_is_setup(ev); 1969 return ev->ev_arg; 1970 } 1971 1972 int 1973 event_add(struct event *ev, const struct timeval *tv) 1974 { 1975 int res; 1976 1977 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) { 1978 event_warnx("%s: event has no event_base set.", __func__); 1979 return -1; 1980 } 1981 1982 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 1983 1984 res = event_add_internal(ev, tv, 0); 1985 1986 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 1987 1988 return (res); 1989 } 1990 1991 /* Helper callback: wake an event_base from another thread. This version 1992 * works by writing a byte to one end of a socketpair, so that the event_base 1993 * listening on the other end will wake up as the corresponding event 1994 * triggers */ 1995 static int 1996 evthread_notify_base_default(struct event_base *base) 1997 { 1998 char buf[1]; 1999 int r; 2000 buf[0] = (char) 0; 2001 #ifdef WIN32 2002 r = send(base->th_notify_fd[1], buf, 1, 0); 2003 #else 2004 r = write(base->th_notify_fd[1], buf, 1); 2005 #endif 2006 return (r < 0 && errno != EAGAIN) ? -1 : 0; 2007 } 2008 2009 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H) 2010 /* Helper callback: wake an event_base from another thread. This version 2011 * assumes that you have a working eventfd() implementation. */ 2012 static int 2013 evthread_notify_base_eventfd(struct event_base *base) 2014 { 2015 ev_uint64_t msg = 1; 2016 int r; 2017 do { 2018 r = write(base->th_notify_fd[0], (void*) &msg, sizeof(msg)); 2019 } while (r < 0 && errno == EAGAIN); 2020 2021 return (r < 0) ? -1 : 0; 2022 } 2023 #endif 2024 2025 /** Tell the thread currently running the event_loop for base (if any) that it 2026 * needs to stop waiting in its dispatch function (if it is) and process all 2027 * active events and deferred callbacks (if there are any). */ 2028 static int 2029 evthread_notify_base(struct event_base *base) 2030 { 2031 EVENT_BASE_ASSERT_LOCKED(base); 2032 if (!base->th_notify_fn) 2033 return -1; 2034 if (base->is_notify_pending) 2035 return 0; 2036 base->is_notify_pending = 1; 2037 return base->th_notify_fn(base); 2038 } 2039 2040 /* Implementation function to add an event. Works just like event_add, 2041 * except: 1) it requires that we have the lock. 2) if tv_is_absolute is set, 2042 * we treat tv as an absolute time, not as an interval to add to the current 2043 * time */ 2044 static inline int 2045 event_add_internal(struct event *ev, const struct timeval *tv, 2046 int tv_is_absolute) 2047 { 2048 struct event_base *base = ev->ev_base; 2049 int res = 0; 2050 int notify = 0; 2051 2052 EVENT_BASE_ASSERT_LOCKED(base); 2053 _event_debug_assert_is_setup(ev); 2054 2055 event_debug(( 2056 "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p", 2057 ev, 2058 EV_SOCK_ARG(ev->ev_fd), 2059 ev->ev_events & EV_READ ? "EV_READ " : " ", 2060 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ", 2061 tv ? "EV_TIMEOUT " : " ", 2062 ev->ev_callback)); 2063 2064 EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL)); 2065 2066 /* 2067 * prepare for timeout insertion further below, if we get a 2068 * failure on any step, we should not change any state. 2069 */ 2070 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) { 2071 if (min_heap_reserve(&base->timeheap, 2072 1 + min_heap_size(&base->timeheap)) == -1) 2073 return (-1); /* ENOMEM == errno */ 2074 } 2075 2076 /* If the main thread is currently executing a signal event's 2077 * callback, and we are not the main thread, then we want to wait 2078 * until the callback is done before we mess with the event, or else 2079 * we can race on ev_ncalls and ev_pncalls below. */ 2080 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2081 if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) 2082 && !EVBASE_IN_THREAD(base)) { 2083 ++base->current_event_waiters; 2084 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); 2085 } 2086 #endif 2087 2088 if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && 2089 !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { 2090 if (ev->ev_events & (EV_READ|EV_WRITE)) 2091 res = evmap_io_add(base, ev->ev_fd, ev); 2092 else if (ev->ev_events & EV_SIGNAL) 2093 res = evmap_signal_add(base, (int)ev->ev_fd, ev); 2094 if (res != -1) 2095 event_queue_insert(base, ev, EVLIST_INSERTED); 2096 if (res == 1) { 2097 /* evmap says we need to notify the main thread. */ 2098 notify = 1; 2099 res = 0; 2100 } 2101 } 2102 2103 /* 2104 * we should change the timeout state only if the previous event 2105 * addition succeeded. 2106 */ 2107 if (res != -1 && tv != NULL) { 2108 struct timeval now; 2109 int common_timeout; 2110 2111 /* 2112 * for persistent timeout events, we remember the 2113 * timeout value and re-add the event. 2114 * 2115 * If tv_is_absolute, this was already set. 2116 */ 2117 if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) 2118 ev->ev_io_timeout = *tv; 2119 2120 /* 2121 * we already reserved memory above for the case where we 2122 * are not replacing an existing timeout. 2123 */ 2124 if (ev->ev_flags & EVLIST_TIMEOUT) { 2125 /* XXX I believe this is needless. */ 2126 if (min_heap_elt_is_top(ev)) 2127 notify = 1; 2128 event_queue_remove(base, ev, EVLIST_TIMEOUT); 2129 } 2130 2131 /* Check if it is active due to a timeout. Rescheduling 2132 * this timeout before the callback can be executed 2133 * removes it from the active list. */ 2134 if ((ev->ev_flags & EVLIST_ACTIVE) && 2135 (ev->ev_res & EV_TIMEOUT)) { 2136 if (ev->ev_events & EV_SIGNAL) { 2137 /* See if we are just active executing 2138 * this event in a loop 2139 */ 2140 if (ev->ev_ncalls && ev->ev_pncalls) { 2141 /* Abort loop */ 2142 *ev->ev_pncalls = 0; 2143 } 2144 } 2145 2146 event_queue_remove(base, ev, EVLIST_ACTIVE); 2147 } 2148 2149 gettime(base, &now); 2150 2151 common_timeout = is_common_timeout(tv, base); 2152 if (tv_is_absolute) { 2153 ev->ev_timeout = *tv; 2154 } else if (common_timeout) { 2155 struct timeval tmp = *tv; 2156 tmp.tv_usec &= MICROSECONDS_MASK; 2157 evutil_timeradd(&now, &tmp, &ev->ev_timeout); 2158 ev->ev_timeout.tv_usec |= 2159 (tv->tv_usec & ~MICROSECONDS_MASK); 2160 } else { 2161 evutil_timeradd(&now, tv, &ev->ev_timeout); 2162 } 2163 2164 event_debug(( 2165 "event_add: timeout in %d seconds, call %p", 2166 (int)tv->tv_sec, ev->ev_callback)); 2167 2168 event_queue_insert(base, ev, EVLIST_TIMEOUT); 2169 if (common_timeout) { 2170 struct common_timeout_list *ctl = 2171 get_common_timeout_list(base, &ev->ev_timeout); 2172 if (ev == TAILQ_FIRST(&ctl->events)) { 2173 common_timeout_schedule(ctl, &now, ev); 2174 } 2175 } else { 2176 /* See if the earliest timeout is now earlier than it 2177 * was before: if so, we will need to tell the main 2178 * thread to wake up earlier than it would 2179 * otherwise. */ 2180 if (min_heap_elt_is_top(ev)) 2181 notify = 1; 2182 } 2183 } 2184 2185 /* if we are not in the right thread, we need to wake up the loop */ 2186 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) 2187 evthread_notify_base(base); 2188 2189 _event_debug_note_add(ev); 2190 2191 return (res); 2192 } 2193 2194 int 2195 event_del(struct event *ev) 2196 { 2197 int res; 2198 2199 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) { 2200 event_warnx("%s: event has no event_base set.", __func__); 2201 return -1; 2202 } 2203 2204 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 2205 2206 res = event_del_internal(ev); 2207 2208 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 2209 2210 return (res); 2211 } 2212 2213 /* Helper for event_del: always called with th_base_lock held. */ 2214 static inline int 2215 event_del_internal(struct event *ev) 2216 { 2217 struct event_base *base; 2218 int res = 0, notify = 0; 2219 2220 event_debug(("event_del: %p (fd "EV_SOCK_FMT"), callback %p", 2221 ev, EV_SOCK_ARG(ev->ev_fd), ev->ev_callback)); 2222 2223 /* An event without a base has not been added */ 2224 if (ev->ev_base == NULL) 2225 return (-1); 2226 2227 EVENT_BASE_ASSERT_LOCKED(ev->ev_base); 2228 2229 /* If the main thread is currently executing this event's callback, 2230 * and we are not the main thread, then we want to wait until the 2231 * callback is done before we start removing the event. That way, 2232 * when this function returns, it will be safe to free the 2233 * user-supplied argument. */ 2234 base = ev->ev_base; 2235 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2236 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { 2237 ++base->current_event_waiters; 2238 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); 2239 } 2240 #endif 2241 2242 EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL)); 2243 2244 /* See if we are just active executing this event in a loop */ 2245 if (ev->ev_events & EV_SIGNAL) { 2246 if (ev->ev_ncalls && ev->ev_pncalls) { 2247 /* Abort loop */ 2248 *ev->ev_pncalls = 0; 2249 } 2250 } 2251 2252 if (ev->ev_flags & EVLIST_TIMEOUT) { 2253 /* NOTE: We never need to notify the main thread because of a 2254 * deleted timeout event: all that could happen if we don't is 2255 * that the dispatch loop might wake up too early. But the 2256 * point of notifying the main thread _is_ to wake up the 2257 * dispatch loop early anyway, so we wouldn't gain anything by 2258 * doing it. 2259 */ 2260 event_queue_remove(base, ev, EVLIST_TIMEOUT); 2261 } 2262 2263 if (ev->ev_flags & EVLIST_ACTIVE) 2264 event_queue_remove(base, ev, EVLIST_ACTIVE); 2265 2266 if (ev->ev_flags & EVLIST_INSERTED) { 2267 event_queue_remove(base, ev, EVLIST_INSERTED); 2268 if (ev->ev_events & (EV_READ|EV_WRITE)) 2269 res = evmap_io_del(base, ev->ev_fd, ev); 2270 else 2271 res = evmap_signal_del(base, (int)ev->ev_fd, ev); 2272 if (res == 1) { 2273 /* evmap says we need to notify the main thread. */ 2274 notify = 1; 2275 res = 0; 2276 } 2277 } 2278 2279 /* if we are not in the right thread, we need to wake up the loop */ 2280 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) 2281 evthread_notify_base(base); 2282 2283 _event_debug_note_del(ev); 2284 2285 return (res); 2286 } 2287 2288 void 2289 event_active(struct event *ev, int res, short ncalls) 2290 { 2291 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) { 2292 event_warnx("%s: event has no event_base set.", __func__); 2293 return; 2294 } 2295 2296 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); 2297 2298 _event_debug_assert_is_setup(ev); 2299 2300 event_active_nolock(ev, res, ncalls); 2301 2302 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); 2303 } 2304 2305 2306 void 2307 event_active_nolock(struct event *ev, int res, short ncalls) 2308 { 2309 struct event_base *base; 2310 2311 event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p", 2312 ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback)); 2313 2314 2315 /* We get different kinds of events, add them together */ 2316 if (ev->ev_flags & EVLIST_ACTIVE) { 2317 ev->ev_res |= res; 2318 return; 2319 } 2320 2321 base = ev->ev_base; 2322 2323 EVENT_BASE_ASSERT_LOCKED(base); 2324 2325 ev->ev_res = res; 2326 2327 if (ev->ev_pri < base->event_running_priority) 2328 base->event_continue = 1; 2329 2330 if (ev->ev_events & EV_SIGNAL) { 2331 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2332 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { 2333 ++base->current_event_waiters; 2334 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); 2335 } 2336 #endif 2337 ev->ev_ncalls = ncalls; 2338 ev->ev_pncalls = NULL; 2339 } 2340 2341 event_queue_insert(base, ev, EVLIST_ACTIVE); 2342 2343 if (EVBASE_NEED_NOTIFY(base)) 2344 evthread_notify_base(base); 2345 } 2346 2347 void 2348 event_deferred_cb_init(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) 2349 { 2350 memset(cb, 0, sizeof(struct deferred_cb)); 2351 cb->cb = fn; 2352 cb->arg = arg; 2353 } 2354 2355 void 2356 event_deferred_cb_cancel(struct deferred_cb_queue *queue, 2357 struct deferred_cb *cb) 2358 { 2359 if (!queue) { 2360 if (current_base) 2361 queue = ¤t_base->defer_queue; 2362 else 2363 return; 2364 } 2365 2366 LOCK_DEFERRED_QUEUE(queue); 2367 if (cb->queued) { 2368 TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); 2369 --queue->active_count; 2370 cb->queued = 0; 2371 } 2372 UNLOCK_DEFERRED_QUEUE(queue); 2373 } 2374 2375 void 2376 event_deferred_cb_schedule(struct deferred_cb_queue *queue, 2377 struct deferred_cb *cb) 2378 { 2379 if (!queue) { 2380 if (current_base) 2381 queue = ¤t_base->defer_queue; 2382 else 2383 return; 2384 } 2385 2386 LOCK_DEFERRED_QUEUE(queue); 2387 if (!cb->queued) { 2388 cb->queued = 1; 2389 TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); 2390 ++queue->active_count; 2391 if (queue->notify_fn) 2392 queue->notify_fn(queue, queue->notify_arg); 2393 } 2394 UNLOCK_DEFERRED_QUEUE(queue); 2395 } 2396 2397 static int 2398 timeout_next(struct event_base *base, struct timeval **tv_p) 2399 { 2400 /* Caller must hold th_base_lock */ 2401 struct timeval now; 2402 struct event *ev; 2403 struct timeval *tv = *tv_p; 2404 int res = 0; 2405 2406 ev = min_heap_top(&base->timeheap); 2407 2408 if (ev == NULL) { 2409 /* if no time-based events are active wait for I/O */ 2410 *tv_p = NULL; 2411 goto out; 2412 } 2413 2414 if (gettime(base, &now) == -1) { 2415 res = -1; 2416 goto out; 2417 } 2418 2419 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { 2420 evutil_timerclear(tv); 2421 goto out; 2422 } 2423 2424 evutil_timersub(&ev->ev_timeout, &now, tv); 2425 2426 EVUTIL_ASSERT(tv->tv_sec >= 0); 2427 EVUTIL_ASSERT(tv->tv_usec >= 0); 2428 event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec)); 2429 2430 out: 2431 return (res); 2432 } 2433 2434 /* 2435 * Determines if the time is running backwards by comparing the current time 2436 * against the last time we checked. Not needed when using clock monotonic. 2437 * If time is running backwards, we adjust the firing time of every event by 2438 * the amount that time seems to have jumped. 2439 */ 2440 static void 2441 timeout_correct(struct event_base *base, struct timeval *tv) 2442 { 2443 /* Caller must hold th_base_lock. */ 2444 struct event **pev; 2445 unsigned int size; 2446 struct timeval off; 2447 int i; 2448 2449 if (use_monotonic) 2450 return; 2451 2452 /* Check if time is running backwards */ 2453 gettime(base, tv); 2454 2455 if (evutil_timercmp(tv, &base->event_tv, >=)) { 2456 base->event_tv = *tv; 2457 return; 2458 } 2459 2460 event_debug(("%s: time is running backwards, corrected", 2461 __func__)); 2462 evutil_timersub(&base->event_tv, tv, &off); 2463 2464 /* 2465 * We can modify the key element of the node without destroying 2466 * the minheap property, because we change every element. 2467 */ 2468 pev = base->timeheap.p; 2469 size = base->timeheap.n; 2470 for (; size-- > 0; ++pev) { 2471 struct timeval *ev_tv = &(**pev).ev_timeout; 2472 evutil_timersub(ev_tv, &off, ev_tv); 2473 } 2474 for (i=0; i<base->n_common_timeouts; ++i) { 2475 struct event *ev; 2476 struct common_timeout_list *ctl = 2477 base->common_timeout_queues[i]; 2478 TAILQ_FOREACH(ev, &ctl->events, 2479 ev_timeout_pos.ev_next_with_common_timeout) { 2480 struct timeval *ev_tv = &ev->ev_timeout; 2481 ev_tv->tv_usec &= MICROSECONDS_MASK; 2482 evutil_timersub(ev_tv, &off, ev_tv); 2483 ev_tv->tv_usec |= COMMON_TIMEOUT_MAGIC | 2484 (i<<COMMON_TIMEOUT_IDX_SHIFT); 2485 } 2486 } 2487 2488 /* Now remember what the new time turned out to be. */ 2489 base->event_tv = *tv; 2490 } 2491 2492 /* Activate every event whose timeout has elapsed. */ 2493 static void 2494 timeout_process(struct event_base *base) 2495 { 2496 /* Caller must hold lock. */ 2497 struct timeval now; 2498 struct event *ev; 2499 2500 if (min_heap_empty(&base->timeheap)) { 2501 return; 2502 } 2503 2504 gettime(base, &now); 2505 2506 while ((ev = min_heap_top(&base->timeheap))) { 2507 if (evutil_timercmp(&ev->ev_timeout, &now, >)) 2508 break; 2509 2510 /* delete this event from the I/O queues */ 2511 event_del_internal(ev); 2512 2513 event_debug(("timeout_process: call %p", 2514 ev->ev_callback)); 2515 event_active_nolock(ev, EV_TIMEOUT, 1); 2516 } 2517 } 2518 2519 /* Remove 'ev' from 'queue' (EVLIST_...) in base. */ 2520 static void 2521 event_queue_remove(struct event_base *base, struct event *ev, int queue) 2522 { 2523 EVENT_BASE_ASSERT_LOCKED(base); 2524 2525 if (!(ev->ev_flags & queue)) { 2526 event_errx(1, "%s: %p(fd "EV_SOCK_FMT") not on queue %x", __func__, 2527 ev, EV_SOCK_ARG(ev->ev_fd), queue); 2528 return; 2529 } 2530 2531 if (~ev->ev_flags & EVLIST_INTERNAL) 2532 base->event_count--; 2533 2534 ev->ev_flags &= ~queue; 2535 switch (queue) { 2536 case EVLIST_INSERTED: 2537 TAILQ_REMOVE(&base->eventqueue, ev, ev_next); 2538 break; 2539 case EVLIST_ACTIVE: 2540 base->event_count_active--; 2541 TAILQ_REMOVE(&base->activequeues[ev->ev_pri], 2542 ev, ev_active_next); 2543 break; 2544 case EVLIST_TIMEOUT: 2545 if (is_common_timeout(&ev->ev_timeout, base)) { 2546 struct common_timeout_list *ctl = 2547 get_common_timeout_list(base, &ev->ev_timeout); 2548 TAILQ_REMOVE(&ctl->events, ev, 2549 ev_timeout_pos.ev_next_with_common_timeout); 2550 } else { 2551 min_heap_erase(&base->timeheap, ev); 2552 } 2553 break; 2554 default: 2555 event_errx(1, "%s: unknown queue %x", __func__, queue); 2556 } 2557 } 2558 2559 /* Add 'ev' to the common timeout list in 'ev'. */ 2560 static void 2561 insert_common_timeout_inorder(struct common_timeout_list *ctl, 2562 struct event *ev) 2563 { 2564 struct event *e; 2565 /* By all logic, we should just be able to append 'ev' to the end of 2566 * ctl->events, since the timeout on each 'ev' is set to {the common 2567 * timeout} + {the time when we add the event}, and so the events 2568 * should arrive in order of their timeeouts. But just in case 2569 * there's some wacky threading issue going on, we do a search from 2570 * the end of 'ev' to find the right insertion point. 2571 */ 2572 TAILQ_FOREACH_REVERSE(e, &ctl->events, 2573 event_list, ev_timeout_pos.ev_next_with_common_timeout) { 2574 /* This timercmp is a little sneaky, since both ev and e have 2575 * magic values in tv_usec. Fortunately, they ought to have 2576 * the _same_ magic values in tv_usec. Let's assert for that. 2577 */ 2578 EVUTIL_ASSERT( 2579 is_same_common_timeout(&e->ev_timeout, &ev->ev_timeout)); 2580 if (evutil_timercmp(&ev->ev_timeout, &e->ev_timeout, >=)) { 2581 TAILQ_INSERT_AFTER(&ctl->events, e, ev, 2582 ev_timeout_pos.ev_next_with_common_timeout); 2583 return; 2584 } 2585 } 2586 TAILQ_INSERT_HEAD(&ctl->events, ev, 2587 ev_timeout_pos.ev_next_with_common_timeout); 2588 } 2589 2590 static void 2591 event_queue_insert(struct event_base *base, struct event *ev, int queue) 2592 { 2593 EVENT_BASE_ASSERT_LOCKED(base); 2594 2595 if (ev->ev_flags & queue) { 2596 /* Double insertion is possible for active events */ 2597 if (queue & EVLIST_ACTIVE) 2598 return; 2599 2600 event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__, 2601 ev, EV_SOCK_ARG(ev->ev_fd), queue); 2602 return; 2603 } 2604 2605 if (~ev->ev_flags & EVLIST_INTERNAL) 2606 base->event_count++; 2607 2608 ev->ev_flags |= queue; 2609 switch (queue) { 2610 case EVLIST_INSERTED: 2611 TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next); 2612 break; 2613 case EVLIST_ACTIVE: 2614 base->event_count_active++; 2615 TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], 2616 ev,ev_active_next); 2617 break; 2618 case EVLIST_TIMEOUT: { 2619 if (is_common_timeout(&ev->ev_timeout, base)) { 2620 struct common_timeout_list *ctl = 2621 get_common_timeout_list(base, &ev->ev_timeout); 2622 insert_common_timeout_inorder(ctl, ev); 2623 } else 2624 min_heap_push(&base->timeheap, ev); 2625 break; 2626 } 2627 default: 2628 event_errx(1, "%s: unknown queue %x", __func__, queue); 2629 } 2630 } 2631 2632 /* Functions for debugging */ 2633 2634 const char * 2635 event_get_version(void) 2636 { 2637 return (_EVENT_VERSION); 2638 } 2639 2640 ev_uint32_t 2641 event_get_version_number(void) 2642 { 2643 return (_EVENT_NUMERIC_VERSION); 2644 } 2645 2646 /* 2647 * No thread-safe interface needed - the information should be the same 2648 * for all threads. 2649 */ 2650 2651 const char * 2652 event_get_method(void) 2653 { 2654 return (current_base->evsel->name); 2655 } 2656 2657 #ifndef _EVENT_DISABLE_MM_REPLACEMENT 2658 static void *(*_mm_malloc_fn)(size_t sz) = NULL; 2659 static void *(*_mm_realloc_fn)(void *p, size_t sz) = NULL; 2660 static void (*_mm_free_fn)(void *p) = NULL; 2661 2662 void * 2663 event_mm_malloc_(size_t sz) 2664 { 2665 if (_mm_malloc_fn) 2666 return _mm_malloc_fn(sz); 2667 else 2668 return malloc(sz); 2669 } 2670 2671 void * 2672 event_mm_calloc_(size_t count, size_t size) 2673 { 2674 if (_mm_malloc_fn) { 2675 size_t sz = count * size; 2676 void *p = _mm_malloc_fn(sz); 2677 if (p) 2678 memset(p, 0, sz); 2679 return p; 2680 } else 2681 return calloc(count, size); 2682 } 2683 2684 char * 2685 event_mm_strdup_(const char *str) 2686 { 2687 if (_mm_malloc_fn) { 2688 size_t ln = strlen(str); 2689 void *p = _mm_malloc_fn(ln+1); 2690 if (p) 2691 memcpy(p, str, ln+1); 2692 return p; 2693 } else 2694 #ifdef WIN32 2695 return _strdup(str); 2696 #else 2697 return strdup(str); 2698 #endif 2699 } 2700 2701 void * 2702 event_mm_realloc_(void *ptr, size_t sz) 2703 { 2704 if (_mm_realloc_fn) 2705 return _mm_realloc_fn(ptr, sz); 2706 else 2707 return realloc(ptr, sz); 2708 } 2709 2710 void 2711 event_mm_free_(void *ptr) 2712 { 2713 if (_mm_free_fn) 2714 _mm_free_fn(ptr); 2715 else 2716 free(ptr); 2717 } 2718 2719 void 2720 event_set_mem_functions(void *(*malloc_fn)(size_t sz), 2721 void *(*realloc_fn)(void *ptr, size_t sz), 2722 void (*free_fn)(void *ptr)) 2723 { 2724 _mm_malloc_fn = malloc_fn; 2725 _mm_realloc_fn = realloc_fn; 2726 _mm_free_fn = free_fn; 2727 } 2728 #endif 2729 2730 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H) 2731 static void 2732 evthread_notify_drain_eventfd(evutil_socket_t fd, short what, void *arg) 2733 { 2734 ev_uint64_t msg; 2735 ev_ssize_t r; 2736 struct event_base *base = arg; 2737 2738 r = read(fd, (void*) &msg, sizeof(msg)); 2739 if (r<0 && errno != EAGAIN) { 2740 event_sock_warn(fd, "Error reading from eventfd"); 2741 } 2742 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2743 base->is_notify_pending = 0; 2744 EVBASE_RELEASE_LOCK(base, th_base_lock); 2745 } 2746 #endif 2747 2748 static void 2749 evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg) 2750 { 2751 unsigned char buf[1024]; 2752 struct event_base *base = arg; 2753 #ifdef WIN32 2754 while (recv(fd, (char*)buf, sizeof(buf), 0) > 0) 2755 ; 2756 #else 2757 while (read(fd, (char*)buf, sizeof(buf)) > 0) 2758 ; 2759 #endif 2760 2761 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2762 base->is_notify_pending = 0; 2763 EVBASE_RELEASE_LOCK(base, th_base_lock); 2764 } 2765 2766 int 2767 evthread_make_base_notifiable(struct event_base *base) 2768 { 2769 void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default; 2770 int (*notify)(struct event_base *) = evthread_notify_base_default; 2771 2772 /* XXXX grab the lock here? */ 2773 if (!base) 2774 return -1; 2775 2776 if (base->th_notify_fd[0] >= 0) 2777 return 0; 2778 2779 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H) 2780 #ifndef EFD_CLOEXEC 2781 #define EFD_CLOEXEC 0 2782 #endif 2783 base->th_notify_fd[0] = eventfd(0, EFD_CLOEXEC); 2784 if (base->th_notify_fd[0] >= 0) { 2785 evutil_make_socket_closeonexec(base->th_notify_fd[0]); 2786 notify = evthread_notify_base_eventfd; 2787 cb = evthread_notify_drain_eventfd; 2788 } 2789 #endif 2790 #if defined(_EVENT_HAVE_PIPE) 2791 if (base->th_notify_fd[0] < 0) { 2792 if ((base->evsel->features & EV_FEATURE_FDS)) { 2793 if (pipe(base->th_notify_fd) < 0) { 2794 event_warn("%s: pipe", __func__); 2795 } else { 2796 evutil_make_socket_closeonexec(base->th_notify_fd[0]); 2797 evutil_make_socket_closeonexec(base->th_notify_fd[1]); 2798 } 2799 } 2800 } 2801 #endif 2802 2803 #ifdef WIN32 2804 #define LOCAL_SOCKETPAIR_AF AF_INET 2805 #else 2806 #define LOCAL_SOCKETPAIR_AF AF_UNIX 2807 #endif 2808 if (base->th_notify_fd[0] < 0) { 2809 if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, 2810 base->th_notify_fd) == -1) { 2811 event_sock_warn(-1, "%s: socketpair", __func__); 2812 return (-1); 2813 } else { 2814 evutil_make_socket_closeonexec(base->th_notify_fd[0]); 2815 evutil_make_socket_closeonexec(base->th_notify_fd[1]); 2816 } 2817 } 2818 2819 evutil_make_socket_nonblocking(base->th_notify_fd[0]); 2820 2821 base->th_notify_fn = notify; 2822 2823 /* 2824 Making the second socket nonblocking is a bit subtle, given that we 2825 ignore any EAGAIN returns when writing to it, and you don't usally 2826 do that for a nonblocking socket. But if the kernel gives us EAGAIN, 2827 then there's no need to add any more data to the buffer, since 2828 the main thread is already either about to wake up and drain it, 2829 or woken up and in the process of draining it. 2830 */ 2831 if (base->th_notify_fd[1] > 0) 2832 evutil_make_socket_nonblocking(base->th_notify_fd[1]); 2833 2834 /* prepare an event that we can use for wakeup */ 2835 event_assign(&base->th_notify, base, base->th_notify_fd[0], 2836 EV_READ|EV_PERSIST, cb, base); 2837 2838 /* we need to mark this as internal event */ 2839 base->th_notify.ev_flags |= EVLIST_INTERNAL; 2840 event_priority_set(&base->th_notify, 0); 2841 2842 return event_add(&base->th_notify, NULL); 2843 } 2844 2845 void 2846 event_base_dump_events(struct event_base *base, FILE *output) 2847 { 2848 struct event *e; 2849 int i; 2850 fprintf(output, "Inserted events:\n"); 2851 TAILQ_FOREACH(e, &base->eventqueue, ev_next) { 2852 fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s%s\n", 2853 (void*)e, EV_SOCK_ARG(e->ev_fd), 2854 (e->ev_events&EV_READ)?" Read":"", 2855 (e->ev_events&EV_WRITE)?" Write":"", 2856 (e->ev_events&EV_SIGNAL)?" Signal":"", 2857 (e->ev_events&EV_TIMEOUT)?" Timeout":"", 2858 (e->ev_events&EV_PERSIST)?" Persist":""); 2859 2860 } 2861 for (i = 0; i < base->nactivequeues; ++i) { 2862 if (TAILQ_EMPTY(&base->activequeues[i])) 2863 continue; 2864 fprintf(output, "Active events [priority %d]:\n", i); 2865 TAILQ_FOREACH(e, &base->eventqueue, ev_next) { 2866 fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s\n", 2867 (void*)e, EV_SOCK_ARG(e->ev_fd), 2868 (e->ev_res&EV_READ)?" Read active":"", 2869 (e->ev_res&EV_WRITE)?" Write active":"", 2870 (e->ev_res&EV_SIGNAL)?" Signal active":"", 2871 (e->ev_res&EV_TIMEOUT)?" Timeout active":""); 2872 } 2873 } 2874 } 2875 2876 void 2877 event_base_add_virtual(struct event_base *base) 2878 { 2879 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2880 base->virtual_event_count++; 2881 EVBASE_RELEASE_LOCK(base, th_base_lock); 2882 } 2883 2884 void 2885 event_base_del_virtual(struct event_base *base) 2886 { 2887 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2888 EVUTIL_ASSERT(base->virtual_event_count > 0); 2889 base->virtual_event_count--; 2890 if (base->virtual_event_count == 0 && EVBASE_NEED_NOTIFY(base)) 2891 evthread_notify_base(base); 2892 EVBASE_RELEASE_LOCK(base, th_base_lock); 2893 } 2894 2895 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 2896 int 2897 event_global_setup_locks_(const int enable_locks) 2898 { 2899 #ifndef _EVENT_DISABLE_DEBUG_MODE 2900 EVTHREAD_SETUP_GLOBAL_LOCK(_event_debug_map_lock, 0); 2901 #endif 2902 if (evsig_global_setup_locks_(enable_locks) < 0) 2903 return -1; 2904 if (evutil_secure_rng_global_setup_locks_(enable_locks) < 0) 2905 return -1; 2906 return 0; 2907 } 2908 #endif 2909 2910 void 2911 event_base_assert_ok(struct event_base *base) 2912 { 2913 int i; 2914 EVBASE_ACQUIRE_LOCK(base, th_base_lock); 2915 evmap_check_integrity(base); 2916 2917 /* Check the heap property */ 2918 for (i = 1; i < (int)base->timeheap.n; ++i) { 2919 int parent = (i - 1) / 2; 2920 struct event *ev, *p_ev; 2921 ev = base->timeheap.p[i]; 2922 p_ev = base->timeheap.p[parent]; 2923 EVUTIL_ASSERT(ev->ev_flags & EV_TIMEOUT); 2924 EVUTIL_ASSERT(evutil_timercmp(&p_ev->ev_timeout, &ev->ev_timeout, <=)); 2925 EVUTIL_ASSERT(ev->ev_timeout_pos.min_heap_idx == i); 2926 } 2927 2928 /* Check that the common timeouts are fine */ 2929 for (i = 0; i < base->n_common_timeouts; ++i) { 2930 struct common_timeout_list *ctl = base->common_timeout_queues[i]; 2931 struct event *last=NULL, *ev; 2932 TAILQ_FOREACH(ev, &ctl->events, ev_timeout_pos.ev_next_with_common_timeout) { 2933 if (last) 2934 EVUTIL_ASSERT(evutil_timercmp(&last->ev_timeout, &ev->ev_timeout, <=)); 2935 EVUTIL_ASSERT(ev->ev_flags & EV_TIMEOUT); 2936 EVUTIL_ASSERT(is_common_timeout(&ev->ev_timeout,base)); 2937 EVUTIL_ASSERT(COMMON_TIMEOUT_IDX(&ev->ev_timeout) == i); 2938 last = ev; 2939 } 2940 } 2941 2942 EVBASE_RELEASE_LOCK(base, th_base_lock); 2943 } 2944