Lines Matching full:base
146 static int evthread_notify_base(struct event_base *base);
325 #define EVENT_BASE_ASSERT_LOCKED(base) \
326 EVLOCK_ASSERT_LOCKED((base)->th_base_lock)
351 /** Set 'tp' to the current time according to 'base'. We must hold the lock
352 * on 'base'. If there is a cached time, return it. Otherwise, use
357 gettime(struct event_base *base, struct timeval *tp)
359 EVENT_BASE_ASSERT_LOCKED(base);
361 if (base->tv_cache.tv_sec) {
362 *tp = base->tv_cache;
375 if (base->last_updated_clock_diff + CLOCK_SYNC_INTERVAL
379 evutil_timersub(&tv, tp, &base->tv_clock_diff);
380 base->last_updated_clock_diff = ts.tv_sec;
391 event_base_gettimeofday_cached(struct event_base *base, struct timeval *tv)
394 if (!base) {
395 base = current_base;
400 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
401 if (base->tv_cache.tv_sec == 0) {
405 evutil_timeradd(&base->tv_cache, &base->tv_clock_diff, tv);
407 *tv = base->tv_cache;
411 EVBASE_RELEASE_LOCK(base, th_base_lock);
415 /** Make 'base' have no current cached time. */
417 clear_time_cache(struct event_base *base)
419 base->tv_cache.tv_sec = 0;
422 /** Replace the cached time in 'base' with the current time. */
424 update_time_cache(struct event_base *base)
426 base->tv_cache.tv_sec = 0;
427 if (!(base->flags & EVENT_BASE_FLAG_NO_CACHE_TIME))
428 gettime(base, &base->tv_cache);
434 struct event_base *base = event_base_new_with_config(NULL);
436 if (base == NULL) {
441 current_base = base;
443 return (base);
449 struct event_base *base = NULL;
452 base = event_base_new_with_config(cfg);
455 return base;
491 event_base_get_features(const struct event_base *base)
493 return base->evsel->features;
503 /** Helper for the deferred_cb queue: wake up the event base. */
507 struct event_base *base = baseptr;
508 if (EVBASE_NEED_NOTIFY(base))
509 evthread_notify_base(base);
513 event_base_get_deferred_cb_queue(struct event_base *base)
515 return base ? &base->defer_queue : NULL;
555 struct event_base *base;
562 if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
567 gettime(base, &base->event_tv);
569 min_heap_ctor(&base->timeheap);
570 TAILQ_INIT(&base->eventqueue);
571 base->sig.ev_signal_pair[0] = -1;
572 base->sig.ev_signal_pair[1] = -1;
573 base
574 base->th_notify_fd[1] = -1;
576 event_deferred_cb_queue_init(&base->defer_queue);
577 base->defer_queue.notify_fn = notify_base_cbq_callback;
578 base->defer_queue.notify_arg = base;
580 base->flags = cfg->flags;
582 evmap_io_initmap(&base->io);
583 evmap_signal_initmap(&base->sigmap);
584 event_changelist_init(&base->changelist);
586 base->evbase = NULL;
591 for (i = 0; eventops[i] && !base->evbase; i++) {
607 base->evsel = eventops[i];
609 base->evbase = base->evsel->init(base);
612 if (base->evbase == NULL) {
615 base->evsel = NULL;
616 event_base_free(base);
621 event_msgx("libevent using: %s", base->evsel->name);
624 if (event_base_priority_init(base, 1) < 0) {
625 event_base_free(base);
635 EVTHREAD_ALLOC_LOCK(base->th_base_lock,
637 base->defer_queue.lock = base->th_base_lock;
638 EVTHREAD_ALLOC_COND(base->current_event_cond);
639 r = evthread_make_base_notifiable(base);
641 event_warnx("%s: Unable to make base notifiable.", __func__);
642 event_base_free(base);
650 event_base_start_iocp(base, cfg->n_cpus_hint);
653 return (base);
657 event_base_start_iocp(struct event_base *base, int n_cpus)
660 if (base->iocp)
662 base->iocp = event_iocp_port_launch(n_cpus);
663 if (!base->iocp) {
674 event_base_stop_iocp(struct event_base *base)
679 if (!base->iocp)
681 rv = event_iocp_shutdown(base->iocp, -1);
683 base->iocp = NULL;
688 event_base_free(struct event_base *base)
693 * the base, then the contending thread will be very sad soon. */
697 if (base == NULL && current_base)
698 base = current_base;
700 if (base == current_base)
703 if (base == NULL) {
704 event_warnx("%s: no base to free", __func__);
710 event_base_stop_iocp(base);
714 if (base->th_notify_fd[0] != -1) {
715 event_del(&base->th_notify);
716 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);
717 if (base->th_notify_fd[1] != -1)
718 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]);
719 base->th_notify_fd[0] = -1;
720 base->th_notify_fd[1] = -1;
721 event_debug_unassign(&base->th_notify);
725 for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) {
733 while ((ev = min_heap_top(&base->timeheap)) != NULL) {
737 for (i = 0; i < base->n_common_timeouts; ++i) {
739 base->common_timeout_queues[i];
753 if (base->common_timeout_queues)
754 mm_free(base->common_timeout_queues);
756 for (i = 0; i < base->nactivequeues; ++i) {
757 for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) {
768 event_debug(("%s: %d events were still set in base",
771 if (base->evsel != NULL && base->evsel->dealloc != NULL)
772 base->evsel->dealloc(base);
774 for (i = 0; i < base->nactivequeues; ++i)
775 EVUTIL_ASSERT(TAILQ_EMPTY(&base->activequeues[i]));
777 EVUTIL_ASSERT(min_heap_empty(&base->timeheap));
778 min_heap_dtor(&base->timeheap);
780 mm_free(base->activequeues);
782 EVUTIL_ASSERT(TAILQ_EMPTY(&base->eventqueue));
784 evmap_io_clear(&base->io);
785 evmap_signal_clear(&base->sigmap);
786 event_changelist_freemem(&base->changelist);
788 EVTHREAD_FREE_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE);
789 EVTHREAD_FREE_COND(base->current_event_cond);
791 mm_free(base);
794 /* reinitialize the event base after a fork */
796 event_reinit(struct event_base *base)
803 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
805 evsel = base->evsel;
819 if (base->sig.ev_signal_added) {
820 /* we cannot call event_del here because the base has
822 event_queue_remove(base, &base->sig.ev_signal,
824 if (base->sig.ev_signal.ev_flags & EVLIST_ACTIVE)
825 event_queue_remove(base, &base->sig.ev_signal,
827 if (base->sig.ev_signal_pair[0] != -1)
828 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[0]);
829 if (base->sig.ev_signal_pair[1] != -1)
830 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[1]);
831 base->sig.ev_signal_added = 0;
833 if (base->th_notify_fd[0] != -1) {
834 /* we cannot call event_del here because the base has
837 event_queue_remove(base, &base->th_notify,
839 if (base->th_notify.ev_flags & EVLIST_ACTIVE)
840 event_queue_remove(base, &base->th_notify,
842 base->sig.ev_signal_added = 0;
843 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);
844 if (base->th_notify_fd[1] != -1)
845 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]);
846 base->th_notify_fd[0] = -1;
847 base->th_notify_fd[1] = -1;
848 event_debug_unassign(&base->th_notify);
851 if (base->evsel->dealloc != NULL)
852 base->evsel->dealloc(base);
853 base->evbase = evsel->init(base);
854 if (base->evbase == NULL) {
861 event_changelist_freemem(&base->changelist); /* XXX */
862 evmap_io_clear(&base->io);
863 evmap_signal_clear(&base->sigmap);
865 TAILQ_FOREACH(ev, &base->eventqueue, ev_next) {
867 if (ev == &base->sig.ev_signal) {
874 if (evmap_io_add(base, ev->ev_fd, ev) == -1)
877 if (evmap_signal_add(base, (int)ev->ev_fd, ev) == -1)
883 res = evthread_make_base_notifiable(base);
886 EVBASE_RELEASE_LOCK(base, th_base_lock);
1007 event_base_priority_init(struct event_base *base, int npriorities)
1011 if (N_ACTIVE_CALLBACKS(base) || npriorities < 1
1015 if (npriorities == base->nactivequeues)
1018 if (base->nactivequeues) {
1019 mm_free(base->activequeues);
1020 base->nactivequeues = 0;
1024 base->activequeues = (struct event_list *)
1026 if (base->activequeues == NULL) {
1030 base->nactivequeues = npriorities;
1032 for (i = 0; i < base->nactivequeues; ++i) {
1033 TAILQ_INIT(&base->activequeues[i]);
1041 event_haveevents(struct event_base *base)
1044 return (base->virtual_event_count > 0 || base->event_count > 0);
1049 event_signal_closure(struct event_base *base, struct event *ev)
1058 EVBASE_RELEASE_LOCK(base, th_base_lock);
1066 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1067 should_break = base->event_break;
1068 EVBASE_RELEASE_LOCK(base, th_base_lock);
1101 /** Return true iff if 'tv' is a common timeout in 'base' */
1104 const struct event_base *base)
1110 return idx < base->n_common_timeouts;
1125 get_common_timeout_list(struct event_base *base, const struct timeval *tv)
1127 return base->common_timeout_queues[COMMON_TIMEOUT_IDX(tv)];
1133 struct event_base *base)
1136 &get_common_timeout_list(base, tv)->duration;
1161 struct event_base *base = ctl->base;
1163 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1164 gettime(base, &now);
1176 EVBASE_RELEASE_LOCK(base, th_base_lock);
1182 event_base_init_common_timeout(struct event_base *base,
1190 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1193 if (is_common_timeout(duration, base))
1199 for (i = 0; i < base->n_common_timeouts; ++i) {
1201 base->common_timeout_queues[i];
1205 EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base));
1210 if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) {
1216 if (base->n_common_timeouts_allocated == base->n_common_timeouts) {
1217 int n = base->n_common_timeouts < 16 ? 16 :
1218 base->n_common_timeouts*2;
1220 mm_realloc(base->common_timeout_queues,
1226 base->n_common_timeouts_allocated = n;
1227 base->common_timeout_queues = newqueues;
1238 (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT);
1239 evtimer_assign(&new_ctl->timeout_event, base,
1243 new_ctl->base = base;
1244 base->common_timeout_queues[base->n_common_timeouts++] = new_ctl;
1249 EVUTIL_ASSERT(is_common_timeout(result, base));
1251 EVBASE_RELEASE_LOCK(base, th_base_lock);
1257 event_persist_closure(struct event_base *base, struct event *ev)
1277 gettime(base, &now);
1278 if (is_common_timeout(&ev->ev_timeout, base)) {
1316 EVBASE_RELEASE_LOCK(base, th_base_lock);
1330 event_process_active_single_queue(struct event_base *base,
1340 event_queue_remove(base, ev, EVLIST_ACTIVE);
1354 base->current_event = ev;
1355 base->current_event_waiters = 0;
1360 event_signal_closure(base, ev);
1363 event_persist_closure(base, ev);
1367 EVBASE_RELEASE_LOCK(base, th_base_lock);
1373 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1375 base->current_event = NULL;
1376 if (base->current_event_waiters) {
1377 base->current_event_waiters = 0;
1378 EVTHREAD_COND_BROADCAST(base->current_event_cond);
1382 if (base->event_break)
1384 if (base->event_continue)
1428 event_process_active(struct event_base *base)
1434 for (i = 0; i < base->nactivequeues; ++i) {
1435 if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
1436 base->event_running_priority = i;
1437 activeq = &base->activequeues[i];
1438 c = event_process_active_single_queue(base, activeq);
1440 base->event_running_priority = -1;
1450 event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
1451 base->event_running_priority = -1;
1472 event_base_get_method(const struct event_base *base)
1474 EVUTIL_ASSERT(base);
1475 return (base->evsel->name);
1483 struct event_base *base = arg;
1484 base->event_gotterm = 1;
1555 event_base_loop(struct event_base *base, int flags)
1557 const struct eventop *evsel = base->evsel;
1564 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1566 if (base->running_loop) {
1569 EVBASE_RELEASE_LOCK(base, th_base_lock);
1573 base->running_loop = 1;
1575 clear_time_cache(base);
1577 if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
1578 evsig_set_base(base);
1583 base->th_owner_id = EVTHREAD_GET_ID();
1586 base->event_gotterm = base->event_break = 0;
1589 base->event_continue = 0;
1592 if (base->event_gotterm) {
1596 if (base->event_break) {
1600 timeout_correct(base, &tv);
1603 if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
1604 timeout_next(base, &tv_p);
1614 if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
1621 gettime(base, &base->event_tv);
1623 clear_time_cache(base);
1625 res = evsel->dispatch(base, tv_p);
1634 update_time_cache(base);
1636 timeout_process(base);
1638 if (N_ACTIVE_CALLBACKS(base)) {
1639 int n = event_process_active(base);
1641 && N_ACTIVE_CALLBACKS(base) == 0
1650 clear_time_cache(base);
1651 base->running_loop = 0;
1653 EVBASE_RELEASE_LOCK(base, th_base_lock);
1689 event_base_once(struct event_base *base, evutil_socket_t fd, short events,
1714 evtimer_assign(&eonce->ev, base, event_once_cb, eonce);
1718 event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce);
1736 event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
1738 if (!base)
1739 base = current_base;
1743 ev->ev_base = base;
1772 if (base != NULL) {
1774 ev->ev_pri = base->nactivequeues / 2;
1783 event_base_set(struct event_base *base, struct event *ev)
1785 /* Only innocent events may be assigned to a different base */
1791 ev->ev_base = base;
1792 ev->ev_pri = base->nactivequeues/2;
1807 event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
1813 if (event_assign(ev, base, fd, events, cb, arg) < 0) {
1996 evthread_notify_base_default(struct event_base *base)
2002 r = send(base->th_notify_fd[1], buf, 1, 0);
2004 r = write(base->th_notify_fd[1], buf, 1);
2013 evthread_notify_base_eventfd(struct event_base *base)
2018 r = write(base->th_notify_fd[0], (void*) &msg, sizeof(msg));
2025 /** Tell the thread currently running the event_loop for base (if any) that it
2029 evthread_notify_base(struct event_base *base)
2031 EVENT_BASE_ASSERT_LOCKED(base);
2032 if (!base->th_notify_fn)
2034 if (base->is_notify_pending)
2036 base->is_notify_pending = 1;
2037 return base->th_notify_fn(base);
2048 struct event_base *base = ev->ev_base;
2052 EVENT_BASE_ASSERT_LOCKED(base);
2071 if (min_heap_reserve(&base->timeheap,
2072 1 + min_heap_size(&base->timeheap)) == -1)
2081 if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
2082 && !EVBASE_IN_THREAD(base)) {
2083 ++base->current_event_waiters;
2084 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
2091 res = evmap_io_add(base, ev->ev_fd, ev);
2093 res = evmap_signal_add(base, (int)ev->ev_fd, ev);
2095 event_queue_insert(base, ev, EVLIST_INSERTED);
2128 event_queue_remove(base, ev, EVLIST_TIMEOUT);
2146 event_queue_remove(base, ev, EVLIST_ACTIVE);
2149 gettime(base, &now);
2151 common_timeout = is_common_timeout(tv, base);
2168 event_queue_insert(base, ev, EVLIST_TIMEOUT);
2171 get_common_timeout_list(base, &ev->ev_timeout);
2186 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
2187 evthread_notify_base(base);
2217 struct event_base *base;
2223 /* An event without a base has not been added */
2234 base = ev->ev_base;
2236 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
2237 ++base->current_event_waiters;
2238 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
2260 event_queue_remove(base, ev, EVLIST_TIMEOUT);
2264 event_queue_remove(base, ev, EVLIST_ACTIVE);
2267 event_queue_remove(base, ev, EVLIST_INSERTED);
2269 res = evmap_io_del(base, ev->ev_fd, ev);
2271 res = evmap_signal_del(base, (int)ev->ev_fd, ev);
2280 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
2281 evthread_notify_base(base);
2309 struct event_base *base;
2321 base = ev->ev_base;
2323 EVENT_BASE_ASSERT_LOCKED(base);
2327 if (ev->ev_pri < base->event_running_priority)
2328 base->event_continue = 1;
2332 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
2333 ++base->current_event_waiters;
2334 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
2341 event_queue_insert(base, ev, EVLIST_ACTIVE);
2343 if (EVBASE_NEED_NOTIFY(base))
2344 evthread_notify_base(base);
2398 timeout_next(struct event_base *base, struct timeval **tv_p)
2406 ev = min_heap_top(&base->timeheap);
2414 if (gettime(base, &now) == -1) {
2441 timeout_correct(struct event_base *base, struct timeval *tv)
2453 gettime(base, tv);
2455 if (evutil_timercmp(tv, &base->event_tv, >=)) {
2456 base->event_tv = *tv;
2462 evutil_timersub(&base->event_tv, tv, &off);
2468 pev = base->timeheap.p;
2469 size = base->timeheap.n;
2474 for (i=0; i<base->n_common_timeouts; ++i) {
2477 base->common_timeout_queues[i];
2489 base->event_tv = *tv;
2494 timeout_process(struct event_base *base)
2500 if (min_heap_empty(&base->timeheap)) {
2504 gettime(base, &now);
2506 while ((ev = min_heap_top(&base->timeheap))) {
2519 /* Remove 'ev' from 'queue' (EVLIST_...) in base. */
2521 event_queue_remove(struct event_base *base, struct event *ev, int queue)
2523 EVENT_BASE_ASSERT_LOCKED(base);
2532 base->event_count--;
2537 TAILQ_REMOVE(&base->eventqueue, ev, ev_next);
2540 base->event_count_active--;
2541 TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
2545 if (is_common_timeout(&ev->ev_timeout, base)) {
2547 get_common_timeout_list(base, &ev->ev_timeout);
2551 min_heap_erase(&base->timeheap, ev);
2591 event_queue_insert(struct event_base *base, struct event *ev, int queue)
2593 EVENT_BASE_ASSERT_LOCKED(base);
2606 base->event_count++;
2611 TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
2614 base->event_count_active++;
2615 TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
2619 if (is_common_timeout(&ev->ev_timeout, base)) {
2621 get_common_timeout_list(base, &ev->ev_timeout);
2624 min_heap_push(&base->timeheap, ev);
2736 struct event_base *base = arg;
2742 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2743 base->is_notify_pending = 0;
2744 EVBASE_RELEASE_LOCK(base, th_base_lock);
2752 struct event_base *base = arg;
2761 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2762 base->is_notify_pending = 0;
2763 EVBASE_RELEASE_LOCK(base, th_base_lock);
2767 evthread_make_base_notifiable(struct event_base *base)
2773 if (!base)
2776 if (base->th_notify_fd[0] >= 0)
2783 base->th_notify_fd[0] = eventfd(0, EFD_CLOEXEC);
2784 if (base->th_notify_fd[0] >= 0) {
2785 evutil_make_socket_closeonexec(base->th_notify_fd[0]);
2791 if (base->th_notify_fd[0] < 0) {
2792 if ((base->evsel->features & EV_FEATURE_FDS)) {
2793 if (pipe(base->th_notify_fd) < 0) {
2796 evutil_make_socket_closeonexec(base->th_notify_fd[0]);
2797 evutil_make_socket_closeonexec(base->th_notify_fd[1]);
2808 if (base->th_notify_fd[0] < 0) {
2810 base->th_notify_fd) == -1) {
2814 evutil_make_socket_closeonexec(base->th_notify_fd[0]);
2815 evutil_make_socket_closeonexec(base->th_notify_fd[1]);
2819 evutil_make_socket_nonblocking(base->th_notify_fd[0]);
2821 base->th_notify_fn = notify;
2831 if (base->th_notify_fd[1] > 0)
2832 evutil_make_socket_nonblocking(base->th_notify_fd[1]);
2835 event_assign(&base->th_notify, base, base->th_notify_fd[0],
2836 EV_READ|EV_PERSIST, cb, base);
2839 base->th_notify.ev_flags |= EVLIST_INTERNAL;
2840 event_priority_set(&base->th_notify, 0);
2842 return event_add(&base->th_notify, NULL);
2846 event_base_dump_events(struct event_base *base, FILE *output)
2851 TAILQ_FOREACH(e, &base->eventqueue, ev_next) {
2861 for (i = 0; i < base->nactivequeues; ++i) {
2862 if (TAILQ_EMPTY(&base->activequeues[i]))
2865 TAILQ_FOREACH(e, &base->eventqueue, ev_next) {
2877 event_base_add_virtual(struct event_base *base)
2879 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2880 base->virtual_event_count++;
2881 EVBASE_RELEASE_LOCK(base, th_base_lock);
2885 event_base_del_virtual(struct event_base *base)
2887 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2888 EVUTIL_ASSERT(base->virtual_event_count > 0);
2889 base->virtual_event_count--;
2890 if (base->virtual_event_count == 0 && EVBASE_NEED_NOTIFY(base))
2891 evthread_notify_base(base);
2892 EVBASE_RELEASE_LOCK(base, th_base_lock);
2911 event_base_assert_ok(struct event_base *base)
2914 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2915 evmap_check_integrity(base);
2918 for (i = 1; i < (int)base->timeheap.n; ++i) {
2921 ev = base->timeheap.p[i];
2922 p_ev = base->timeheap.p[parent];
2929 for (i = 0; i < base->n_common_timeouts; ++i) {
2930 struct common_timeout_list *ctl = base->common_timeout_queues[i];
2936 EVUTIL_ASSERT(is_common_timeout(&ev->ev_timeout,base));
2942 EVBASE_RELEASE_LOCK(base, th_base_lock);