Home | History | Annotate | Download | only in message_loop
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_loop/message_pump_libevent.h"
      6 
      7 #include <errno.h>
      8 #include <unistd.h>
      9 
     10 #include <memory>
     11 
     12 #include "base/auto_reset.h"
     13 #include "base/compiler_specific.h"
     14 #include "base/files/file_util.h"
     15 #include "base/logging.h"
     16 #include "base/observer_list.h"
     17 #include "base/posix/eintr_wrapper.h"
     18 #include "base/third_party/libevent/event.h"
     19 #include "base/time/time.h"
     20 #include "base/trace_event/trace_event.h"
     21 #include "build/build_config.h"
     22 
     23 #if defined(OS_MACOSX)
     24 #include "base/mac/scoped_nsautorelease_pool.h"
     25 #endif
     26 
     27 // Lifecycle of struct event
     28 // Libevent uses two main data structures:
     29 // struct event_base (of which there is one per message pump), and
     30 // struct event (of which there is roughly one per socket).
     31 // The socket's struct event is created in
     32 // MessagePumpLibevent::WatchFileDescriptor(),
     33 // is owned by the FileDescriptorWatcher, and is destroyed in
     34 // StopWatchingFileDescriptor().
     35 // It is moved into and out of lists in struct event_base by
     36 // the libevent functions event_add() and event_del().
     37 //
     38 // TODO(dkegel):
     39 // At the moment bad things happen if a FileDescriptorWatcher
     40 // is active after its MessagePumpLibevent has been destroyed.
     41 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
     42 // Not clear yet whether that situation occurs in practice,
     43 // but if it does, we need to fix it.
     44 
     45 namespace base {
     46 
     47 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
     48     : event_(NULL),
     49       pump_(NULL),
     50       watcher_(NULL),
     51       was_destroyed_(NULL) {
     52 }
     53 
     54 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
     55   if (event_) {
     56     StopWatchingFileDescriptor();
     57   }
     58   if (was_destroyed_) {
     59     DCHECK(!*was_destroyed_);
     60     *was_destroyed_ = true;
     61   }
     62 }
     63 
     64 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
     65   event* e = ReleaseEvent();
     66   if (e == NULL)
     67     return true;
     68 
     69   // event_del() is a no-op if the event isn't active.
     70   int rv = event_del(e);
     71   delete e;
     72   pump_ = NULL;
     73   watcher_ = NULL;
     74   return (rv == 0);
     75 }
     76 
     77 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
     78   DCHECK(e);
     79   DCHECK(!event_);
     80 
     81   event_ = e;
     82 }
     83 
     84 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
     85   struct event *e = event_;
     86   event_ = NULL;
     87   return e;
     88 }
     89 
     90 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
     91     int fd,
     92     MessagePumpLibevent*) {
     93   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
     94   // watching the file descriptor.
     95   if (!watcher_)
     96     return;
     97   watcher_->OnFileCanReadWithoutBlocking(fd);
     98 }
     99 
    100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
    101     int fd,
    102     MessagePumpLibevent*) {
    103   DCHECK(watcher_);
    104   watcher_->OnFileCanWriteWithoutBlocking(fd);
    105 }
    106 
    107 MessagePumpLibevent::MessagePumpLibevent()
    108     : keep_running_(true),
    109       in_run_(false),
    110       processed_io_events_(false),
    111       event_base_(event_base_new()),
    112       wakeup_pipe_in_(-1),
    113       wakeup_pipe_out_(-1) {
    114   if (!Init())
    115      NOTREACHED();
    116 }
    117 
    118 MessagePumpLibevent::~MessagePumpLibevent() {
    119   DCHECK(wakeup_event_);
    120   DCHECK(event_base_);
    121   event_del(wakeup_event_);
    122   delete wakeup_event_;
    123   if (wakeup_pipe_in_ >= 0) {
    124     if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
    125       DPLOG(ERROR) << "close";
    126   }
    127   if (wakeup_pipe_out_ >= 0) {
    128     if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
    129       DPLOG(ERROR) << "close";
    130   }
    131   event_base_free(event_base_);
    132 }
    133 
    134 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
    135                                               bool persistent,
    136                                               int mode,
    137                                               FileDescriptorWatcher *controller,
    138                                               Watcher *delegate) {
    139   DCHECK_GE(fd, 0);
    140   DCHECK(controller);
    141   DCHECK(delegate);
    142   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
    143   // WatchFileDescriptor should be called on the pump thread. It is not
    144   // threadsafe, and your watcher may never be registered.
    145   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
    146 
    147   int event_mask = persistent ? EV_PERSIST : 0;
    148   if (mode & WATCH_READ) {
    149     event_mask |= EV_READ;
    150   }
    151   if (mode & WATCH_WRITE) {
    152     event_mask |= EV_WRITE;
    153   }
    154 
    155   std::unique_ptr<event> evt(controller->ReleaseEvent());
    156   if (evt.get() == NULL) {
    157     // Ownership is transferred to the controller.
    158     evt.reset(new event);
    159   } else {
    160     // Make sure we don't pick up any funky internal libevent masks.
    161     int old_interest_mask = evt.get()->ev_events &
    162         (EV_READ | EV_WRITE | EV_PERSIST);
    163 
    164     // Combine old/new event masks.
    165     event_mask |= old_interest_mask;
    166 
    167     // Must disarm the event before we can reuse it.
    168     event_del(evt.get());
    169 
    170     // It's illegal to use this function to listen on 2 separate fds with the
    171     // same |controller|.
    172     if (EVENT_FD(evt.get()) != fd) {
    173       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
    174       return false;
    175     }
    176   }
    177 
    178   // Set current interest mask and message pump for this event.
    179   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
    180 
    181   // Tell libevent which message pump this socket will belong to when we add it.
    182   if (event_base_set(event_base_, evt.get())) {
    183     return false;
    184   }
    185 
    186   // Add this socket to the list of monitored sockets.
    187   if (event_add(evt.get(), NULL)) {
    188     return false;
    189   }
    190 
    191   // Transfer ownership of evt to controller.
    192   controller->Init(evt.release());
    193 
    194   controller->set_watcher(delegate);
    195   controller->set_pump(this);
    196 
    197   return true;
    198 }
    199 
    200 // Tell libevent to break out of inner loop.
    201 static void timer_callback(int /*fd*/, short /*events*/, void* context) {
    202   event_base_loopbreak((struct event_base *)context);
    203 }
    204 
    205 // Reentrant!
    206 void MessagePumpLibevent::Run(Delegate* delegate) {
    207   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
    208   AutoReset<bool> auto_reset_in_run(&in_run_, true);
    209 
    210   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
    211   // Instead, make our own timer and reuse it on each call to event_base_loop().
    212   std::unique_ptr<event> timer_event(new event);
    213 
    214   for (;;) {
    215 #if defined(OS_MACOSX)
    216     mac::ScopedNSAutoreleasePool autorelease_pool;
    217 #endif
    218 
    219     bool did_work = delegate->DoWork();
    220     if (!keep_running_)
    221       break;
    222 
    223     event_base_loop(event_base_, EVLOOP_NONBLOCK);
    224     did_work |= processed_io_events_;
    225     processed_io_events_ = false;
    226     if (!keep_running_)
    227       break;
    228 
    229     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
    230     if (!keep_running_)
    231       break;
    232 
    233     if (did_work)
    234       continue;
    235 
    236     did_work = delegate->DoIdleWork();
    237     if (!keep_running_)
    238       break;
    239 
    240     if (did_work)
    241       continue;
    242 
    243     // EVLOOP_ONCE tells libevent to only block once,
    244     // but to service all pending events when it wakes up.
    245     if (delayed_work_time_.is_null()) {
    246       event_base_loop(event_base_, EVLOOP_ONCE);
    247     } else {
    248       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
    249       if (delay > TimeDelta()) {
    250         struct timeval poll_tv;
    251         poll_tv.tv_sec = delay.InSeconds();
    252         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
    253         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
    254         event_base_set(event_base_, timer_event.get());
    255         event_add(timer_event.get(), &poll_tv);
    256         event_base_loop(event_base_, EVLOOP_ONCE);
    257         event_del(timer_event.get());
    258       } else {
    259         // It looks like delayed_work_time_ indicates a time in the past, so we
    260         // need to call DoDelayedWork now.
    261         delayed_work_time_ = TimeTicks();
    262       }
    263     }
    264 
    265     if (!keep_running_)
    266       break;
    267   }
    268 }
    269 
    270 void MessagePumpLibevent::Quit() {
    271   DCHECK(in_run_) << "Quit was called outside of Run!";
    272   // Tell both libevent and Run that they should break out of their loops.
    273   keep_running_ = false;
    274   ScheduleWork();
    275 }
    276 
    277 void MessagePumpLibevent::ScheduleWork() {
    278   // Tell libevent (in a threadsafe way) that it should break out of its loop.
    279   char buf = 0;
    280   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
    281   DCHECK(nwrite == 1 || errno == EAGAIN)
    282       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
    283 }
    284 
    285 void MessagePumpLibevent::ScheduleDelayedWork(
    286     const TimeTicks& delayed_work_time) {
    287   // We know that we can't be blocked on Wait right now since this method can
    288   // only be called on the same thread as Run, so we only need to update our
    289   // record of how long to sleep when we do sleep.
    290   delayed_work_time_ = delayed_work_time;
    291 }
    292 
    293 bool MessagePumpLibevent::Init() {
    294   int fds[2];
    295   if (pipe(fds)) {
    296     DLOG(ERROR) << "pipe() failed, errno: " << errno;
    297     return false;
    298   }
    299   if (!SetNonBlocking(fds[0])) {
    300     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
    301     return false;
    302   }
    303   if (!SetNonBlocking(fds[1])) {
    304     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
    305     return false;
    306   }
    307   wakeup_pipe_out_ = fds[0];
    308   wakeup_pipe_in_ = fds[1];
    309 
    310   wakeup_event_ = new event;
    311   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
    312             OnWakeup, this);
    313   event_base_set(event_base_, wakeup_event_);
    314 
    315   if (event_add(wakeup_event_, 0))
    316     return false;
    317   return true;
    318 }
    319 
    320 // static
    321 void MessagePumpLibevent::OnLibeventNotification(int fd,
    322                                                  short flags,
    323                                                  void* context) {
    324   FileDescriptorWatcher* controller =
    325       static_cast<FileDescriptorWatcher*>(context);
    326   DCHECK(controller);
    327   TRACE_EVENT1("toplevel", "MessagePumpLibevent::OnLibeventNotification",
    328                "fd", fd);
    329 
    330   MessagePumpLibevent* pump = controller->pump();
    331   pump->processed_io_events_ = true;
    332 
    333   if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) {
    334     // Both callbacks will be called. It is necessary to check that |controller|
    335     // is not destroyed.
    336     bool controller_was_destroyed = false;
    337     controller->was_destroyed_ = &controller_was_destroyed;
    338     controller->OnFileCanWriteWithoutBlocking(fd, pump);
    339     if (!controller_was_destroyed)
    340       controller->OnFileCanReadWithoutBlocking(fd, pump);
    341     if (!controller_was_destroyed)
    342       controller->was_destroyed_ = nullptr;
    343   } else if (flags & EV_WRITE) {
    344     controller->OnFileCanWriteWithoutBlocking(fd, pump);
    345   } else if (flags & EV_READ) {
    346     controller->OnFileCanReadWithoutBlocking(fd, pump);
    347   }
    348 }
    349 
    350 // Called if a byte is received on the wakeup pipe.
    351 // static
    352 void MessagePumpLibevent::OnWakeup(int socket, short /*flags*/, void* context) {
    353   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
    354   DCHECK(that->wakeup_pipe_out_ == socket);
    355 
    356   // Remove and discard the wakeup byte.
    357   char buf;
    358   int nread = HANDLE_EINTR(read(socket, &buf, 1));
    359   DCHECK_EQ(nread, 1);
    360   that->processed_io_events_ = true;
    361   // Tell libevent to break out of inner loop.
    362   event_base_loopbreak(that->event_base_);
    363 }
    364 
    365 }  // namespace base
    366