1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/common/handle_watcher.h" 6 7 #include <map> 8 9 #include "base/atomic_sequence_num.h" 10 #include "base/bind.h" 11 #include "base/lazy_instance.h" 12 #include "base/logging.h" 13 #include "base/macros.h" 14 #include "base/memory/singleton.h" 15 #include "base/memory/weak_ptr.h" 16 #include "base/message_loop/message_loop.h" 17 #include "base/message_loop/message_loop_proxy.h" 18 #include "base/synchronization/lock.h" 19 #include "base/synchronization/waitable_event.h" 20 #include "base/threading/thread.h" 21 #include "base/threading/thread_restrictions.h" 22 #include "base/time/time.h" 23 #include "mojo/common/message_pump_mojo.h" 24 #include "mojo/common/message_pump_mojo_handler.h" 25 #include "mojo/common/time_helper.h" 26 27 namespace mojo { 28 namespace common { 29 30 typedef int WatcherID; 31 32 namespace { 33 34 const char kWatcherThreadName[] = "handle-watcher-thread"; 35 36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { 37 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 39 } 40 41 // Tracks the data for a single call to Start(). 42 struct WatchData { 43 WatchData() 44 : id(0), 45 handle_signals(MOJO_HANDLE_SIGNAL_NONE), 46 message_loop(NULL) {} 47 48 WatcherID id; 49 Handle handle; 50 MojoHandleSignals handle_signals; 51 base::TimeTicks deadline; 52 base::Callback<void(MojoResult)> callback; 53 scoped_refptr<base::MessageLoopProxy> message_loop; 54 }; 55 56 // WatcherBackend -------------------------------------------------------------- 57 58 // WatcherBackend is responsible for managing the requests and interacting with 59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the 60 // thread WatcherThreadManager creates. 61 class WatcherBackend : public MessagePumpMojoHandler { 62 public: 63 WatcherBackend(); 64 virtual ~WatcherBackend(); 65 66 void StartWatching(const WatchData& data); 67 68 // Cancels a previously scheduled request to start a watch. 69 void StopWatching(WatcherID watcher_id); 70 71 private: 72 typedef std::map<Handle, WatchData> HandleToWatchDataMap; 73 74 // Invoked when a handle needs to be removed and notified. 75 void RemoveAndNotify(const Handle& handle, MojoResult result); 76 77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found 78 // and sets |handle| to the Handle. Returns false if not a known id. 79 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; 80 81 // MessagePumpMojoHandler overrides: 82 virtual void OnHandleReady(const Handle& handle) OVERRIDE; 83 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE; 84 85 // Maps from assigned id to WatchData. 86 HandleToWatchDataMap handle_to_data_; 87 88 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); 89 }; 90 91 WatcherBackend::WatcherBackend() { 92 } 93 94 WatcherBackend::~WatcherBackend() { 95 } 96 97 void WatcherBackend::StartWatching(const WatchData& data) { 98 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 99 100 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 101 102 handle_to_data_[data.handle] = data; 103 MessagePumpMojo::current()->AddHandler(this, data.handle, 104 data.handle_signals, 105 data.deadline); 106 } 107 108 void WatcherBackend::StopWatching(WatcherID watcher_id) { 109 // Because of the thread hop it is entirely possible to get here and not 110 // have a valid handle registered for |watcher_id|. 111 Handle handle; 112 if (GetMojoHandleByWatcherID(watcher_id, &handle)) { 113 handle_to_data_.erase(handle); 114 MessagePumpMojo::current()->RemoveHandler(handle); 115 } 116 } 117 118 void WatcherBackend::RemoveAndNotify(const Handle& handle, 119 MojoResult result) { 120 if (handle_to_data_.count(handle) == 0) 121 return; 122 123 const WatchData data(handle_to_data_[handle]); 124 handle_to_data_.erase(handle); 125 MessagePumpMojo::current()->RemoveHandler(handle); 126 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); 127 } 128 129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 130 Handle* handle) const { 131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 132 i != handle_to_data_.end(); ++i) { 133 if (i->second.id == watcher_id) { 134 *handle = i->second.handle; 135 return true; 136 } 137 } 138 return false; 139 } 140 141 void WatcherBackend::OnHandleReady(const Handle& handle) { 142 RemoveAndNotify(handle, MOJO_RESULT_OK); 143 } 144 145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { 146 RemoveAndNotify(handle, result); 147 } 148 149 // WatcherThreadManager -------------------------------------------------------- 150 151 // WatcherThreadManager manages the background thread that listens for handles 152 // to be ready. All requests are handled by WatcherBackend. 153 } // namespace 154 155 class WatcherThreadManager { 156 public: 157 ~WatcherThreadManager(); 158 159 // Returns the shared instance. 160 static WatcherThreadManager* GetInstance(); 161 162 // Starts watching the requested handle. Returns a unique ID that is used to 163 // stop watching the handle. When the handle is ready |callback| is notified 164 // on the thread StartWatching() was invoked on. 165 // This may be invoked on any thread. 166 WatcherID StartWatching(const Handle& handle, 167 MojoHandleSignals handle_signals, 168 base::TimeTicks deadline, 169 const base::Callback<void(MojoResult)>& callback); 170 171 // Stops watching a handle. 172 // This may be invoked on any thread. 173 void StopWatching(WatcherID watcher_id); 174 175 private: 176 enum RequestType { 177 REQUEST_START, 178 REQUEST_STOP, 179 }; 180 181 // See description of |requests_| for details. 182 struct RequestData { 183 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {} 184 185 RequestType type; 186 WatchData start_data; 187 WatcherID stop_id; 188 base::WaitableEvent* stop_event; 189 }; 190 191 typedef std::vector<RequestData> Requests; 192 193 friend struct DefaultSingletonTraits<WatcherThreadManager>; 194 195 WatcherThreadManager(); 196 197 // Schedules a request on the background thread. See |requests_| for details. 198 void AddRequest(const RequestData& data); 199 200 // Processes requests added to |requests_|. This is invoked on the backend 201 // thread. 202 void ProcessRequestsOnBackendThread(); 203 204 base::Thread thread_; 205 206 base::AtomicSequenceNumber watcher_id_generator_; 207 208 WatcherBackend backend_; 209 210 // Protects |requests_|. 211 base::Lock lock_; 212 213 // Start/Stop result in adding a RequestData to |requests_| (protected by 214 // |lock_|). When the background thread wakes up it processes the requests. 215 Requests requests_; 216 217 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); 218 }; 219 220 WatcherThreadManager::~WatcherThreadManager() { 221 thread_.Stop(); 222 } 223 224 WatcherThreadManager* WatcherThreadManager::GetInstance() { 225 return Singleton<WatcherThreadManager>::get(); 226 } 227 228 WatcherID WatcherThreadManager::StartWatching( 229 const Handle& handle, 230 MojoHandleSignals handle_signals, 231 base::TimeTicks deadline, 232 const base::Callback<void(MojoResult)>& callback) { 233 RequestData request_data; 234 request_data.type = REQUEST_START; 235 request_data.start_data.id = watcher_id_generator_.GetNext(); 236 request_data.start_data.handle = handle; 237 request_data.start_data.callback = callback; 238 request_data.start_data.handle_signals = handle_signals; 239 request_data.start_data.deadline = deadline; 240 request_data.start_data.message_loop = base::MessageLoopProxy::current(); 241 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), 242 request_data.start_data.message_loop.get()); 243 AddRequest(request_data); 244 return request_data.start_data.id; 245 } 246 247 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { 248 // Handle the case of StartWatching() followed by StopWatching() before 249 // |thread_| woke up. 250 { 251 base::AutoLock auto_lock(lock_); 252 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { 253 if (i->type == REQUEST_START && i->start_data.id == watcher_id) { 254 // Watcher ids are not reused, so if we find it we can stop. 255 requests_.erase(i); 256 return; 257 } 258 } 259 } 260 261 base::ThreadRestrictions::ScopedAllowWait allow_wait; 262 base::WaitableEvent event(true, false); 263 RequestData request_data; 264 request_data.type = REQUEST_STOP; 265 request_data.stop_id = watcher_id; 266 request_data.stop_event = &event; 267 AddRequest(request_data); 268 269 // We need to block until the handle is actually removed. 270 event.Wait(); 271 } 272 273 void WatcherThreadManager::AddRequest(const RequestData& data) { 274 { 275 base::AutoLock auto_lock(lock_); 276 const bool was_empty = requests_.empty(); 277 requests_.push_back(data); 278 if (!was_empty) 279 return; 280 } 281 // We own |thread_|, so it's safe to use Unretained() here. 282 thread_.message_loop()->PostTask( 283 FROM_HERE, 284 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, 285 base::Unretained(this))); 286 } 287 288 void WatcherThreadManager::ProcessRequestsOnBackendThread() { 289 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); 290 291 Requests requests; 292 { 293 base::AutoLock auto_lock(lock_); 294 requests_.swap(requests); 295 } 296 for (size_t i = 0; i < requests.size(); ++i) { 297 if (requests[i].type == REQUEST_START) { 298 backend_.StartWatching(requests[i].start_data); 299 } else { 300 backend_.StopWatching(requests[i].stop_id); 301 requests[i].stop_event->Signal(); 302 } 303 } 304 } 305 306 WatcherThreadManager::WatcherThreadManager() 307 : thread_(kWatcherThreadName) { 308 base::Thread::Options thread_options; 309 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); 310 thread_.StartWithOptions(thread_options); 311 } 312 313 // HandleWatcher::StateBase and subclasses ------------------------------------- 314 315 // The base class of HandleWatcher's state. Owns the user's callback and 316 // monitors the current thread's MessageLoop to know when to force the callback 317 // to run (with an error) even though the pipe hasn't been signaled yet. 318 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { 319 public: 320 StateBase(HandleWatcher* watcher, 321 const base::Callback<void(MojoResult)>& callback) 322 : watcher_(watcher), 323 callback_(callback), 324 got_ready_(false) { 325 base::MessageLoop::current()->AddDestructionObserver(this); 326 } 327 328 virtual ~StateBase() { 329 base::MessageLoop::current()->RemoveDestructionObserver(this); 330 } 331 332 protected: 333 void NotifyHandleReady(MojoResult result) { 334 got_ready_ = true; 335 NotifyAndDestroy(result); 336 } 337 338 bool got_ready() const { return got_ready_; } 339 340 private: 341 virtual void WillDestroyCurrentMessageLoop() OVERRIDE { 342 // The current thread is exiting. Simulate a watch error. 343 NotifyAndDestroy(MOJO_RESULT_ABORTED); 344 } 345 346 void NotifyAndDestroy(MojoResult result) { 347 base::Callback<void(MojoResult)> callback = callback_; 348 watcher_->Stop(); // Destroys |this|. 349 350 callback.Run(result); 351 } 352 353 HandleWatcher* watcher_; 354 base::Callback<void(MojoResult)> callback_; 355 356 // Have we been notified that the handle is ready? 357 bool got_ready_; 358 359 DISALLOW_COPY_AND_ASSIGN(StateBase); 360 }; 361 362 // If the thread on which HandleWatcher is used runs MessagePumpMojo, 363 // SameThreadWatchingState is used to directly watch the handle on the same 364 // thread. 365 class HandleWatcher::SameThreadWatchingState : public StateBase, 366 public MessagePumpMojoHandler { 367 public: 368 SameThreadWatchingState(HandleWatcher* watcher, 369 const Handle& handle, 370 MojoHandleSignals handle_signals, 371 MojoDeadline deadline, 372 const base::Callback<void(MojoResult)>& callback) 373 : StateBase(watcher, callback), 374 handle_(handle) { 375 DCHECK(MessagePumpMojo::IsCurrent()); 376 377 MessagePumpMojo::current()->AddHandler( 378 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); 379 } 380 381 virtual ~SameThreadWatchingState() { 382 if (!got_ready()) 383 MessagePumpMojo::current()->RemoveHandler(handle_); 384 } 385 386 private: 387 // MessagePumpMojoHandler overrides: 388 virtual void OnHandleReady(const Handle& handle) OVERRIDE { 389 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); 390 } 391 392 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE { 393 StopWatchingAndNotifyReady(handle, result); 394 } 395 396 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { 397 DCHECK_EQ(handle.value(), handle_.value()); 398 MessagePumpMojo::current()->RemoveHandler(handle_); 399 NotifyHandleReady(result); 400 } 401 402 Handle handle_; 403 404 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); 405 }; 406 407 // If the thread on which HandleWatcher is used runs a message pump different 408 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the 409 // handle on the handle watcher thread. 410 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { 411 public: 412 SecondaryThreadWatchingState(HandleWatcher* watcher, 413 const Handle& handle, 414 MojoHandleSignals handle_signals, 415 MojoDeadline deadline, 416 const base::Callback<void(MojoResult)>& callback) 417 : StateBase(watcher, callback), 418 weak_factory_(this) { 419 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( 420 handle, 421 handle_signals, 422 MojoDeadlineToTimeTicks(deadline), 423 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, 424 weak_factory_.GetWeakPtr())); 425 } 426 427 virtual ~SecondaryThreadWatchingState() { 428 // If we've been notified the handle is ready (|got_ready()| is true) then 429 // the watch has been implicitly removed by 430 // WatcherThreadManager/MessagePumpMojo and we don't have to call 431 // StopWatching(). To do so would needlessly entail posting a task and 432 // blocking until the background thread services it. 433 if (!got_ready()) 434 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); 435 } 436 437 private: 438 WatcherID watcher_id_; 439 440 // Used to weakly bind |this| to the WatcherThreadManager. 441 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; 442 443 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); 444 }; 445 446 // HandleWatcher --------------------------------------------------------------- 447 448 HandleWatcher::HandleWatcher() { 449 } 450 451 HandleWatcher::~HandleWatcher() { 452 } 453 454 void HandleWatcher::Start(const Handle& handle, 455 MojoHandleSignals handle_signals, 456 MojoDeadline deadline, 457 const base::Callback<void(MojoResult)>& callback) { 458 DCHECK(handle.is_valid()); 459 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); 460 461 if (MessagePumpMojo::IsCurrent()) { 462 state_.reset(new SameThreadWatchingState( 463 this, handle, handle_signals, deadline, callback)); 464 } else { 465 state_.reset(new SecondaryThreadWatchingState( 466 this, handle, handle_signals, deadline, callback)); 467 } 468 } 469 470 void HandleWatcher::Stop() { 471 state_.reset(); 472 } 473 474 } // namespace common 475 } // namespace mojo 476