Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_pump_libevent.h"
      6 
      7 #include <errno.h>
      8 #include <fcntl.h>
      9 
     10 #include "eintr_wrapper.h"
     11 #include "base/auto_reset.h"
     12 #include "base/logging.h"
     13 #include "base/scoped_nsautorelease_pool.h"
     14 #include "base/scoped_ptr.h"
     15 #include "base/time.h"
     16 #if defined(USE_SYSTEM_LIBEVENT)
     17 #include <event.h>
     18 #else
     19 #include "third_party/libevent/event.h"
     20 #endif
     21 
     22 // Lifecycle of struct event
     23 // Libevent uses two main data structures:
     24 // struct event_base (of which there is one per message pump), and
     25 // struct event (of which there is roughly one per socket).
     26 // The socket's struct event is created in
     27 // MessagePumpLibevent::WatchFileDescriptor(),
     28 // is owned by the FileDescriptorWatcher, and is destroyed in
     29 // StopWatchingFileDescriptor().
     30 // It is moved into and out of lists in struct event_base by
     31 // the libevent functions event_add() and event_del().
     32 //
     33 // TODO(dkegel):
     34 // At the moment bad things happen if a FileDescriptorWatcher
     35 // is active after its MessagePumpLibevent has been destroyed.
     36 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
     37 // Not clear yet whether that situation occurs in practice,
     38 // but if it does, we need to fix it.
     39 
     40 namespace base {
     41 
     42 // Return 0 on success
     43 // Too small a function to bother putting in a library?
     44 static int SetNonBlocking(int fd) {
     45   int flags = fcntl(fd, F_GETFL, 0);
     46   if (flags == -1)
     47     flags = 0;
     48   return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     49 }
     50 
     51 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
     52     : is_persistent_(false),
     53       event_(NULL) {
     54 }
     55 
     56 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
     57   if (event_) {
     58     StopWatchingFileDescriptor();
     59   }
     60 }
     61 
     62 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
     63                                                       bool is_persistent) {
     64   DCHECK(e);
     65   DCHECK(event_ == NULL);
     66 
     67   is_persistent_ = is_persistent;
     68   event_ = e;
     69 }
     70 
     71 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
     72   struct event *e = event_;
     73   event_ = NULL;
     74   return e;
     75 }
     76 
     77 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
     78   event* e = ReleaseEvent();
     79   if (e == NULL)
     80     return true;
     81 
     82   // event_del() is a no-op if the event isn't active.
     83   int rv = event_del(e);
     84   delete e;
     85   return (rv == 0);
     86 }
     87 
     88 // Called if a byte is received on the wakeup pipe.
     89 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
     90   base::MessagePumpLibevent* that =
     91               static_cast<base::MessagePumpLibevent*>(context);
     92   DCHECK(that->wakeup_pipe_out_ == socket);
     93 
     94   // Remove and discard the wakeup byte.
     95   char buf;
     96   int nread = HANDLE_EINTR(read(socket, &buf, 1));
     97   DCHECK_EQ(nread, 1);
     98   // Tell libevent to break out of inner loop.
     99   event_base_loopbreak(that->event_base_);
    100 }
    101 
    102 MessagePumpLibevent::MessagePumpLibevent()
    103     : keep_running_(true),
    104       in_run_(false),
    105       event_base_(event_base_new()),
    106       wakeup_pipe_in_(-1),
    107       wakeup_pipe_out_(-1) {
    108   if (!Init())
    109      NOTREACHED();
    110 }
    111 
    112 bool MessagePumpLibevent::Init() {
    113   int fds[2];
    114   if (pipe(fds)) {
    115     DLOG(ERROR) << "pipe() failed, errno: " << errno;
    116     return false;
    117   }
    118   if (SetNonBlocking(fds[0])) {
    119     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
    120     return false;
    121   }
    122   if (SetNonBlocking(fds[1])) {
    123     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
    124     return false;
    125   }
    126   wakeup_pipe_out_ = fds[0];
    127   wakeup_pipe_in_ = fds[1];
    128 
    129   wakeup_event_ = new event;
    130   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
    131             OnWakeup, this);
    132   event_base_set(event_base_, wakeup_event_);
    133 
    134   if (event_add(wakeup_event_, 0))
    135     return false;
    136   return true;
    137 }
    138 
    139 MessagePumpLibevent::~MessagePumpLibevent() {
    140   DCHECK(wakeup_event_);
    141   DCHECK(event_base_);
    142   event_del(wakeup_event_);
    143   delete wakeup_event_;
    144   if (wakeup_pipe_in_ >= 0)
    145     close(wakeup_pipe_in_);
    146   if (wakeup_pipe_out_ >= 0)
    147     close(wakeup_pipe_out_);
    148   event_base_free(event_base_);
    149 }
    150 
    151 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
    152                                               bool persistent,
    153                                               Mode mode,
    154                                               FileDescriptorWatcher *controller,
    155                                               Watcher *delegate) {
    156   DCHECK_GE(fd, 0);
    157   DCHECK(controller);
    158   DCHECK(delegate);
    159   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
    160 
    161   int event_mask = persistent ? EV_PERSIST : 0;
    162   if ((mode & WATCH_READ) != 0) {
    163     event_mask |= EV_READ;
    164   }
    165   if ((mode & WATCH_WRITE) != 0) {
    166     event_mask |= EV_WRITE;
    167   }
    168 
    169   scoped_ptr<event> evt(controller->ReleaseEvent());
    170   if (evt.get() == NULL) {
    171     // Ownership is transferred to the controller.
    172     evt.reset(new event);
    173   } else {
    174     // Make sure we don't pick up any funky internal libevent masks.
    175     int old_interest_mask = evt.get()->ev_events &
    176         (EV_READ | EV_WRITE | EV_PERSIST);
    177 
    178     // Combine old/new event masks.
    179     event_mask |= old_interest_mask;
    180 
    181     // Must disarm the event before we can reuse it.
    182     event_del(evt.get());
    183 
    184     // It's illegal to use this function to listen on 2 separate fds with the
    185     // same |controller|.
    186     if (EVENT_FD(evt.get()) != fd) {
    187       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
    188       return false;
    189     }
    190   }
    191 
    192   // Set current interest mask and message pump for this event.
    193   event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate);
    194 
    195   // Tell libevent which message pump this socket will belong to when we add it.
    196   if (event_base_set(event_base_, evt.get()) != 0) {
    197     return false;
    198   }
    199 
    200   // Add this socket to the list of monitored sockets.
    201   if (event_add(evt.get(), NULL) != 0) {
    202     return false;
    203   }
    204 
    205   // Transfer ownership of evt to controller.
    206   controller->Init(evt.release(), persistent);
    207   return true;
    208 }
    209 
    210 
    211 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
    212                                                  void* context) {
    213   Watcher* watcher = static_cast<Watcher*>(context);
    214 
    215   if (flags & EV_WRITE) {
    216     watcher->OnFileCanWriteWithoutBlocking(fd);
    217   }
    218   if (flags & EV_READ) {
    219     watcher->OnFileCanReadWithoutBlocking(fd);
    220   }
    221 }
    222 
    223 // Tell libevent to break out of inner loop.
    224 static void timer_callback(int fd, short events, void *context)
    225 {
    226   event_base_loopbreak((struct event_base *)context);
    227 }
    228 
    229 // Reentrant!
    230 void MessagePumpLibevent::Run(Delegate* delegate) {
    231   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
    232   AutoReset auto_reset_in_run(&in_run_, true);
    233 
    234   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
    235   // Instead, make our own timer and reuse it on each call to event_base_loop().
    236   scoped_ptr<event> timer_event(new event);
    237 
    238   for (;;) {
    239     ScopedNSAutoreleasePool autorelease_pool;
    240 
    241     bool did_work = delegate->DoWork();
    242     if (!keep_running_)
    243       break;
    244 
    245     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
    246     if (!keep_running_)
    247       break;
    248 
    249     if (did_work)
    250       continue;
    251 
    252     did_work = delegate->DoIdleWork();
    253     if (!keep_running_)
    254       break;
    255 
    256     if (did_work)
    257       continue;
    258 
    259     // EVLOOP_ONCE tells libevent to only block once,
    260     // but to service all pending events when it wakes up.
    261     if (delayed_work_time_.is_null()) {
    262       event_base_loop(event_base_, EVLOOP_ONCE);
    263     } else {
    264       TimeDelta delay = delayed_work_time_ - Time::Now();
    265       if (delay > TimeDelta()) {
    266         struct timeval poll_tv;
    267         poll_tv.tv_sec = delay.InSeconds();
    268         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
    269         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
    270         event_base_set(event_base_, timer_event.get());
    271         event_add(timer_event.get(), &poll_tv);
    272         event_base_loop(event_base_, EVLOOP_ONCE);
    273         event_del(timer_event.get());
    274       } else {
    275         // It looks like delayed_work_time_ indicates a time in the past, so we
    276         // need to call DoDelayedWork now.
    277         delayed_work_time_ = Time();
    278       }
    279     }
    280   }
    281 
    282   keep_running_ = true;
    283 }
    284 
    285 void MessagePumpLibevent::Quit() {
    286   DCHECK(in_run_);
    287   // Tell both libevent and Run that they should break out of their loops.
    288   keep_running_ = false;
    289   ScheduleWork();
    290 }
    291 
    292 void MessagePumpLibevent::ScheduleWork() {
    293   // Tell libevent (in a threadsafe way) that it should break out of its loop.
    294   char buf = 0;
    295   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
    296   DCHECK(nwrite == 1 || errno == EAGAIN)
    297       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
    298 }
    299 
    300 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
    301   // We know that we can't be blocked on Wait right now since this method can
    302   // only be called on the same thread as Run, so we only need to update our
    303   // record of how long to sleep when we do sleep.
    304   delayed_work_time_ = delayed_work_time;
    305 }
    306 
    307 }  // namespace base
    308