Home | History | Annotate | Download | only in provider
      1 // Copyright 2015 The Weave Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "examples/provider/event_task_runner.h"
      6 
      7 #include <signal.h>
      8 
      9 namespace weave {
     10 namespace examples {
     11 
     12 namespace {
     13 event_base* g_event_base = nullptr;
     14 }
     15 
     16 void EventTaskRunner::PostDelayedTask(
     17     const tracked_objects::Location& from_here,
     18     const base::Closure& task,
     19     base::TimeDelta delay) {
     20   base::Time new_time = base::Time::Now() + delay;
     21   if (queue_.empty() || new_time < queue_.top().first.first) {
     22     ReScheduleEvent(delay);
     23   }
     24   queue_.emplace(std::make_pair(new_time, ++counter_), task);
     25 }
     26 
     27 void EventTaskRunner::AddIoCompletionTask(
     28     int fd,
     29     int16_t what,
     30     const EventTaskRunner::IoCompletionCallback& task) {
     31   int16_t flags = EV_PERSIST | EV_ET;
     32   flags |= (what & kReadable) ? EV_READ : 0;
     33   flags |= (what & kWriteable) ? EV_WRITE : 0;
     34 #if LIBEVENT_VERSION_NUMBER >= 0x02010400
     35   flags |= (what & kClosed) ? EV_CLOSED : 0;
     36 #endif
     37   event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this);
     38   EventPtr<event> ioeventPtr{ioevent};
     39   fd_task_map_.insert(
     40       std::make_pair(fd, std::make_pair(std::move(ioeventPtr), task)));
     41   event_add(ioevent, nullptr);
     42 }
     43 
     44 void EventTaskRunner::RemoveIoCompletionTask(int fd) {
     45   fd_task_map_.erase(fd);
     46 }
     47 
     48 void EventTaskRunner::Run() {
     49   g_event_base = base_.get();
     50 
     51   struct sigaction sa = {};
     52   sa.sa_handler = [](int signal) {
     53     event_base_loopexit(g_event_base, nullptr);
     54   };
     55   sigfillset(&sa.sa_mask);
     56   sigaction(SIGINT, &sa, nullptr);
     57 
     58   do {
     59     event_base_loop(g_event_base, EVLOOP_ONCE);
     60   } while (!event_base_got_exit(g_event_base));
     61   g_event_base = nullptr;
     62 }
     63 
     64 void EventTaskRunner::ReScheduleEvent(base::TimeDelta delay) {
     65   timespec ts = delay.ToTimeSpec();
     66   timeval tv = {ts.tv_sec, ts.tv_nsec / 1000};
     67   event_add(task_event_.get(), &tv);
     68 }
     69 
     70 void EventTaskRunner::EventHandler(int /* fd */,
     71                                    int16_t /* what */,
     72                                    void* runner) {
     73   static_cast<EventTaskRunner*>(runner)->Process();
     74 }
     75 
     76 void EventTaskRunner::FreeEvent(event* evnt) {
     77   event_del(evnt);
     78   event_free(evnt);
     79 }
     80 
     81 void EventTaskRunner::Process() {
     82   while (!queue_.empty() && queue_.top().first.first <= base::Time::Now()) {
     83     auto cb = queue_.top().second;
     84     queue_.pop();
     85     cb.Run();
     86   }
     87   if (!queue_.empty()) {
     88     base::TimeDelta delta = std::max(
     89         base::TimeDelta(), queue_.top().first.first - base::Time::Now());
     90     ReScheduleEvent(delta);
     91   }
     92 }
     93 
     94 void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) {
     95   static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what);
     96 }
     97 
     98 void EventTaskRunner::ProcessFd(int fd, int16_t what) {
     99   auto it = fd_task_map_.find(fd);
    100   if (it != fd_task_map_.end()) {
    101     const IoCompletionCallback& callback = it->second.second;
    102     callback.Run(fd, what, this);
    103   }
    104 }
    105 
    106 }  // namespace examples
    107 }  // namespace weave
    108