Home | History | Annotate | Download | only in common
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/common/handle_watcher.h"
      6 
      7 #include <map>
      8 
      9 #include "base/atomic_sequence_num.h"
     10 #include "base/bind.h"
     11 #include "base/lazy_instance.h"
     12 #include "base/logging.h"
     13 #include "base/macros.h"
     14 #include "base/memory/singleton.h"
     15 #include "base/memory/weak_ptr.h"
     16 #include "base/message_loop/message_loop.h"
     17 #include "base/message_loop/message_loop_proxy.h"
     18 #include "base/synchronization/lock.h"
     19 #include "base/synchronization/waitable_event.h"
     20 #include "base/threading/thread.h"
     21 #include "base/threading/thread_restrictions.h"
     22 #include "base/time/time.h"
     23 #include "mojo/common/message_pump_mojo.h"
     24 #include "mojo/common/message_pump_mojo_handler.h"
     25 #include "mojo/common/time_helper.h"
     26 
     27 namespace mojo {
     28 namespace common {
     29 
     30 typedef int WatcherID;
     31 
     32 namespace {
     33 
     34 const char kWatcherThreadName[] = "handle-watcher-thread";
     35 
     36 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
     37   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
     38       internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
     39 }
     40 
     41 // Tracks the data for a single call to Start().
     42 struct WatchData {
     43   WatchData()
     44       : id(0),
     45         handle_signals(MOJO_HANDLE_SIGNAL_NONE),
     46         message_loop(NULL) {}
     47 
     48   WatcherID id;
     49   Handle handle;
     50   MojoHandleSignals handle_signals;
     51   base::TimeTicks deadline;
     52   base::Callback<void(MojoResult)> callback;
     53   scoped_refptr<base::MessageLoopProxy> message_loop;
     54 };
     55 
     56 // WatcherBackend --------------------------------------------------------------
     57 
     58 // WatcherBackend is responsible for managing the requests and interacting with
     59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
     60 // thread WatcherThreadManager creates.
     61 class WatcherBackend : public MessagePumpMojoHandler {
     62  public:
     63   WatcherBackend();
     64   virtual ~WatcherBackend();
     65 
     66   void StartWatching(const WatchData& data);
     67 
     68   // Cancels a previously scheduled request to start a watch.
     69   void StopWatching(WatcherID watcher_id);
     70 
     71  private:
     72   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
     73 
     74   // Invoked when a handle needs to be removed and notified.
     75   void RemoveAndNotify(const Handle& handle, MojoResult result);
     76 
     77   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
     78   // and sets |handle| to the Handle. Returns false if not a known id.
     79   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
     80 
     81   // MessagePumpMojoHandler overrides:
     82   virtual void OnHandleReady(const Handle& handle) OVERRIDE;
     83   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
     84 
     85   // Maps from assigned id to WatchData.
     86   HandleToWatchDataMap handle_to_data_;
     87 
     88   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
     89 };
     90 
     91 WatcherBackend::WatcherBackend() {
     92 }
     93 
     94 WatcherBackend::~WatcherBackend() {
     95 }
     96 
     97 void WatcherBackend::StartWatching(const WatchData& data) {
     98   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
     99 
    100   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
    101 
    102   handle_to_data_[data.handle] = data;
    103   MessagePumpMojo::current()->AddHandler(this, data.handle,
    104                                          data.handle_signals,
    105                                          data.deadline);
    106 }
    107 
    108 void WatcherBackend::StopWatching(WatcherID watcher_id) {
    109   // Because of the thread hop it is entirely possible to get here and not
    110   // have a valid handle registered for |watcher_id|.
    111   Handle handle;
    112   if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
    113     handle_to_data_.erase(handle);
    114     MessagePumpMojo::current()->RemoveHandler(handle);
    115   }
    116 }
    117 
    118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
    119                                      MojoResult result) {
    120   if (handle_to_data_.count(handle) == 0)
    121     return;
    122 
    123   const WatchData data(handle_to_data_[handle]);
    124   handle_to_data_.erase(handle);
    125   MessagePumpMojo::current()->RemoveHandler(handle);
    126   data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
    127 }
    128 
    129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
    130                                               Handle* handle) const {
    131   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
    132        i != handle_to_data_.end(); ++i) {
    133     if (i->second.id == watcher_id) {
    134       *handle = i->second.handle;
    135       return true;
    136     }
    137   }
    138   return false;
    139 }
    140 
    141 void WatcherBackend::OnHandleReady(const Handle& handle) {
    142   RemoveAndNotify(handle, MOJO_RESULT_OK);
    143 }
    144 
    145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
    146   RemoveAndNotify(handle, result);
    147 }
    148 
    149 // WatcherThreadManager --------------------------------------------------------
    150 
    151 // WatcherThreadManager manages the background thread that listens for handles
    152 // to be ready. All requests are handled by WatcherBackend.
    153 }  // namespace
    154 
    155 class WatcherThreadManager {
    156  public:
    157   ~WatcherThreadManager();
    158 
    159   // Returns the shared instance.
    160   static WatcherThreadManager* GetInstance();
    161 
    162   // Starts watching the requested handle. Returns a unique ID that is used to
    163   // stop watching the handle. When the handle is ready |callback| is notified
    164   // on the thread StartWatching() was invoked on.
    165   // This may be invoked on any thread.
    166   WatcherID StartWatching(const Handle& handle,
    167                           MojoHandleSignals handle_signals,
    168                           base::TimeTicks deadline,
    169                           const base::Callback<void(MojoResult)>& callback);
    170 
    171   // Stops watching a handle.
    172   // This may be invoked on any thread.
    173   void StopWatching(WatcherID watcher_id);
    174 
    175  private:
    176   enum RequestType {
    177     REQUEST_START,
    178     REQUEST_STOP,
    179   };
    180 
    181   // See description of |requests_| for details.
    182   struct RequestData {
    183     RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
    184 
    185     RequestType type;
    186     WatchData start_data;
    187     WatcherID stop_id;
    188     base::WaitableEvent* stop_event;
    189   };
    190 
    191   typedef std::vector<RequestData> Requests;
    192 
    193   friend struct DefaultSingletonTraits<WatcherThreadManager>;
    194 
    195   WatcherThreadManager();
    196 
    197   // Schedules a request on the background thread. See |requests_| for details.
    198   void AddRequest(const RequestData& data);
    199 
    200   // Processes requests added to |requests_|. This is invoked on the backend
    201   // thread.
    202   void ProcessRequestsOnBackendThread();
    203 
    204   base::Thread thread_;
    205 
    206   base::AtomicSequenceNumber watcher_id_generator_;
    207 
    208   WatcherBackend backend_;
    209 
    210   // Protects |requests_|.
    211   base::Lock lock_;
    212 
    213   // Start/Stop result in adding a RequestData to |requests_| (protected by
    214   // |lock_|). When the background thread wakes up it processes the requests.
    215   Requests requests_;
    216 
    217   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
    218 };
    219 
    220 WatcherThreadManager::~WatcherThreadManager() {
    221   thread_.Stop();
    222 }
    223 
    224 WatcherThreadManager* WatcherThreadManager::GetInstance() {
    225   return Singleton<WatcherThreadManager>::get();
    226 }
    227 
    228 WatcherID WatcherThreadManager::StartWatching(
    229     const Handle& handle,
    230     MojoHandleSignals handle_signals,
    231     base::TimeTicks deadline,
    232     const base::Callback<void(MojoResult)>& callback) {
    233   RequestData request_data;
    234   request_data.type = REQUEST_START;
    235   request_data.start_data.id = watcher_id_generator_.GetNext();
    236   request_data.start_data.handle = handle;
    237   request_data.start_data.callback = callback;
    238   request_data.start_data.handle_signals = handle_signals;
    239   request_data.start_data.deadline = deadline;
    240   request_data.start_data.message_loop = base::MessageLoopProxy::current();
    241   DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
    242             request_data.start_data.message_loop.get());
    243   AddRequest(request_data);
    244   return request_data.start_data.id;
    245 }
    246 
    247 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
    248   // Handle the case of StartWatching() followed by StopWatching() before
    249   // |thread_| woke up.
    250   {
    251     base::AutoLock auto_lock(lock_);
    252     for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
    253       if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
    254         // Watcher ids are not reused, so if we find it we can stop.
    255         requests_.erase(i);
    256         return;
    257       }
    258     }
    259   }
    260 
    261   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    262   base::WaitableEvent event(true, false);
    263   RequestData request_data;
    264   request_data.type = REQUEST_STOP;
    265   request_data.stop_id = watcher_id;
    266   request_data.stop_event = &event;
    267   AddRequest(request_data);
    268 
    269   // We need to block until the handle is actually removed.
    270   event.Wait();
    271 }
    272 
    273 void WatcherThreadManager::AddRequest(const RequestData& data) {
    274   {
    275     base::AutoLock auto_lock(lock_);
    276     const bool was_empty = requests_.empty();
    277     requests_.push_back(data);
    278     if (!was_empty)
    279       return;
    280   }
    281   // We own |thread_|, so it's safe to use Unretained() here.
    282   thread_.message_loop()->PostTask(
    283       FROM_HERE,
    284       base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
    285                  base::Unretained(this)));
    286 }
    287 
    288 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
    289   DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
    290 
    291   Requests requests;
    292   {
    293     base::AutoLock auto_lock(lock_);
    294     requests_.swap(requests);
    295   }
    296   for (size_t i = 0; i < requests.size(); ++i) {
    297     if (requests[i].type == REQUEST_START) {
    298       backend_.StartWatching(requests[i].start_data);
    299     } else {
    300       backend_.StopWatching(requests[i].stop_id);
    301       requests[i].stop_event->Signal();
    302     }
    303   }
    304 }
    305 
    306 WatcherThreadManager::WatcherThreadManager()
    307     : thread_(kWatcherThreadName) {
    308   base::Thread::Options thread_options;
    309   thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
    310   thread_.StartWithOptions(thread_options);
    311 }
    312 
    313 // HandleWatcher::StateBase and subclasses -------------------------------------
    314 
    315 // The base class of HandleWatcher's state. Owns the user's callback and
    316 // monitors the current thread's MessageLoop to know when to force the callback
    317 // to run (with an error) even though the pipe hasn't been signaled yet.
    318 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
    319  public:
    320   StateBase(HandleWatcher* watcher,
    321             const base::Callback<void(MojoResult)>& callback)
    322       : watcher_(watcher),
    323         callback_(callback),
    324         got_ready_(false) {
    325     base::MessageLoop::current()->AddDestructionObserver(this);
    326   }
    327 
    328   virtual ~StateBase() {
    329     base::MessageLoop::current()->RemoveDestructionObserver(this);
    330   }
    331 
    332  protected:
    333   void NotifyHandleReady(MojoResult result) {
    334     got_ready_ = true;
    335     NotifyAndDestroy(result);
    336   }
    337 
    338   bool got_ready() const { return got_ready_; }
    339 
    340  private:
    341   virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
    342     // The current thread is exiting. Simulate a watch error.
    343     NotifyAndDestroy(MOJO_RESULT_ABORTED);
    344   }
    345 
    346   void NotifyAndDestroy(MojoResult result) {
    347     base::Callback<void(MojoResult)> callback = callback_;
    348     watcher_->Stop();  // Destroys |this|.
    349 
    350     callback.Run(result);
    351   }
    352 
    353   HandleWatcher* watcher_;
    354   base::Callback<void(MojoResult)> callback_;
    355 
    356   // Have we been notified that the handle is ready?
    357   bool got_ready_;
    358 
    359   DISALLOW_COPY_AND_ASSIGN(StateBase);
    360 };
    361 
    362 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
    363 // SameThreadWatchingState is used to directly watch the handle on the same
    364 // thread.
    365 class HandleWatcher::SameThreadWatchingState : public StateBase,
    366                                                public MessagePumpMojoHandler {
    367  public:
    368   SameThreadWatchingState(HandleWatcher* watcher,
    369                           const Handle& handle,
    370                           MojoHandleSignals handle_signals,
    371                           MojoDeadline deadline,
    372                           const base::Callback<void(MojoResult)>& callback)
    373       : StateBase(watcher, callback),
    374         handle_(handle) {
    375     DCHECK(MessagePumpMojo::IsCurrent());
    376 
    377     MessagePumpMojo::current()->AddHandler(
    378         this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
    379   }
    380 
    381   virtual ~SameThreadWatchingState() {
    382     if (!got_ready())
    383       MessagePumpMojo::current()->RemoveHandler(handle_);
    384   }
    385 
    386  private:
    387   // MessagePumpMojoHandler overrides:
    388   virtual void OnHandleReady(const Handle& handle) OVERRIDE {
    389     StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
    390   }
    391 
    392   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE {
    393     StopWatchingAndNotifyReady(handle, result);
    394   }
    395 
    396   void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
    397     DCHECK_EQ(handle.value(), handle_.value());
    398     MessagePumpMojo::current()->RemoveHandler(handle_);
    399     NotifyHandleReady(result);
    400   }
    401 
    402   Handle handle_;
    403 
    404   DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
    405 };
    406 
    407 // If the thread on which HandleWatcher is used runs a message pump different
    408 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
    409 // handle on the handle watcher thread.
    410 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
    411  public:
    412   SecondaryThreadWatchingState(HandleWatcher* watcher,
    413                                const Handle& handle,
    414                                MojoHandleSignals handle_signals,
    415                                MojoDeadline deadline,
    416                                const base::Callback<void(MojoResult)>& callback)
    417       : StateBase(watcher, callback),
    418         weak_factory_(this) {
    419     watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
    420         handle,
    421         handle_signals,
    422         MojoDeadlineToTimeTicks(deadline),
    423         base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
    424                    weak_factory_.GetWeakPtr()));
    425   }
    426 
    427   virtual ~SecondaryThreadWatchingState() {
    428     // If we've been notified the handle is ready (|got_ready()| is true) then
    429     // the watch has been implicitly removed by
    430     // WatcherThreadManager/MessagePumpMojo and we don't have to call
    431     // StopWatching(). To do so would needlessly entail posting a task and
    432     // blocking until the background thread services it.
    433     if (!got_ready())
    434       WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
    435   }
    436 
    437  private:
    438   WatcherID watcher_id_;
    439 
    440   // Used to weakly bind |this| to the WatcherThreadManager.
    441   base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
    442 
    443   DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
    444 };
    445 
    446 // HandleWatcher ---------------------------------------------------------------
    447 
    448 HandleWatcher::HandleWatcher() {
    449 }
    450 
    451 HandleWatcher::~HandleWatcher() {
    452 }
    453 
    454 void HandleWatcher::Start(const Handle& handle,
    455                           MojoHandleSignals handle_signals,
    456                           MojoDeadline deadline,
    457                           const base::Callback<void(MojoResult)>& callback) {
    458   DCHECK(handle.is_valid());
    459   DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
    460 
    461   if (MessagePumpMojo::IsCurrent()) {
    462     state_.reset(new SameThreadWatchingState(
    463         this, handle, handle_signals, deadline, callback));
    464   } else {
    465     state_.reset(new SecondaryThreadWatchingState(
    466         this, handle, handle_signals, deadline, callback));
    467   }
    468 }
    469 
    470 void HandleWatcher::Stop() {
    471   state_.reset();
    472 }
    473 
    474 }  // namespace common
    475 }  // namespace mojo
    476