1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/common/handle_watcher.h" 6 7 #include <map> 8 9 #include "base/atomic_sequence_num.h" 10 #include "base/bind.h" 11 #include "base/lazy_instance.h" 12 #include "base/memory/weak_ptr.h" 13 #include "base/message_loop/message_loop.h" 14 #include "base/message_loop/message_loop_proxy.h" 15 #include "base/threading/thread.h" 16 #include "base/time/tick_clock.h" 17 #include "base/time/time.h" 18 #include "mojo/common/message_pump_mojo.h" 19 #include "mojo/common/message_pump_mojo_handler.h" 20 21 namespace mojo { 22 namespace common { 23 24 typedef int WatcherID; 25 26 namespace { 27 28 const char kWatcherThreadName[] = "handle-watcher-thread"; 29 30 // TODO(sky): this should be unnecessary once MessageLoop has been refactored. 31 MessagePumpMojo* message_pump_mojo = NULL; 32 33 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { 34 message_pump_mojo = new MessagePumpMojo; 35 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); 36 } 37 38 // Tracks the data for a single call to Start(). 39 struct WatchData { 40 WatchData() 41 : id(0), 42 wait_flags(MOJO_WAIT_FLAG_NONE), 43 message_loop(NULL) {} 44 45 WatcherID id; 46 Handle handle; 47 MojoWaitFlags wait_flags; 48 base::TimeTicks deadline; 49 base::Callback<void(MojoResult)> callback; 50 scoped_refptr<base::MessageLoopProxy> message_loop; 51 }; 52 53 // WatcherBackend -------------------------------------------------------------- 54 55 // WatcherBackend is responsible for managing the requests and interacting with 56 // MessagePumpMojo. All access (outside of creation/destruction) is done on the 57 // thread WatcherThreadManager creates. 58 class WatcherBackend : public MessagePumpMojoHandler { 59 public: 60 WatcherBackend(); 61 virtual ~WatcherBackend(); 62 63 void StartWatching(const WatchData& data); 64 void StopWatching(WatcherID watcher_id); 65 66 private: 67 typedef std::map<Handle, WatchData> HandleToWatchDataMap; 68 69 // Invoked when a handle needs to be removed and notified. 70 void RemoveAndNotify(const Handle& handle, MojoResult result); 71 72 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found 73 // and sets |handle| to the Handle. Returns false if not a known id. 74 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; 75 76 // MessagePumpMojoHandler overrides: 77 virtual void OnHandleReady(const Handle& handle) OVERRIDE; 78 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE; 79 80 // Maps from assigned id to WatchData. 81 HandleToWatchDataMap handle_to_data_; 82 83 DISALLOW_COPY_AND_ASSIGN(WatcherBackend); 84 }; 85 86 WatcherBackend::WatcherBackend() { 87 } 88 89 WatcherBackend::~WatcherBackend() { 90 } 91 92 void WatcherBackend::StartWatching(const WatchData& data) { 93 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); 94 95 DCHECK_EQ(0u, handle_to_data_.count(data.handle)); 96 97 handle_to_data_[data.handle] = data; 98 message_pump_mojo->AddHandler(this, data.handle, 99 data.wait_flags, 100 data.deadline); 101 } 102 103 void WatcherBackend::StopWatching(WatcherID watcher_id) { 104 // Because of the thread hop it is entirely possible to get here and not 105 // have a valid handle registered for |watcher_id|. 106 Handle handle; 107 if (!GetMojoHandleByWatcherID(watcher_id, &handle)) 108 return; 109 110 handle_to_data_.erase(handle); 111 message_pump_mojo->RemoveHandler(handle); 112 } 113 114 void WatcherBackend::RemoveAndNotify(const Handle& handle, 115 MojoResult result) { 116 if (handle_to_data_.count(handle) == 0) 117 return; 118 119 const WatchData data(handle_to_data_[handle]); 120 handle_to_data_.erase(handle); 121 message_pump_mojo->RemoveHandler(handle); 122 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); 123 } 124 125 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, 126 Handle* handle) const { 127 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); 128 i != handle_to_data_.end(); ++i) { 129 if (i->second.id == watcher_id) { 130 *handle = i->second.handle; 131 return true; 132 } 133 } 134 return false; 135 } 136 137 void WatcherBackend::OnHandleReady(const Handle& handle) { 138 RemoveAndNotify(handle, MOJO_RESULT_OK); 139 } 140 141 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { 142 RemoveAndNotify(handle, result); 143 } 144 145 // WatcherThreadManager -------------------------------------------------------- 146 147 // WatcherThreadManager manages the background thread that listens for handles 148 // to be ready. All requests are handled by WatcherBackend. 149 class WatcherThreadManager { 150 public: 151 // Returns the shared instance. 152 static WatcherThreadManager* GetInstance(); 153 154 // Starts watching the requested handle. Returns a unique ID that is used to 155 // stop watching the handle. When the handle is ready |callback| is notified 156 // on the thread StartWatching() was invoked on. 157 // This may be invoked on any thread. 158 WatcherID StartWatching(const Handle& handle, 159 MojoWaitFlags wait_flags, 160 base::TimeTicks deadline, 161 const base::Callback<void(MojoResult)>& callback); 162 163 // Stops watching a handle. 164 // This may be invoked on any thread. 165 void StopWatching(WatcherID watcher_id); 166 167 private: 168 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; 169 170 WatcherThreadManager(); 171 ~WatcherThreadManager(); 172 173 base::Thread thread_; 174 175 base::AtomicSequenceNumber watcher_id_generator_; 176 177 WatcherBackend backend_; 178 179 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); 180 }; 181 182 WatcherThreadManager* WatcherThreadManager::GetInstance() { 183 static base::LazyInstance<WatcherThreadManager> instance = 184 LAZY_INSTANCE_INITIALIZER; 185 return &instance.Get(); 186 } 187 188 WatcherID WatcherThreadManager::StartWatching( 189 const Handle& handle, 190 MojoWaitFlags wait_flags, 191 base::TimeTicks deadline, 192 const base::Callback<void(MojoResult)>& callback) { 193 WatchData data; 194 data.id = watcher_id_generator_.GetNext(); 195 data.handle = handle; 196 data.callback = callback; 197 data.wait_flags = wait_flags; 198 data.deadline = deadline; 199 data.message_loop = base::MessageLoopProxy::current(); 200 // We outlive |thread_|, so it's safe to use Unretained() here. 201 thread_.message_loop()->PostTask( 202 FROM_HERE, 203 base::Bind(&WatcherBackend::StartWatching, 204 base::Unretained(&backend_), 205 data)); 206 return data.id; 207 } 208 209 void WatcherThreadManager::StopWatching(WatcherID watcher_id) { 210 // We outlive |thread_|, so it's safe to use Unretained() here. 211 thread_.message_loop()->PostTask( 212 FROM_HERE, 213 base::Bind(&WatcherBackend::StopWatching, 214 base::Unretained(&backend_), 215 watcher_id)); 216 } 217 218 WatcherThreadManager::WatcherThreadManager() 219 : thread_(kWatcherThreadName) { 220 base::Thread::Options thread_options; 221 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); 222 thread_.StartWithOptions(thread_options); 223 } 224 225 WatcherThreadManager::~WatcherThreadManager() { 226 thread_.Stop(); 227 } 228 229 } // namespace 230 231 // HandleWatcher::StartState --------------------------------------------------- 232 233 // Contains the information passed to Start(). 234 struct HandleWatcher::StartState { 235 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) { 236 } 237 238 ~StartState() { 239 } 240 241 // ID assigned by WatcherThreadManager. 242 WatcherID watcher_id; 243 244 // Callback to notify when done. 245 base::Callback<void(MojoResult)> callback; 246 247 // When Start() is invoked a callback is passed to WatcherThreadManager 248 // using a WeakRef from |weak_refactory_|. The callback invokes 249 // OnHandleReady() (on the thread Start() is invoked from) which in turn 250 // notifies |callback_|. Doing this allows us to reset state when the handle 251 // is ready, and then notify the callback. Doing this also means Stop() 252 // cancels any pending callbacks that may be inflight. 253 base::WeakPtrFactory<HandleWatcher> weak_factory; 254 }; 255 256 // HandleWatcher --------------------------------------------------------------- 257 258 // static 259 base::TickClock* HandleWatcher::tick_clock_ = NULL; 260 261 HandleWatcher::HandleWatcher() { 262 } 263 264 HandleWatcher::~HandleWatcher() { 265 Stop(); 266 } 267 268 void HandleWatcher::Start(const Handle& handle, 269 MojoWaitFlags wait_flags, 270 MojoDeadline deadline, 271 const base::Callback<void(MojoResult)>& callback) { 272 DCHECK(handle.is_valid()); 273 DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags); 274 275 Stop(); 276 277 start_state_.reset(new StartState(this)); 278 start_state_->callback = callback; 279 start_state_->watcher_id = 280 WatcherThreadManager::GetInstance()->StartWatching( 281 handle, 282 wait_flags, 283 MojoDeadlineToTimeTicks(deadline), 284 base::Bind(&HandleWatcher::OnHandleReady, 285 start_state_->weak_factory.GetWeakPtr())); 286 } 287 288 void HandleWatcher::Stop() { 289 if (!start_state_.get()) 290 return; 291 292 scoped_ptr<StartState> old_state(start_state_.Pass()); 293 WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id); 294 } 295 296 void HandleWatcher::OnHandleReady(MojoResult result) { 297 DCHECK(start_state_.get()); 298 scoped_ptr<StartState> old_state(start_state_.Pass()); 299 old_state->callback.Run(result); 300 301 // NOTE: We may have been deleted during callback execution. 302 } 303 304 // static 305 base::TimeTicks HandleWatcher::NowTicks() { 306 return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now(); 307 } 308 309 // static 310 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) { 311 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : 312 NowTicks() + base::TimeDelta::FromMicroseconds(deadline); 313 } 314 315 } // namespace common 316 } // namespace mojo 317