// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/message_loop/message_pump_libevent.h"

#include <errno.h>
#include <unistd.h>

#include <memory>

#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/files/file_util.h"
#include "base/logging.h"
#include "base/observer_list.h"
#include "base/posix/eintr_wrapper.h"
#include "base/third_party/libevent/event.h"
#include "base/time/time.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"

#if defined(OS_MACOSX)
#include "base/mac/scoped_nsautorelease_pool.h"
#endif

// Lifecycle of struct event
// Libevent uses two main data structures:
// struct event_base (of which there is one per message pump), and
// struct event (of which there is roughly one per socket).
// The socket's struct event is created in
// MessagePumpLibevent::WatchFileDescriptor(),
// is owned by the FileDescriptorWatcher, and is destroyed in
// StopWatchingFileDescriptor().
// It is moved into and out of lists in struct event_base by
// the libevent functions event_add() and event_del().
//
// TODO(dkegel):
// At the moment bad things happen if a FileDescriptorWatcher
// is active after its MessagePumpLibevent has been destroyed.
// See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
// Not clear yet whether that situation occurs in practice,
// but if it does, we need to fix it.
44 45 namespace base { 46 47 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() 48 : event_(NULL), 49 pump_(NULL), 50 watcher_(NULL), 51 was_destroyed_(NULL) { 52 } 53 54 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { 55 if (event_) { 56 StopWatchingFileDescriptor(); 57 } 58 if (was_destroyed_) { 59 DCHECK(!*was_destroyed_); 60 *was_destroyed_ = true; 61 } 62 } 63 64 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { 65 event* e = ReleaseEvent(); 66 if (e == NULL) 67 return true; 68 69 // event_del() is a no-op if the event isn't active. 70 int rv = event_del(e); 71 delete e; 72 pump_ = NULL; 73 watcher_ = NULL; 74 return (rv == 0); 75 } 76 77 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) { 78 DCHECK(e); 79 DCHECK(!event_); 80 81 event_ = e; 82 } 83 84 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { 85 struct event *e = event_; 86 event_ = NULL; 87 return e; 88 } 89 90 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( 91 int fd, 92 MessagePumpLibevent*) { 93 // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop 94 // watching the file descriptor. 
95 if (!watcher_) 96 return; 97 watcher_->OnFileCanReadWithoutBlocking(fd); 98 } 99 100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( 101 int fd, 102 MessagePumpLibevent*) { 103 DCHECK(watcher_); 104 watcher_->OnFileCanWriteWithoutBlocking(fd); 105 } 106 107 MessagePumpLibevent::MessagePumpLibevent() 108 : keep_running_(true), 109 in_run_(false), 110 processed_io_events_(false), 111 event_base_(event_base_new()), 112 wakeup_pipe_in_(-1), 113 wakeup_pipe_out_(-1) { 114 if (!Init()) 115 NOTREACHED(); 116 } 117 118 MessagePumpLibevent::~MessagePumpLibevent() { 119 DCHECK(wakeup_event_); 120 DCHECK(event_base_); 121 event_del(wakeup_event_); 122 delete wakeup_event_; 123 if (wakeup_pipe_in_ >= 0) { 124 if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0) 125 DPLOG(ERROR) << "close"; 126 } 127 if (wakeup_pipe_out_ >= 0) { 128 if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0) 129 DPLOG(ERROR) << "close"; 130 } 131 event_base_free(event_base_); 132 } 133 134 bool MessagePumpLibevent::WatchFileDescriptor(int fd, 135 bool persistent, 136 int mode, 137 FileDescriptorWatcher *controller, 138 Watcher *delegate) { 139 DCHECK_GE(fd, 0); 140 DCHECK(controller); 141 DCHECK(delegate); 142 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); 143 // WatchFileDescriptor should be called on the pump thread. It is not 144 // threadsafe, and your watcher may never be registered. 145 DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread()); 146 147 int event_mask = persistent ? EV_PERSIST : 0; 148 if (mode & WATCH_READ) { 149 event_mask |= EV_READ; 150 } 151 if (mode & WATCH_WRITE) { 152 event_mask |= EV_WRITE; 153 } 154 155 std::unique_ptr<event> evt(controller->ReleaseEvent()); 156 if (evt.get() == NULL) { 157 // Ownership is transferred to the controller. 158 evt.reset(new event); 159 } else { 160 // Make sure we don't pick up any funky internal libevent masks. 
161 int old_interest_mask = evt.get()->ev_events & 162 (EV_READ | EV_WRITE | EV_PERSIST); 163 164 // Combine old/new event masks. 165 event_mask |= old_interest_mask; 166 167 // Must disarm the event before we can reuse it. 168 event_del(evt.get()); 169 170 // It's illegal to use this function to listen on 2 separate fds with the 171 // same |controller|. 172 if (EVENT_FD(evt.get()) != fd) { 173 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; 174 return false; 175 } 176 } 177 178 // Set current interest mask and message pump for this event. 179 event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller); 180 181 // Tell libevent which message pump this socket will belong to when we add it. 182 if (event_base_set(event_base_, evt.get())) { 183 return false; 184 } 185 186 // Add this socket to the list of monitored sockets. 187 if (event_add(evt.get(), NULL)) { 188 return false; 189 } 190 191 // Transfer ownership of evt to controller. 192 controller->Init(evt.release()); 193 194 controller->set_watcher(delegate); 195 controller->set_pump(this); 196 197 return true; 198 } 199 200 // Tell libevent to break out of inner loop. 201 static void timer_callback(int /*fd*/, short /*events*/, void* context) { 202 event_base_loopbreak((struct event_base *)context); 203 } 204 205 // Reentrant! 206 void MessagePumpLibevent::Run(Delegate* delegate) { 207 AutoReset<bool> auto_reset_keep_running(&keep_running_, true); 208 AutoReset<bool> auto_reset_in_run(&in_run_, true); 209 210 // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641. 211 // Instead, make our own timer and reuse it on each call to event_base_loop(). 
212 std::unique_ptr<event> timer_event(new event); 213 214 for (;;) { 215 #if defined(OS_MACOSX) 216 mac::ScopedNSAutoreleasePool autorelease_pool; 217 #endif 218 219 bool did_work = delegate->DoWork(); 220 if (!keep_running_) 221 break; 222 223 event_base_loop(event_base_, EVLOOP_NONBLOCK); 224 did_work |= processed_io_events_; 225 processed_io_events_ = false; 226 if (!keep_running_) 227 break; 228 229 did_work |= delegate->DoDelayedWork(&delayed_work_time_); 230 if (!keep_running_) 231 break; 232 233 if (did_work) 234 continue; 235 236 did_work = delegate->DoIdleWork(); 237 if (!keep_running_) 238 break; 239 240 if (did_work) 241 continue; 242 243 // EVLOOP_ONCE tells libevent to only block once, 244 // but to service all pending events when it wakes up. 245 if (delayed_work_time_.is_null()) { 246 event_base_loop(event_base_, EVLOOP_ONCE); 247 } else { 248 TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); 249 if (delay > TimeDelta()) { 250 struct timeval poll_tv; 251 poll_tv.tv_sec = delay.InSeconds(); 252 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; 253 event_set(timer_event.get(), -1, 0, timer_callback, event_base_); 254 event_base_set(event_base_, timer_event.get()); 255 event_add(timer_event.get(), &poll_tv); 256 event_base_loop(event_base_, EVLOOP_ONCE); 257 event_del(timer_event.get()); 258 } else { 259 // It looks like delayed_work_time_ indicates a time in the past, so we 260 // need to call DoDelayedWork now. 261 delayed_work_time_ = TimeTicks(); 262 } 263 } 264 265 if (!keep_running_) 266 break; 267 } 268 } 269 270 void MessagePumpLibevent::Quit() { 271 DCHECK(in_run_) << "Quit was called outside of Run!"; 272 // Tell both libevent and Run that they should break out of their loops. 273 keep_running_ = false; 274 ScheduleWork(); 275 } 276 277 void MessagePumpLibevent::ScheduleWork() { 278 // Tell libevent (in a threadsafe way) that it should break out of its loop. 
279 char buf = 0; 280 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); 281 DCHECK(nwrite == 1 || errno == EAGAIN) 282 << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; 283 } 284 285 void MessagePumpLibevent::ScheduleDelayedWork( 286 const TimeTicks& delayed_work_time) { 287 // We know that we can't be blocked on Wait right now since this method can 288 // only be called on the same thread as Run, so we only need to update our 289 // record of how long to sleep when we do sleep. 290 delayed_work_time_ = delayed_work_time; 291 } 292 293 bool MessagePumpLibevent::Init() { 294 int fds[2]; 295 if (pipe(fds)) { 296 DLOG(ERROR) << "pipe() failed, errno: " << errno; 297 return false; 298 } 299 if (!SetNonBlocking(fds[0])) { 300 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; 301 return false; 302 } 303 if (!SetNonBlocking(fds[1])) { 304 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; 305 return false; 306 } 307 wakeup_pipe_out_ = fds[0]; 308 wakeup_pipe_in_ = fds[1]; 309 310 wakeup_event_ = new event; 311 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, 312 OnWakeup, this); 313 event_base_set(event_base_, wakeup_event_); 314 315 if (event_add(wakeup_event_, 0)) 316 return false; 317 return true; 318 } 319 320 // static 321 void MessagePumpLibevent::OnLibeventNotification(int fd, 322 short flags, 323 void* context) { 324 FileDescriptorWatcher* controller = 325 static_cast<FileDescriptorWatcher*>(context); 326 DCHECK(controller); 327 TRACE_EVENT1("toplevel", "MessagePumpLibevent::OnLibeventNotification", 328 "fd", fd); 329 330 MessagePumpLibevent* pump = controller->pump(); 331 pump->processed_io_events_ = true; 332 333 if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) { 334 // Both callbacks will be called. It is necessary to check that |controller| 335 // is not destroyed. 
336 bool controller_was_destroyed = false; 337 controller->was_destroyed_ = &controller_was_destroyed; 338 controller->OnFileCanWriteWithoutBlocking(fd, pump); 339 if (!controller_was_destroyed) 340 controller->OnFileCanReadWithoutBlocking(fd, pump); 341 if (!controller_was_destroyed) 342 controller->was_destroyed_ = nullptr; 343 } else if (flags & EV_WRITE) { 344 controller->OnFileCanWriteWithoutBlocking(fd, pump); 345 } else if (flags & EV_READ) { 346 controller->OnFileCanReadWithoutBlocking(fd, pump); 347 } 348 } 349 350 // Called if a byte is received on the wakeup pipe. 351 // static 352 void MessagePumpLibevent::OnWakeup(int socket, short /*flags*/, void* context) { 353 MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context); 354 DCHECK(that->wakeup_pipe_out_ == socket); 355 356 // Remove and discard the wakeup byte. 357 char buf; 358 int nread = HANDLE_EINTR(read(socket, &buf, 1)); 359 DCHECK_EQ(nread, 1); 360 that->processed_io_events_ = true; 361 // Tell libevent to break out of inner loop. 362 event_base_loopbreak(that->event_base_); 363 } 364 365 } // namespace base 366