Home | History | Annotate | Download | only in message_loop
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_loop/message_pump_libevent.h"
      6 
      7 #include <errno.h>
      8 #include <fcntl.h>
      9 #include <unistd.h>
     10 
     11 #include "base/auto_reset.h"
     12 #include "base/compiler_specific.h"
     13 #include "base/logging.h"
     14 #include "base/memory/scoped_ptr.h"
     15 #include "base/observer_list.h"
     16 #include "base/posix/eintr_wrapper.h"
     17 #include "base/time/time.h"
     18 #include "third_party/libevent/event.h"
     19 
     20 #if defined(OS_MACOSX)
     21 #include "base/mac/scoped_nsautorelease_pool.h"
     22 #endif
     23 
     24 // Lifecycle of struct event
     25 // Libevent uses two main data structures:
     26 // struct event_base (of which there is one per message pump), and
     27 // struct event (of which there is roughly one per socket).
     28 // The socket's struct event is created in
     29 // MessagePumpLibevent::WatchFileDescriptor(),
     30 // is owned by the FileDescriptorWatcher, and is destroyed in
     31 // StopWatchingFileDescriptor().
     32 // It is moved into and out of lists in struct event_base by
     33 // the libevent functions event_add() and event_del().
     34 //
     35 // TODO(dkegel):
     36 // At the moment bad things happen if a FileDescriptorWatcher
     37 // is active after its MessagePumpLibevent has been destroyed.
     38 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
     39 // Not clear yet whether that situation occurs in practice,
     40 // but if it does, we need to fix it.
     41 
     42 namespace base {
     43 
     44 // Return 0 on success
     45 // Too small a function to bother putting in a library?
     46 static int SetNonBlocking(int fd) {
     47   int flags = fcntl(fd, F_GETFL, 0);
     48   if (flags == -1)
     49     flags = 0;
     50   return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     51 }
     52 
// Constructs a watcher that is not yet associated with a libevent event, a
// message pump, or a delegate: all three pointers start out NULL. The weak
// pointer factory is bound to this object.
MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
    : event_(NULL),
      pump_(NULL),
      watcher_(NULL),
      weak_factory_(this) {
}
     59 
     60 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
     61   if (event_) {
     62     StopWatchingFileDescriptor();
     63   }
     64 }
     65 
     66 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
     67   event* e = ReleaseEvent();
     68   if (e == NULL)
     69     return true;
     70 
     71   // event_del() is a no-op if the event isn't active.
     72   int rv = event_del(e);
     73   delete e;
     74   pump_ = NULL;
     75   watcher_ = NULL;
     76   return (rv == 0);
     77 }
     78 
// Stores |e| as the event held by this watcher. |e| must be non-NULL and the
// watcher must not already hold an event (both are DCHECKed). The stored
// event is later deleted by StopWatchingFileDescriptor().
void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
  DCHECK(e);
  DCHECK(!event_);

  event_ = e;
}
     85 
     86 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
     87   struct event *e = event_;
     88   event_ = NULL;
     89   return e;
     90 }
     91 
     92 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
     93     int fd, MessagePumpLibevent* pump) {
     94   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
     95   // watching the file descriptor.
     96   if (!watcher_)
     97     return;
     98   pump->WillProcessIOEvent();
     99   watcher_->OnFileCanReadWithoutBlocking(fd);
    100   pump->DidProcessIOEvent();
    101 }
    102 
// Forwards a "writable" notification for |fd| to the delegate, bracketed by
// |pump|'s IO-observer notifications. A delegate must be attached (DCHECKed);
// unlike the read path there is no early return when |watcher_| is NULL.
void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
    int fd, MessagePumpLibevent* pump) {
  DCHECK(watcher_);
  pump->WillProcessIOEvent();
  watcher_->OnFileCanWriteWithoutBlocking(fd);
  pump->DidProcessIOEvent();
}
    110 
    111 MessagePumpLibevent::MessagePumpLibevent()
    112     : keep_running_(true),
    113       in_run_(false),
    114       processed_io_events_(false),
    115       event_base_(event_base_new()),
    116       wakeup_pipe_in_(-1),
    117       wakeup_pipe_out_(-1) {
    118   if (!Init())
    119      NOTREACHED();
    120 }
    121 
    122 MessagePumpLibevent::~MessagePumpLibevent() {
    123   DCHECK(wakeup_event_);
    124   DCHECK(event_base_);
    125   event_del(wakeup_event_);
    126   delete wakeup_event_;
    127   if (wakeup_pipe_in_ >= 0) {
    128     if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0)
    129       DPLOG(ERROR) << "close";
    130   }
    131   if (wakeup_pipe_out_ >= 0) {
    132     if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0)
    133       DPLOG(ERROR) << "close";
    134   }
    135   event_base_free(event_base_);
    136 }
    137 
    138 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
    139                                               bool persistent,
    140                                               int mode,
    141                                               FileDescriptorWatcher *controller,
    142                                               Watcher *delegate) {
    143   DCHECK_GE(fd, 0);
    144   DCHECK(controller);
    145   DCHECK(delegate);
    146   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
    147   // WatchFileDescriptor should be called on the pump thread. It is not
    148   // threadsafe, and your watcher may never be registered.
    149   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
    150 
    151   int event_mask = persistent ? EV_PERSIST : 0;
    152   if (mode & WATCH_READ) {
    153     event_mask |= EV_READ;
    154   }
    155   if (mode & WATCH_WRITE) {
    156     event_mask |= EV_WRITE;
    157   }
    158 
    159   scoped_ptr<event> evt(controller->ReleaseEvent());
    160   if (evt.get() == NULL) {
    161     // Ownership is transferred to the controller.
    162     evt.reset(new event);
    163   } else {
    164     // Make sure we don't pick up any funky internal libevent masks.
    165     int old_interest_mask = evt.get()->ev_events &
    166         (EV_READ | EV_WRITE | EV_PERSIST);
    167 
    168     // Combine old/new event masks.
    169     event_mask |= old_interest_mask;
    170 
    171     // Must disarm the event before we can reuse it.
    172     event_del(evt.get());
    173 
    174     // It's illegal to use this function to listen on 2 separate fds with the
    175     // same |controller|.
    176     if (EVENT_FD(evt.get()) != fd) {
    177       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
    178       return false;
    179     }
    180   }
    181 
    182   // Set current interest mask and message pump for this event.
    183   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
    184 
    185   // Tell libevent which message pump this socket will belong to when we add it.
    186   if (event_base_set(event_base_, evt.get())) {
    187     return false;
    188   }
    189 
    190   // Add this socket to the list of monitored sockets.
    191   if (event_add(evt.get(), NULL)) {
    192     return false;
    193   }
    194 
    195   // Transfer ownership of evt to controller.
    196   controller->Init(evt.release());
    197 
    198   controller->set_watcher(delegate);
    199   controller->set_pump(this);
    200 
    201   return true;
    202 }
    203 
// Adds |obs| to |io_observers_|, the list that WillProcessIOEvent() and
// DidProcessIOEvent() notify.
void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
  io_observers_.AddObserver(obs);
}
    207 
// Removes |obs| from |io_observers_| so that it no longer receives
// WillProcessIOEvent()/DidProcessIOEvent() notifications.
void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
  io_observers_.RemoveObserver(obs);
}
    211 
    212 // Tell libevent to break out of inner loop.
    213 static void timer_callback(int fd, short events, void *context)
    214 {
    215   event_base_loopbreak((struct event_base *)context);
    216 }
    217 
// Runs the pump loop until Quit() clears |keep_running_|.
//
// Each iteration gives |delegate| a chance to do immediate work, polls
// libevent without blocking, then lets |delegate| do delayed work. Only when
// none of those did anything is idle work attempted, and only when that also
// did nothing does the loop block in libevent: indefinitely if no delayed work
// is scheduled, otherwise until |delayed_work_time_|.
//
// Reentrant!
void MessagePumpLibevent::Run(Delegate* delegate) {
  DCHECK(keep_running_) << "Quit must have been called outside of Run!";
  // |in_run_| is true for the duration of this call and restored on exit.
  AutoReset<bool> auto_reset_in_run(&in_run_, true);

  // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
  // Instead, make our own timer and reuse it on each call to event_base_loop().
  scoped_ptr<event> timer_event(new event);

  for (;;) {
#if defined(OS_MACOSX)
    mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

    // Every delegate call below may end up calling Quit(), so
    // |keep_running_| is re-checked after each one.
    bool did_work = delegate->DoWork();
    if (!keep_running_)
      break;

    // Service any IO that is already pending, without blocking. The IO
    // callbacks set |processed_io_events_| when they run.
    event_base_loop(event_base_, EVLOOP_NONBLOCK);
    did_work |= processed_io_events_;
    processed_io_events_ = false;
    if (!keep_running_)
      break;

    did_work |= delegate->DoDelayedWork(&delayed_work_time_);
    if (!keep_running_)
      break;

    // If anything was done, loop again right away instead of going idle.
    if (did_work)
      continue;

    did_work = delegate->DoIdleWork();
    if (!keep_running_)
      break;

    if (did_work)
      continue;

    // EVLOOP_ONCE tells libevent to only block once,
    // but to service all pending events when it wakes up.
    if (delayed_work_time_.is_null()) {
      // No delayed work is scheduled: block until an IO or wakeup event fires.
      event_base_loop(event_base_, EVLOOP_ONCE);
    } else {
      TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
      if (delay > TimeDelta()) {
        // Split the remaining delay into whole seconds and leftover
        // microseconds for the timeval that libevent expects.
        struct timeval poll_tv;
        poll_tv.tv_sec = delay.InSeconds();
        poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
        // Arm the reusable timer so the blocking loop below is broken out of
        // (via timer_callback) once the delay has elapsed, then disarm it.
        event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
        event_base_set(event_base_, timer_event.get());
        event_add(timer_event.get(), &poll_tv);
        event_base_loop(event_base_, EVLOOP_ONCE);
        event_del(timer_event.get());
      } else {
        // It looks like delayed_work_time_ indicates a time in the past, so we
        // need to call DoDelayedWork now.
        delayed_work_time_ = TimeTicks();
      }
    }
  }

  // Re-arm the flag so that a subsequent Run() passes the DCHECK at the top.
  keep_running_ = true;
}
    281 
// Asks the innermost Run() to return. Must be called while Run() is active
// (DCHECKed through |in_run_|). Clears |keep_running_| and then writes to the
// wakeup pipe so that a blocked libevent loop notices the change.
void MessagePumpLibevent::Quit() {
  DCHECK(in_run_);
  // Tell both libevent and Run that they should break out of their loops.
  keep_running_ = false;
  ScheduleWork();
}
    288 
    289 void MessagePumpLibevent::ScheduleWork() {
    290   // Tell libevent (in a threadsafe way) that it should break out of its loop.
    291   char buf = 0;
    292   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
    293   DCHECK(nwrite == 1 || errno == EAGAIN)
    294       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
    295 }
    296 
// Records |delayed_work_time| as the time Run() should sleep until when it
// next has nothing else to do. No libevent state is touched here.
void MessagePumpLibevent::ScheduleDelayedWork(
    const TimeTicks& delayed_work_time) {
  // We know that we can't be blocked on Wait right now since this method can
  // only be called on the same thread as Run, so we only need to update our
  // record of how long to sleep when we do sleep.
  delayed_work_time_ = delayed_work_time;
}
    304 
// Notifies every registered IOObserver that an IO event is about to be
// dispatched to a watcher delegate.
void MessagePumpLibevent::WillProcessIOEvent() {
  FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
}
    308 
// Notifies every registered IOObserver that a watcher delegate has finished
// handling an IO event.
void MessagePumpLibevent::DidProcessIOEvent() {
  FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
}
    312 
    313 bool MessagePumpLibevent::Init() {
    314   int fds[2];
    315   if (pipe(fds)) {
    316     DLOG(ERROR) << "pipe() failed, errno: " << errno;
    317     return false;
    318   }
    319   if (SetNonBlocking(fds[0])) {
    320     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
    321     return false;
    322   }
    323   if (SetNonBlocking(fds[1])) {
    324     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
    325     return false;
    326   }
    327   wakeup_pipe_out_ = fds[0];
    328   wakeup_pipe_in_ = fds[1];
    329 
    330   wakeup_event_ = new event;
    331   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
    332             OnWakeup, this);
    333   event_base_set(event_base_, wakeup_event_);
    334 
    335   if (event_add(wakeup_event_, 0))
    336     return false;
    337   return true;
    338 }
    339 
    340 // static
    341 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
    342                                                  void* context) {
    343   WeakPtr<FileDescriptorWatcher> controller =
    344       static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
    345   DCHECK(controller.get());
    346 
    347   MessagePumpLibevent* pump = controller->pump();
    348   pump->processed_io_events_ = true;
    349 
    350   if (flags & EV_WRITE) {
    351     controller->OnFileCanWriteWithoutBlocking(fd, pump);
    352   }
    353   // Check |controller| in case it's been deleted in
    354   // controller->OnFileCanWriteWithoutBlocking().
    355   if (controller.get() && flags & EV_READ) {
    356     controller->OnFileCanReadWithoutBlocking(fd, pump);
    357   }
    358 }
    359 
    360 // Called if a byte is received on the wakeup pipe.
    361 // static
    362 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
    363   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
    364   DCHECK(that->wakeup_pipe_out_ == socket);
    365 
    366   // Remove and discard the wakeup byte.
    367   char buf;
    368   int nread = HANDLE_EINTR(read(socket, &buf, 1));
    369   DCHECK_EQ(nread, 1);
    370   that->processed_io_events_ = true;
    371   // Tell libevent to break out of inner loop.
    372   event_base_loopbreak(that->event_base_);
    373 }
    374 
    375 }  // namespace base
    376