Home | History | Annotate | Download | only in common
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/common/handle_watcher.h"
      6 
      7 #include <map>
      8 
      9 #include "base/atomic_sequence_num.h"
     10 #include "base/bind.h"
     11 #include "base/lazy_instance.h"
     12 #include "base/memory/weak_ptr.h"
     13 #include "base/message_loop/message_loop.h"
     14 #include "base/message_loop/message_loop_proxy.h"
     15 #include "base/threading/thread.h"
     16 #include "base/time/tick_clock.h"
     17 #include "base/time/time.h"
     18 #include "mojo/common/message_pump_mojo.h"
     19 #include "mojo/common/message_pump_mojo_handler.h"
     20 
     21 namespace mojo {
     22 namespace common {
     23 
// Identifier returned by WatcherThreadManager::StartWatching() and later
// passed to StopWatching() to cancel that particular watch.
typedef int WatcherID;
     25 
     26 namespace {
     27 
// Name given to the base::Thread that WatcherThreadManager constructs.
const char kWatcherThreadName[] = "handle-watcher-thread";
     29 
// The MessagePumpMojo most recently created by CreateMessagePumpMojo(). This
// is a non-owning pointer: the factory returns ownership to its caller in a
// scoped_ptr. WatcherBackend uses it to add and remove handlers.
// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
MessagePumpMojo* message_pump_mojo = NULL;
     32 
     33 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
     34   message_pump_mojo = new MessagePumpMojo;
     35   return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
     36 }
     37 
     38 // Tracks the data for a single call to Start().
     39 struct WatchData {
     40   WatchData()
     41       : id(0),
     42         wait_flags(MOJO_WAIT_FLAG_NONE),
     43         message_loop(NULL) {}
     44 
     45   WatcherID id;
     46   Handle handle;
     47   MojoWaitFlags wait_flags;
     48   base::TimeTicks deadline;
     49   base::Callback<void(MojoResult)> callback;
     50   scoped_refptr<base::MessageLoopProxy> message_loop;
     51 };
     52 
     53 // WatcherBackend --------------------------------------------------------------
     54 
     55 // WatcherBackend is responsible for managing the requests and interacting with
     56 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
     57 // thread WatcherThreadManager creates.
     58 class WatcherBackend : public MessagePumpMojoHandler {
     59  public:
     60   WatcherBackend();
     61   virtual ~WatcherBackend();
     62 
     63   void StartWatching(const WatchData& data);
     64   void StopWatching(WatcherID watcher_id);
     65 
     66  private:
     67   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
     68 
     69   // Invoked when a handle needs to be removed and notified.
     70   void RemoveAndNotify(const Handle& handle, MojoResult result);
     71 
     72   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
     73   // and sets |handle| to the Handle. Returns false if not a known id.
     74   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
     75 
     76   // MessagePumpMojoHandler overrides:
     77   virtual void OnHandleReady(const Handle& handle) OVERRIDE;
     78   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
     79 
     80   // Maps from assigned id to WatchData.
     81   HandleToWatchDataMap handle_to_data_;
     82 
     83   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
     84 };
     85 
     86 WatcherBackend::WatcherBackend() {
     87 }
     88 
     89 WatcherBackend::~WatcherBackend() {
     90 }
     91 
     92 void WatcherBackend::StartWatching(const WatchData& data) {
     93   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
     94 
     95   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
     96 
     97   handle_to_data_[data.handle] = data;
     98   message_pump_mojo->AddHandler(this, data.handle,
     99                                 data.wait_flags,
    100                                 data.deadline);
    101 }
    102 
    103 void WatcherBackend::StopWatching(WatcherID watcher_id) {
    104   // Because of the thread hop it is entirely possible to get here and not
    105   // have a valid handle registered for |watcher_id|.
    106   Handle handle;
    107   if (!GetMojoHandleByWatcherID(watcher_id, &handle))
    108     return;
    109 
    110   handle_to_data_.erase(handle);
    111   message_pump_mojo->RemoveHandler(handle);
    112 }
    113 
    114 void WatcherBackend::RemoveAndNotify(const Handle& handle,
    115                                      MojoResult result) {
    116   if (handle_to_data_.count(handle) == 0)
    117     return;
    118 
    119   const WatchData data(handle_to_data_[handle]);
    120   handle_to_data_.erase(handle);
    121   message_pump_mojo->RemoveHandler(handle);
    122   data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
    123 }
    124 
    125 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
    126                                               Handle* handle) const {
    127   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
    128        i != handle_to_data_.end(); ++i) {
    129     if (i->second.id == watcher_id) {
    130       *handle = i->second.handle;
    131       return true;
    132     }
    133   }
    134   return false;
    135 }
    136 
    137 void WatcherBackend::OnHandleReady(const Handle& handle) {
    138   RemoveAndNotify(handle, MOJO_RESULT_OK);
    139 }
    140 
    141 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
    142   RemoveAndNotify(handle, result);
    143 }
    144 
    145 // WatcherThreadManager --------------------------------------------------------
    146 
    147 // WatcherThreadManager manages the background thread that listens for handles
    148 // to be ready. All requests are handled by WatcherBackend.
    149 class WatcherThreadManager {
    150  public:
    151   // Returns the shared instance.
    152   static WatcherThreadManager* GetInstance();
    153 
    154   // Starts watching the requested handle. Returns a unique ID that is used to
    155   // stop watching the handle. When the handle is ready |callback| is notified
    156   // on the thread StartWatching() was invoked on.
    157   // This may be invoked on any thread.
    158   WatcherID StartWatching(const Handle& handle,
    159                           MojoWaitFlags wait_flags,
    160                           base::TimeTicks deadline,
    161                           const base::Callback<void(MojoResult)>& callback);
    162 
    163   // Stops watching a handle.
    164   // This may be invoked on any thread.
    165   void StopWatching(WatcherID watcher_id);
    166 
    167  private:
    168   friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
    169 
    170   WatcherThreadManager();
    171   ~WatcherThreadManager();
    172 
    173   base::Thread thread_;
    174 
    175   base::AtomicSequenceNumber watcher_id_generator_;
    176 
    177   WatcherBackend backend_;
    178 
    179   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
    180 };
    181 
    182 WatcherThreadManager* WatcherThreadManager::GetInstance() {
    183   static base::LazyInstance<WatcherThreadManager> instance =
    184       LAZY_INSTANCE_INITIALIZER;
    185   return &instance.Get();
    186 }
    187 
    188 WatcherID WatcherThreadManager::StartWatching(
    189     const Handle& handle,
    190     MojoWaitFlags wait_flags,
    191     base::TimeTicks deadline,
    192     const base::Callback<void(MojoResult)>& callback) {
    193   WatchData data;
    194   data.id = watcher_id_generator_.GetNext();
    195   data.handle = handle;
    196   data.callback = callback;
    197   data.wait_flags = wait_flags;
    198   data.deadline = deadline;
    199   data.message_loop = base::MessageLoopProxy::current();
    200   // We outlive |thread_|, so it's safe to use Unretained() here.
    201   thread_.message_loop()->PostTask(
    202       FROM_HERE,
    203       base::Bind(&WatcherBackend::StartWatching,
    204                  base::Unretained(&backend_),
    205                  data));
    206   return data.id;
    207 }
    208 
    209 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
    210   // We outlive |thread_|, so it's safe to use Unretained() here.
    211   thread_.message_loop()->PostTask(
    212       FROM_HERE,
    213       base::Bind(&WatcherBackend::StopWatching,
    214                  base::Unretained(&backend_),
    215                  watcher_id));
    216 }
    217 
    218 WatcherThreadManager::WatcherThreadManager()
    219     : thread_(kWatcherThreadName) {
    220   base::Thread::Options thread_options;
    221   thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
    222   thread_.StartWithOptions(thread_options);
    223 }
    224 
    225 WatcherThreadManager::~WatcherThreadManager() {
    226   thread_.Stop();
    227 }
    228 
    229 }  // namespace
    230 
    231 // HandleWatcher::StartState ---------------------------------------------------
    232 
    233 // Contains the information passed to Start().
    234 struct HandleWatcher::StartState {
    235   explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
    236   }
    237 
    238   ~StartState() {
    239   }
    240 
    241   // ID assigned by WatcherThreadManager.
    242   WatcherID watcher_id;
    243 
    244   // Callback to notify when done.
    245   base::Callback<void(MojoResult)> callback;
    246 
    247   // When Start() is invoked a callback is passed to WatcherThreadManager
    248   // using a WeakRef from |weak_refactory_|. The callback invokes
    249   // OnHandleReady() (on the thread Start() is invoked from) which in turn
    250   // notifies |callback_|. Doing this allows us to reset state when the handle
    251   // is ready, and then notify the callback. Doing this also means Stop()
    252   // cancels any pending callbacks that may be inflight.
    253   base::WeakPtrFactory<HandleWatcher> weak_factory;
    254 };
    255 
    256 // HandleWatcher ---------------------------------------------------------------
    257 
// static
// Clock consulted by NowTicks(). While this is NULL, NowTicks() falls back to
// base::TimeTicks::Now().
base::TickClock* HandleWatcher::tick_clock_ = NULL;
    260 
    261 HandleWatcher::HandleWatcher() {
    262 }
    263 
    264 HandleWatcher::~HandleWatcher() {
    265   Stop();
    266 }
    267 
    268 void HandleWatcher::Start(const Handle& handle,
    269                           MojoWaitFlags wait_flags,
    270                           MojoDeadline deadline,
    271                           const base::Callback<void(MojoResult)>& callback) {
    272   DCHECK(handle.is_valid());
    273   DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
    274 
    275   Stop();
    276 
    277   start_state_.reset(new StartState(this));
    278   start_state_->callback = callback;
    279   start_state_->watcher_id =
    280       WatcherThreadManager::GetInstance()->StartWatching(
    281           handle,
    282           wait_flags,
    283           MojoDeadlineToTimeTicks(deadline),
    284           base::Bind(&HandleWatcher::OnHandleReady,
    285                      start_state_->weak_factory.GetWeakPtr()));
    286 }
    287 
    288 void HandleWatcher::Stop() {
    289   if (!start_state_.get())
    290     return;
    291 
    292   scoped_ptr<StartState> old_state(start_state_.Pass());
    293   WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
    294 }
    295 
    296 void HandleWatcher::OnHandleReady(MojoResult result) {
    297   DCHECK(start_state_.get());
    298   scoped_ptr<StartState> old_state(start_state_.Pass());
    299   old_state->callback.Run(result);
    300 
    301   // NOTE: We may have been deleted during callback execution.
    302 }
    303 
    304 // static
    305 base::TimeTicks HandleWatcher::NowTicks() {
    306   return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
    307 }
    308 
    309 // static
    310 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
    311   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
    312       NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
    313 }
    314 
    315 }  // namespace common
    316 }  // namespace mojo
    317