Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_pump_libevent.h"
      6 
      7 #include <errno.h>
      8 #include <fcntl.h>
      9 
     10 #include "base/auto_reset.h"
     11 #include "base/eintr_wrapper.h"
     12 #include "base/logging.h"
     13 #include "base/mac/scoped_nsautorelease_pool.h"
     14 #include "base/memory/scoped_ptr.h"
     15 #include "base/observer_list.h"
     16 #include "base/time.h"
     17 #if defined(USE_SYSTEM_LIBEVENT)
     18 #include <event.h>
     19 #else
     20 #include "third_party/libevent/event.h"
     21 #endif
     22 
     23 // Lifecycle of struct event
     24 // Libevent uses two main data structures:
     25 // struct event_base (of which there is one per message pump), and
     26 // struct event (of which there is roughly one per socket).
     27 // The socket's struct event is created in
     28 // MessagePumpLibevent::WatchFileDescriptor(),
     29 // is owned by the FileDescriptorWatcher, and is destroyed in
     30 // StopWatchingFileDescriptor().
     31 // It is moved into and out of lists in struct event_base by
     32 // the libevent functions event_add() and event_del().
     33 //
     34 // TODO(dkegel):
     35 // At the moment bad things happen if a FileDescriptorWatcher
     36 // is active after its MessagePumpLibevent has been destroyed.
     37 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
     38 // Not clear yet whether that situation occurs in practice,
     39 // but if it does, we need to fix it.
     40 
     41 namespace base {
     42 
     43 // Return 0 on success
     44 // Too small a function to bother putting in a library?
     45 static int SetNonBlocking(int fd) {
     46   int flags = fcntl(fd, F_GETFL, 0);
     47   if (flags == -1)
     48     flags = 0;
     49   return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     50 }
     51 
     52 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
     53     : is_persistent_(false),
     54       event_(NULL),
     55       pump_(NULL),
     56       watcher_(NULL) {
     57 }
     58 
     59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
     60   if (event_) {
     61     StopWatchingFileDescriptor();
     62   }
     63 }
     64 
     65 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
     66   event* e = ReleaseEvent();
     67   if (e == NULL)
     68     return true;
     69 
     70   // event_del() is a no-op if the event isn't active.
     71   int rv = event_del(e);
     72   delete e;
     73   pump_ = NULL;
     74   watcher_ = NULL;
     75   return (rv == 0);
     76 }
     77 
     78 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
     79                                                       bool is_persistent) {
     80   DCHECK(e);
     81   DCHECK(!event_);
     82 
     83   is_persistent_ = is_persistent;
     84   event_ = e;
     85 }
     86 
     87 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
     88   struct event *e = event_;
     89   event_ = NULL;
     90   return e;
     91 }
     92 
     93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
     94     int fd, MessagePumpLibevent* pump) {
     95   pump->WillProcessIOEvent();
     96   watcher_->OnFileCanReadWithoutBlocking(fd);
     97   pump->DidProcessIOEvent();
     98 }
     99 
    100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
    101     int fd, MessagePumpLibevent* pump) {
    102   pump->WillProcessIOEvent();
    103   watcher_->OnFileCanWriteWithoutBlocking(fd);
    104   pump->DidProcessIOEvent();
    105 }
    106 
    107 MessagePumpLibevent::MessagePumpLibevent()
    108     : keep_running_(true),
    109       in_run_(false),
    110       event_base_(event_base_new()),
    111       wakeup_pipe_in_(-1),
    112       wakeup_pipe_out_(-1) {
    113   if (!Init())
    114      NOTREACHED();
    115 }
    116 
    117 MessagePumpLibevent::~MessagePumpLibevent() {
    118   DCHECK(wakeup_event_);
    119   DCHECK(event_base_);
    120   event_del(wakeup_event_);
    121   delete wakeup_event_;
    122   if (wakeup_pipe_in_ >= 0) {
    123     if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0)
    124       PLOG(ERROR) << "close";
    125   }
    126   if (wakeup_pipe_out_ >= 0) {
    127     if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0)
    128       PLOG(ERROR) << "close";
    129   }
    130   event_base_free(event_base_);
    131 }
    132 
    133 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
    134                                               bool persistent,
    135                                               Mode mode,
    136                                               FileDescriptorWatcher *controller,
    137                                               Watcher *delegate) {
    138   DCHECK_GE(fd, 0);
    139   DCHECK(controller);
    140   DCHECK(delegate);
    141   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
    142 
    143   int event_mask = persistent ? EV_PERSIST : 0;
    144   if ((mode & WATCH_READ) != 0) {
    145     event_mask |= EV_READ;
    146   }
    147   if ((mode & WATCH_WRITE) != 0) {
    148     event_mask |= EV_WRITE;
    149   }
    150 
    151   scoped_ptr<event> evt(controller->ReleaseEvent());
    152   if (evt.get() == NULL) {
    153     // Ownership is transferred to the controller.
    154     evt.reset(new event);
    155   } else {
    156     // Make sure we don't pick up any funky internal libevent masks.
    157     int old_interest_mask = evt.get()->ev_events &
    158         (EV_READ | EV_WRITE | EV_PERSIST);
    159 
    160     // Combine old/new event masks.
    161     event_mask |= old_interest_mask;
    162 
    163     // Must disarm the event before we can reuse it.
    164     event_del(evt.get());
    165 
    166     // It's illegal to use this function to listen on 2 separate fds with the
    167     // same |controller|.
    168     if (EVENT_FD(evt.get()) != fd) {
    169       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
    170       return false;
    171     }
    172   }
    173 
    174   // Set current interest mask and message pump for this event.
    175   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
    176 
    177   // Tell libevent which message pump this socket will belong to when we add it.
    178   if (event_base_set(event_base_, evt.get()) != 0) {
    179     return false;
    180   }
    181 
    182   // Add this socket to the list of monitored sockets.
    183   if (event_add(evt.get(), NULL) != 0) {
    184     return false;
    185   }
    186 
    187   // Transfer ownership of evt to controller.
    188   controller->Init(evt.release(), persistent);
    189 
    190   controller->set_watcher(delegate);
    191   controller->set_pump(this);
    192 
    193   return true;
    194 }
    195 
    196 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
    197   io_observers_.AddObserver(obs);
    198 }
    199 
    200 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
    201   io_observers_.RemoveObserver(obs);
    202 }
    203 
    204 // Tell libevent to break out of inner loop.
    205 static void timer_callback(int fd, short events, void *context)
    206 {
    207   event_base_loopbreak((struct event_base *)context);
    208 }
    209 
    210 // Reentrant!
    211 void MessagePumpLibevent::Run(Delegate* delegate) {
    212   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
    213   AutoReset<bool> auto_reset_in_run(&in_run_, true);
    214 
    215   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
    216   // Instead, make our own timer and reuse it on each call to event_base_loop().
    217   scoped_ptr<event> timer_event(new event);
    218 
    219   for (;;) {
    220     mac::ScopedNSAutoreleasePool autorelease_pool;
    221 
    222     bool did_work = delegate->DoWork();
    223     if (!keep_running_)
    224       break;
    225 
    226     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
    227     if (!keep_running_)
    228       break;
    229 
    230     if (did_work)
    231       continue;
    232 
    233     did_work = delegate->DoIdleWork();
    234     if (!keep_running_)
    235       break;
    236 
    237     if (did_work)
    238       continue;
    239 
    240     // EVLOOP_ONCE tells libevent to only block once,
    241     // but to service all pending events when it wakes up.
    242     if (delayed_work_time_.is_null()) {
    243       event_base_loop(event_base_, EVLOOP_ONCE);
    244     } else {
    245       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
    246       if (delay > TimeDelta()) {
    247         struct timeval poll_tv;
    248         poll_tv.tv_sec = delay.InSeconds();
    249         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
    250         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
    251         event_base_set(event_base_, timer_event.get());
    252         event_add(timer_event.get(), &poll_tv);
    253         event_base_loop(event_base_, EVLOOP_ONCE);
    254         event_del(timer_event.get());
    255       } else {
    256         // It looks like delayed_work_time_ indicates a time in the past, so we
    257         // need to call DoDelayedWork now.
    258         delayed_work_time_ = TimeTicks();
    259       }
    260     }
    261   }
    262 
    263   keep_running_ = true;
    264 }
    265 
    266 void MessagePumpLibevent::Quit() {
    267   DCHECK(in_run_);
    268   // Tell both libevent and Run that they should break out of their loops.
    269   keep_running_ = false;
    270   ScheduleWork();
    271 }
    272 
    273 void MessagePumpLibevent::ScheduleWork() {
    274   // Tell libevent (in a threadsafe way) that it should break out of its loop.
    275   char buf = 0;
    276   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
    277   DCHECK(nwrite == 1 || errno == EAGAIN)
    278       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
    279 }
    280 
    281 void MessagePumpLibevent::ScheduleDelayedWork(
    282     const TimeTicks& delayed_work_time) {
    283   // We know that we can't be blocked on Wait right now since this method can
    284   // only be called on the same thread as Run, so we only need to update our
    285   // record of how long to sleep when we do sleep.
    286   delayed_work_time_ = delayed_work_time;
    287 }
    288 
    289 void MessagePumpLibevent::WillProcessIOEvent() {
    290   FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
    291 }
    292 
    293 void MessagePumpLibevent::DidProcessIOEvent() {
    294   FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
    295 }
    296 
    297 bool MessagePumpLibevent::Init() {
    298   int fds[2];
    299   if (pipe(fds)) {
    300     DLOG(ERROR) << "pipe() failed, errno: " << errno;
    301     return false;
    302   }
    303   if (SetNonBlocking(fds[0])) {
    304     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
    305     return false;
    306   }
    307   if (SetNonBlocking(fds[1])) {
    308     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
    309     return false;
    310   }
    311   wakeup_pipe_out_ = fds[0];
    312   wakeup_pipe_in_ = fds[1];
    313 
    314   wakeup_event_ = new event;
    315   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
    316             OnWakeup, this);
    317   event_base_set(event_base_, wakeup_event_);
    318 
    319   if (event_add(wakeup_event_, 0))
    320     return false;
    321   return true;
    322 }
    323 
    324 // static
    325 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
    326                                                  void* context) {
    327   FileDescriptorWatcher* controller =
    328       static_cast<FileDescriptorWatcher*>(context);
    329 
    330   MessagePumpLibevent* pump = controller->pump();
    331 
    332   if (flags & EV_WRITE) {
    333     controller->OnFileCanWriteWithoutBlocking(fd, pump);
    334   }
    335   if (flags & EV_READ) {
    336     controller->OnFileCanReadWithoutBlocking(fd, pump);
    337   }
    338 }
    339 
    340 // Called if a byte is received on the wakeup pipe.
    341 // static
    342 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
    343   base::MessagePumpLibevent* that =
    344               static_cast<base::MessagePumpLibevent*>(context);
    345   DCHECK(that->wakeup_pipe_out_ == socket);
    346 
    347   // Remove and discard the wakeup byte.
    348   char buf;
    349   int nread = HANDLE_EINTR(read(socket, &buf, 1));
    350   DCHECK_EQ(nread, 1);
    351   // Tell libevent to break out of inner loop.
    352   event_base_loopbreak(that->event_base_);
    353 }
    354 
    355 }  // namespace base
    356