1 // Copyright 2015 The Weave Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "examples/provider/event_task_runner.h" 6 7 #include <signal.h> 8 9 namespace weave { 10 namespace examples { 11 12 namespace { 13 event_base* g_event_base = nullptr; 14 } 15 16 void EventTaskRunner::PostDelayedTask( 17 const tracked_objects::Location& from_here, 18 const base::Closure& task, 19 base::TimeDelta delay) { 20 base::Time new_time = base::Time::Now() + delay; 21 if (queue_.empty() || new_time < queue_.top().first.first) { 22 ReScheduleEvent(delay); 23 } 24 queue_.emplace(std::make_pair(new_time, ++counter_), task); 25 } 26 27 void EventTaskRunner::AddIoCompletionTask( 28 int fd, 29 int16_t what, 30 const EventTaskRunner::IoCompletionCallback& task) { 31 int16_t flags = EV_PERSIST | EV_ET; 32 flags |= (what & kReadable) ? EV_READ : 0; 33 flags |= (what & kWriteable) ? EV_WRITE : 0; 34 #if LIBEVENT_VERSION_NUMBER >= 0x02010400 35 flags |= (what & kClosed) ? EV_CLOSED : 0; 36 #endif 37 event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this); 38 EventPtr<event> ioeventPtr{ioevent}; 39 fd_task_map_.insert( 40 std::make_pair(fd, std::make_pair(std::move(ioeventPtr), task))); 41 event_add(ioevent, nullptr); 42 } 43 44 void EventTaskRunner::RemoveIoCompletionTask(int fd) { 45 fd_task_map_.erase(fd); 46 } 47 48 void EventTaskRunner::Run() { 49 g_event_base = base_.get(); 50 51 struct sigaction sa = {}; 52 sa.sa_handler = [](int signal) { 53 event_base_loopexit(g_event_base, nullptr); 54 }; 55 sigfillset(&sa.sa_mask); 56 sigaction(SIGINT, &sa, nullptr); 57 58 do { 59 event_base_loop(g_event_base, EVLOOP_ONCE); 60 } while (!event_base_got_exit(g_event_base)); 61 g_event_base = nullptr; 62 } 63 64 void EventTaskRunner::ReScheduleEvent(base::TimeDelta delay) { 65 timespec ts = delay.ToTimeSpec(); 66 timeval tv = {ts.tv_sec, ts.tv_nsec / 1000}; 67 event_add(task_event_.get(), &tv); 68 } 69 70 void EventTaskRunner::EventHandler(int /* fd */, 71 int16_t /* what */, 72 void* runner) { 73 static_cast<EventTaskRunner*>(runner)->Process(); 74 } 75 76 void EventTaskRunner::FreeEvent(event* evnt) { 77 event_del(evnt); 78 event_free(evnt); 79 } 80 81 void EventTaskRunner::Process() { 82 while (!queue_.empty() && queue_.top().first.first <= base::Time::Now()) { 83 auto cb = queue_.top().second; 84 queue_.pop(); 85 cb.Run(); 86 } 87 if (!queue_.empty()) { 88 base::TimeDelta delta = std::max( 89 base::TimeDelta(), queue_.top().first.first - base::Time::Now()); 90 ReScheduleEvent(delta); 91 } 92 } 93 94 void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) { 95 static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what); 96 } 97 98 void EventTaskRunner::ProcessFd(int fd, int16_t what) { 99 auto it = fd_task_map_.find(fd); 100 if (it != fd_task_map_.end()) { 101 const IoCompletionCallback& callback = it->second.second; 102 callback.Run(fd, what, this); 103 } 104 } 105 106 } // namespace examples 107 } // namespace weave 108