1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/message_pump/handle_watcher.h" 6 7 #include <stddef.h> 8 #include <stdint.h> 9 10 #include <map> 11 12 #include "base/atomic_sequence_num.h" 13 #include "base/bind.h" 14 #include "base/lazy_instance.h" 15 #include "base/logging.h" 16 #include "base/macros.h" 17 #include "base/memory/singleton.h" 18 #include "base/memory/weak_ptr.h" 19 #include "base/message_loop/message_loop.h" 20 #include "base/single_thread_task_runner.h" 21 #include "base/synchronization/lock.h" 22 #include "base/threading/thread.h" 23 #include "base/threading/thread_task_runner_handle.h" 24 #include "base/time/time.h" 25 #include "mojo/message_pump/message_pump_mojo.h" 26 #include "mojo/message_pump/message_pump_mojo_handler.h" 27 #include "mojo/message_pump/time_helper.h" 28 #include "mojo/public/c/system/message_pipe.h" 29 30 namespace mojo { 31 namespace common { 32 33 typedef int WatcherID; 34 35 namespace { 36 37 const char kWatcherThreadName[] = "handle-watcher-thread"; 38 39 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { 40 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 41 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 42 } 43 44 // Tracks the data for a single call to Start(). 45 struct WatchData { 46 WatchData() 47 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {} 48 49 WatcherID id; 50 Handle handle; 51 MojoHandleSignals handle_signals; 52 base::TimeTicks deadline; 53 base::Callback<void(MojoResult)> callback; 54 scoped_refptr<base::SingleThreadTaskRunner> task_runner; 55 }; 56 57 // WatcherBackend -------------------------------------------------------------- 58 59 // WatcherBackend is responsible for managing the requests and interacting with 60 // MessagePumpMojo. All access (outside of creation/destruction) is done on the 61 // thread WatcherThreadManager creates. 62 class WatcherBackend : public MessagePumpMojoHandler { 63 public: 64 WatcherBackend(); 65 ~WatcherBackend() override; 66 67 void StartWatching(const WatchData& data); 68 void StopWatching(WatcherID watcher_id); 69 70 private: 71 typedef std::map<Handle, WatchData> HandleToWatchDataMap; 72 73 // Invoked when a handle needs to be removed and notified. 74 void RemoveAndNotify(const Handle& handle, MojoResult result); 75 76 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found 77 // and sets |handle| to the Handle. Returns false if not a known id. 78 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; 79 80 // MessagePumpMojoHandler overrides: 81 void OnHandleReady(const Handle& handle) override; 82 void OnHandleError(const Handle& handle, MojoResult result) override; 83 84 // Maps from assigned id to WatchData. 85 HandleToWatchDataMap handle_to_data_; 86 87 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); 88 }; 89 90 WatcherBackend::WatcherBackend() { 91 } 92 93 WatcherBackend::~WatcherBackend() { 94 } 95 96 void WatcherBackend::StartWatching(const WatchData& data) { 97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 98 99 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 100 101 handle_to_data_[data.handle] = data; 102 MessagePumpMojo::current()->AddHandler(this, data.handle, 103 data.handle_signals, 104 data.deadline); 105 } 106 107 void WatcherBackend::StopWatching(WatcherID watcher_id) { 108 // Because of the thread hop it is entirely possible to get here and not 109 // have a valid handle registered for |watcher_id|. 110 Handle handle; 111 if (!GetMojoHandleByWatcherID(watcher_id, &handle)) 112 return; 113 114 handle_to_data_.erase(handle); 115 MessagePumpMojo::current()->RemoveHandler(handle); 116 } 117 118 void WatcherBackend::RemoveAndNotify(const Handle& handle, 119 MojoResult result) { 120 if (handle_to_data_.count(handle) == 0) 121 return; 122 123 const WatchData data(handle_to_data_[handle]); 124 handle_to_data_.erase(handle); 125 MessagePumpMojo::current()->RemoveHandler(handle); 126 127 data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result)); 128 } 129 130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 131 Handle* handle) const { 132 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 133 i != handle_to_data_.end(); ++i) { 134 if (i->second.id == watcher_id) { 135 *handle = i->second.handle; 136 return true; 137 } 138 } 139 return false; 140 } 141 142 void WatcherBackend::OnHandleReady(const Handle& handle) { 143 RemoveAndNotify(handle, MOJO_RESULT_OK); 144 } 145 146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { 147 RemoveAndNotify(handle, result); 148 } 149 150 // WatcherThreadManager -------------------------------------------------------- 151 152 // WatcherThreadManager manages the background thread that listens for handles 153 // to be ready. All requests are handled by WatcherBackend. 154 class WatcherThreadManager { 155 public: 156 ~WatcherThreadManager(); 157 158 // Returns the shared instance. 159 static WatcherThreadManager* GetInstance(); 160 161 // Starts watching the requested handle. Returns a unique ID that is used to 162 // stop watching the handle. When the handle is ready |callback| is notified 163 // on the thread StartWatching() was invoked on. 164 // This may be invoked on any thread. 165 WatcherID StartWatching(const Handle& handle, 166 MojoHandleSignals handle_signals, 167 base::TimeTicks deadline, 168 const base::Callback<void(MojoResult)>& callback); 169 170 // Stops watching a handle. 171 // This may be invoked on any thread. 172 void StopWatching(WatcherID watcher_id); 173 174 private: 175 enum RequestType { 176 REQUEST_START, 177 REQUEST_STOP, 178 }; 179 180 // See description of |requests_| for details. 181 struct RequestData { 182 RequestData() : type(REQUEST_START), stop_id(0) {} 183 184 RequestType type; 185 WatchData start_data; 186 WatcherID stop_id; 187 }; 188 189 typedef std::vector<RequestData> Requests; 190 191 friend struct base::DefaultSingletonTraits<WatcherThreadManager>; 192 193 WatcherThreadManager(); 194 195 // Schedules a request on the background thread. See |requests_| for details. 196 void AddRequest(const RequestData& data); 197 198 // Processes requests added to |requests_|. This is invoked on the backend 199 // thread. 200 void ProcessRequestsOnBackendThread(); 201 202 base::Thread thread_; 203 204 base::AtomicSequenceNumber watcher_id_generator_; 205 206 WatcherBackend backend_; 207 208 // Protects |requests_|. 209 base::Lock lock_; 210 211 // Start/Stop result in adding a RequestData to |requests_| (protected by 212 // |lock_|). When the background thread wakes up it processes the requests. 213 Requests requests_; 214 215 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); 216 }; 217 218 WatcherThreadManager::~WatcherThreadManager() { 219 thread_.Stop(); 220 } 221 222 WatcherThreadManager* WatcherThreadManager::GetInstance() { 223 return base::Singleton<WatcherThreadManager>::get(); 224 } 225 226 WatcherID WatcherThreadManager::StartWatching( 227 const Handle& handle, 228 MojoHandleSignals handle_signals, 229 base::TimeTicks deadline, 230 const base::Callback<void(MojoResult)>& callback) { 231 RequestData request_data; 232 request_data.type = REQUEST_START; 233 request_data.start_data.id = watcher_id_generator_.GetNext(); 234 request_data.start_data.handle = handle; 235 request_data.start_data.callback = callback; 236 request_data.start_data.handle_signals = handle_signals; 237 request_data.start_data.deadline = deadline; 238 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get(); 239 AddRequest(request_data); 240 return request_data.start_data.id; 241 } 242 243 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { 244 // Handle the case of StartWatching() followed by StopWatching() before 245 // |thread_| woke up. 246 { 247 base::AutoLock auto_lock(lock_); 248 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { 249 if (i->type == REQUEST_START && i->start_data.id == watcher_id) { 250 // Watcher ids are not reused, so if we find it we can stop. 251 requests_.erase(i); 252 return; 253 } 254 } 255 } 256 257 RequestData request_data; 258 request_data.type = REQUEST_STOP; 259 request_data.stop_id = watcher_id; 260 AddRequest(request_data); 261 } 262 263 void WatcherThreadManager::AddRequest(const RequestData& data) { 264 { 265 base::AutoLock auto_lock(lock_); 266 const bool was_empty = requests_.empty(); 267 requests_.push_back(data); 268 if (!was_empty) 269 return; 270 } 271 272 // We outlive |thread_|, so it's safe to use Unretained() here. 273 thread_.task_runner()->PostTask( 274 FROM_HERE, 275 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, 276 base::Unretained(this))); 277 } 278 279 void WatcherThreadManager::ProcessRequestsOnBackendThread() { 280 DCHECK(thread_.task_runner()->BelongsToCurrentThread()); 281 282 Requests requests; 283 { 284 base::AutoLock auto_lock(lock_); 285 requests_.swap(requests); 286 } 287 for (size_t i = 0; i < requests.size(); ++i) { 288 if (requests[i].type == REQUEST_START) { 289 backend_.StartWatching(requests[i].start_data); 290 } else { 291 backend_.StopWatching(requests[i].stop_id); 292 } 293 } 294 } 295 296 WatcherThreadManager::WatcherThreadManager() 297 : thread_(kWatcherThreadName) { 298 base::Thread::Options thread_options; 299 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); 300 thread_.StartWithOptions(thread_options); 301 } 302 303 } // namespace 304 305 // HandleWatcher::StateBase and subclasses ------------------------------------- 306 307 // The base class of HandleWatcher's state. Owns the user's callback and 308 // monitors the current thread's MessageLoop to know when to force the callback 309 // to run (with an error) even though the pipe hasn't been signaled yet. 310 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { 311 public: 312 StateBase(HandleWatcher* watcher, 313 const base::Callback<void(MojoResult)>& callback) 314 : watcher_(watcher), 315 callback_(callback), 316 got_ready_(false) { 317 base::MessageLoop::current()->AddDestructionObserver(this); 318 } 319 320 ~StateBase() override { 321 base::MessageLoop::current()->RemoveDestructionObserver(this); 322 } 323 324 protected: 325 void NotifyHandleReady(MojoResult result) { 326 got_ready_ = true; 327 NotifyAndDestroy(result); 328 } 329 330 bool got_ready() const { return got_ready_; } 331 332 private: 333 void WillDestroyCurrentMessageLoop() override { 334 // The current thread is exiting. Simulate a watch error. 335 NotifyAndDestroy(MOJO_RESULT_ABORTED); 336 } 337 338 void NotifyAndDestroy(MojoResult result) { 339 base::Callback<void(MojoResult)> callback = callback_; 340 watcher_->Stop(); // Destroys |this|. 341 342 callback.Run(result); 343 } 344 345 HandleWatcher* watcher_; 346 base::Callback<void(MojoResult)> callback_; 347 348 // Have we been notified that the handle is ready? 349 bool got_ready_; 350 351 DISALLOW_COPY_AND_ASSIGN(StateBase); 352 }; 353 354 // If the thread on which HandleWatcher is used runs MessagePumpMojo, 355 // SameThreadWatchingState is used to directly watch the handle on the same 356 // thread. 357 class HandleWatcher::SameThreadWatchingState : public StateBase, 358 public MessagePumpMojoHandler { 359 public: 360 SameThreadWatchingState(HandleWatcher* watcher, 361 const Handle& handle, 362 MojoHandleSignals handle_signals, 363 MojoDeadline deadline, 364 const base::Callback<void(MojoResult)>& callback) 365 : StateBase(watcher, callback), 366 handle_(handle) { 367 DCHECK(MessagePumpMojo::IsCurrent()); 368 369 MessagePumpMojo::current()->AddHandler( 370 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); 371 } 372 373 ~SameThreadWatchingState() override { 374 if (!got_ready()) 375 MessagePumpMojo::current()->RemoveHandler(handle_); 376 } 377 378 private: 379 // MessagePumpMojoHandler overrides: 380 void OnHandleReady(const Handle& handle) override { 381 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); 382 } 383 384 void OnHandleError(const Handle& handle, MojoResult result) override { 385 StopWatchingAndNotifyReady(handle, result); 386 } 387 388 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { 389 DCHECK_EQ(handle.value(), handle_.value()); 390 MessagePumpMojo::current()->RemoveHandler(handle_); 391 NotifyHandleReady(result); 392 } 393 394 Handle handle_; 395 396 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); 397 }; 398 399 // If the thread on which HandleWatcher is used runs a message pump different 400 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the 401 // handle on the handle watcher thread. 402 class HandleWatcher::SecondaryThreadWatchingState : public StateBase { 403 public: 404 SecondaryThreadWatchingState(HandleWatcher* watcher, 405 const Handle& handle, 406 MojoHandleSignals handle_signals, 407 MojoDeadline deadline, 408 const base::Callback<void(MojoResult)>& callback) 409 : StateBase(watcher, callback), 410 weak_factory_(this) { 411 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( 412 handle, 413 handle_signals, 414 MojoDeadlineToTimeTicks(deadline), 415 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, 416 weak_factory_.GetWeakPtr())); 417 } 418 419 ~SecondaryThreadWatchingState() override { 420 // If we've been notified the handle is ready (|got_ready()| is true) then 421 // the watch has been implicitly removed by 422 // WatcherThreadManager/MessagePumpMojo and we don't have to call 423 // StopWatching(). To do so would needlessly entail posting a task and 424 // blocking until the background thread services it. 425 if (!got_ready()) 426 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); 427 } 428 429 private: 430 WatcherID watcher_id_; 431 432 // Used to weakly bind |this| to the WatcherThreadManager. 433 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; 434 435 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); 436 }; 437 438 // HandleWatcher --------------------------------------------------------------- 439 440 HandleWatcher::HandleWatcher() { 441 } 442 443 HandleWatcher::~HandleWatcher() { 444 } 445 446 void HandleWatcher::Start(const Handle& handle, 447 MojoHandleSignals handle_signals, 448 MojoDeadline deadline, 449 const base::Callback<void(MojoResult)>& callback) { 450 DCHECK(handle.is_valid()); 451 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); 452 453 // Need to clear the state before creating a new one. 454 state_.reset(); 455 if (MessagePumpMojo::IsCurrent()) { 456 state_.reset(new SameThreadWatchingState( 457 this, handle, handle_signals, deadline, callback)); 458 } else { 459 #if !defined(OFFICIAL_BUILD) 460 // Just for making debugging non-transferable message pipes easier. Since 461 // they can't be sent after they're read/written/listened to, 462 // MessagePipeDispatcher saves the callstack of when it's "bound" to a 463 // pipe id. Triggering a read here, instead of later in the PostTask, means 464 // we have a callstack that is useful to check if the pipe is erronously 465 // attempted to be sent. 466 uint32_t temp = 0; 467 MojoReadMessage(handle.value(), nullptr, &temp, nullptr, nullptr, 468 MOJO_READ_MESSAGE_FLAG_NONE); 469 #endif 470 state_.reset(new SecondaryThreadWatchingState( 471 this, handle, handle_signals, deadline, callback)); 472 } 473 } 474 475 void HandleWatcher::Stop() { 476 state_.reset(); 477 } 478 479 } // namespace common 480 } // namespace mojo 481