Home | History | Annotate | Download | only in message_pump
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
#include "mojo/message_pump/handle_watcher.h"

#include <stddef.h>
#include <stdint.h>

#include <map>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/singleton.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "mojo/message_pump/message_pump_mojo.h"
#include "mojo/message_pump/message_pump_mojo_handler.h"
#include "mojo/message_pump/time_helper.h"
#include "mojo/public/c/system/message_pipe.h"
     29 
     30 namespace mojo {
     31 namespace common {
     32 
     33 typedef int WatcherID;
     34 
     35 namespace {
     36 
     37 const char kWatcherThreadName[] = "handle-watcher-thread";
     38 
     39 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
     40   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
     41       internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
     42 }
     43 
     44 // Tracks the data for a single call to Start().
     45 struct WatchData {
     46   WatchData()
     47       : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
     48 
     49   WatcherID id;
     50   Handle handle;
     51   MojoHandleSignals handle_signals;
     52   base::TimeTicks deadline;
     53   base::Callback<void(MojoResult)> callback;
     54   scoped_refptr<base::SingleThreadTaskRunner> task_runner;
     55 };
     56 
     57 // WatcherBackend --------------------------------------------------------------
     58 
     59 // WatcherBackend is responsible for managing the requests and interacting with
     60 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
     61 // thread WatcherThreadManager creates.
     62 class WatcherBackend : public MessagePumpMojoHandler {
     63  public:
     64   WatcherBackend();
     65   ~WatcherBackend() override;
     66 
     67   void StartWatching(const WatchData& data);
     68   void StopWatching(WatcherID watcher_id);
     69 
     70  private:
     71   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
     72 
     73   // Invoked when a handle needs to be removed and notified.
     74   void RemoveAndNotify(const Handle& handle, MojoResult result);
     75 
     76   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
     77   // and sets |handle| to the Handle. Returns false if not a known id.
     78   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
     79 
     80   // MessagePumpMojoHandler overrides:
     81   void OnHandleReady(const Handle& handle) override;
     82   void OnHandleError(const Handle& handle, MojoResult result) override;
     83 
     84   // Maps from assigned id to WatchData.
     85   HandleToWatchDataMap handle_to_data_;
     86 
     87   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
     88 };
     89 
     90 WatcherBackend::WatcherBackend() {
     91 }
     92 
     93 WatcherBackend::~WatcherBackend() {
     94 }
     95 
     96 void WatcherBackend::StartWatching(const WatchData& data) {
     97   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
     98 
     99   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
    100 
    101   handle_to_data_[data.handle] = data;
    102   MessagePumpMojo::current()->AddHandler(this, data.handle,
    103                                          data.handle_signals,
    104                                          data.deadline);
    105 }
    106 
    107 void WatcherBackend::StopWatching(WatcherID watcher_id) {
    108   // Because of the thread hop it is entirely possible to get here and not
    109   // have a valid handle registered for |watcher_id|.
    110   Handle handle;
    111   if (!GetMojoHandleByWatcherID(watcher_id, &handle))
    112     return;
    113 
    114   handle_to_data_.erase(handle);
    115   MessagePumpMojo::current()->RemoveHandler(handle);
    116 }
    117 
    118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
    119                                      MojoResult result) {
    120   if (handle_to_data_.count(handle) == 0)
    121     return;
    122 
    123   const WatchData data(handle_to_data_[handle]);
    124   handle_to_data_.erase(handle);
    125   MessagePumpMojo::current()->RemoveHandler(handle);
    126 
    127   data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
    128 }
    129 
    130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
    131                                               Handle* handle) const {
    132   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
    133        i != handle_to_data_.end(); ++i) {
    134     if (i->second.id == watcher_id) {
    135       *handle = i->second.handle;
    136       return true;
    137     }
    138   }
    139   return false;
    140 }
    141 
    142 void WatcherBackend::OnHandleReady(const Handle& handle) {
    143   RemoveAndNotify(handle, MOJO_RESULT_OK);
    144 }
    145 
    146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
    147   RemoveAndNotify(handle, result);
    148 }
    149 
    150 // WatcherThreadManager --------------------------------------------------------
    151 
    152 // WatcherThreadManager manages the background thread that listens for handles
    153 // to be ready. All requests are handled by WatcherBackend.
    154 class WatcherThreadManager {
    155  public:
    156   ~WatcherThreadManager();
    157 
    158   // Returns the shared instance.
    159   static WatcherThreadManager* GetInstance();
    160 
    161   // Starts watching the requested handle. Returns a unique ID that is used to
    162   // stop watching the handle. When the handle is ready |callback| is notified
    163   // on the thread StartWatching() was invoked on.
    164   // This may be invoked on any thread.
    165   WatcherID StartWatching(const Handle& handle,
    166                           MojoHandleSignals handle_signals,
    167                           base::TimeTicks deadline,
    168                           const base::Callback<void(MojoResult)>& callback);
    169 
    170   // Stops watching a handle.
    171   // This may be invoked on any thread.
    172   void StopWatching(WatcherID watcher_id);
    173 
    174  private:
    175   enum RequestType {
    176     REQUEST_START,
    177     REQUEST_STOP,
    178   };
    179 
    180   // See description of |requests_| for details.
    181   struct RequestData {
    182     RequestData() : type(REQUEST_START), stop_id(0) {}
    183 
    184     RequestType type;
    185     WatchData start_data;
    186     WatcherID stop_id;
    187   };
    188 
    189   typedef std::vector<RequestData> Requests;
    190 
    191   friend struct base::DefaultSingletonTraits<WatcherThreadManager>;
    192 
    193   WatcherThreadManager();
    194 
    195   // Schedules a request on the background thread. See |requests_| for details.
    196   void AddRequest(const RequestData& data);
    197 
    198   // Processes requests added to |requests_|. This is invoked on the backend
    199   // thread.
    200   void ProcessRequestsOnBackendThread();
    201 
    202   base::Thread thread_;
    203 
    204   base::AtomicSequenceNumber watcher_id_generator_;
    205 
    206   WatcherBackend backend_;
    207 
    208   // Protects |requests_|.
    209   base::Lock lock_;
    210 
    211   // Start/Stop result in adding a RequestData to |requests_| (protected by
    212   // |lock_|). When the background thread wakes up it processes the requests.
    213   Requests requests_;
    214 
    215   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
    216 };
    217 
    218 WatcherThreadManager::~WatcherThreadManager() {
    219   thread_.Stop();
    220 }
    221 
    222 WatcherThreadManager* WatcherThreadManager::GetInstance() {
    223   return base::Singleton<WatcherThreadManager>::get();
    224 }
    225 
    226 WatcherID WatcherThreadManager::StartWatching(
    227     const Handle& handle,
    228     MojoHandleSignals handle_signals,
    229     base::TimeTicks deadline,
    230     const base::Callback<void(MojoResult)>& callback) {
    231   RequestData request_data;
    232   request_data.type = REQUEST_START;
    233   request_data.start_data.id = watcher_id_generator_.GetNext();
    234   request_data.start_data.handle = handle;
    235   request_data.start_data.callback = callback;
    236   request_data.start_data.handle_signals = handle_signals;
    237   request_data.start_data.deadline = deadline;
    238   request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
    239   AddRequest(request_data);
    240   return request_data.start_data.id;
    241 }
    242 
    243 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
    244   // Handle the case of StartWatching() followed by StopWatching() before
    245   // |thread_| woke up.
    246   {
    247     base::AutoLock auto_lock(lock_);
    248     for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
    249       if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
    250         // Watcher ids are not reused, so if we find it we can stop.
    251         requests_.erase(i);
    252         return;
    253       }
    254     }
    255   }
    256 
    257   RequestData request_data;
    258   request_data.type = REQUEST_STOP;
    259   request_data.stop_id = watcher_id;
    260   AddRequest(request_data);
    261 }
    262 
    263 void WatcherThreadManager::AddRequest(const RequestData& data) {
    264   {
    265     base::AutoLock auto_lock(lock_);
    266     const bool was_empty = requests_.empty();
    267     requests_.push_back(data);
    268     if (!was_empty)
    269       return;
    270   }
    271 
    272   // We outlive |thread_|, so it's safe to use Unretained() here.
    273   thread_.task_runner()->PostTask(
    274       FROM_HERE,
    275       base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
    276                  base::Unretained(this)));
    277 }
    278 
    279 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
    280   DCHECK(thread_.task_runner()->BelongsToCurrentThread());
    281 
    282   Requests requests;
    283   {
    284     base::AutoLock auto_lock(lock_);
    285     requests_.swap(requests);
    286   }
    287   for (size_t i = 0; i < requests.size(); ++i) {
    288     if (requests[i].type == REQUEST_START) {
    289       backend_.StartWatching(requests[i].start_data);
    290     } else {
    291       backend_.StopWatching(requests[i].stop_id);
    292     }
    293   }
    294 }
    295 
    296 WatcherThreadManager::WatcherThreadManager()
    297     : thread_(kWatcherThreadName) {
    298   base::Thread::Options thread_options;
    299   thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
    300   thread_.StartWithOptions(thread_options);
    301 }
    302 
    303 }  // namespace
    304 
    305 // HandleWatcher::StateBase and subclasses -------------------------------------
    306 
    307 // The base class of HandleWatcher's state. Owns the user's callback and
    308 // monitors the current thread's MessageLoop to know when to force the callback
    309 // to run (with an error) even though the pipe hasn't been signaled yet.
    310 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
    311  public:
    312   StateBase(HandleWatcher* watcher,
    313             const base::Callback<void(MojoResult)>& callback)
    314       : watcher_(watcher),
    315         callback_(callback),
    316         got_ready_(false) {
    317     base::MessageLoop::current()->AddDestructionObserver(this);
    318   }
    319 
    320   ~StateBase() override {
    321     base::MessageLoop::current()->RemoveDestructionObserver(this);
    322   }
    323 
    324  protected:
    325   void NotifyHandleReady(MojoResult result) {
    326     got_ready_ = true;
    327     NotifyAndDestroy(result);
    328   }
    329 
    330   bool got_ready() const { return got_ready_; }
    331 
    332  private:
    333   void WillDestroyCurrentMessageLoop() override {
    334     // The current thread is exiting. Simulate a watch error.
    335     NotifyAndDestroy(MOJO_RESULT_ABORTED);
    336   }
    337 
    338   void NotifyAndDestroy(MojoResult result) {
    339     base::Callback<void(MojoResult)> callback = callback_;
    340     watcher_->Stop();  // Destroys |this|.
    341 
    342     callback.Run(result);
    343   }
    344 
    345   HandleWatcher* watcher_;
    346   base::Callback<void(MojoResult)> callback_;
    347 
    348   // Have we been notified that the handle is ready?
    349   bool got_ready_;
    350 
    351   DISALLOW_COPY_AND_ASSIGN(StateBase);
    352 };
    353 
    354 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
    355 // SameThreadWatchingState is used to directly watch the handle on the same
    356 // thread.
    357 class HandleWatcher::SameThreadWatchingState : public StateBase,
    358                                                public MessagePumpMojoHandler {
    359  public:
    360   SameThreadWatchingState(HandleWatcher* watcher,
    361                           const Handle& handle,
    362                           MojoHandleSignals handle_signals,
    363                           MojoDeadline deadline,
    364                           const base::Callback<void(MojoResult)>& callback)
    365       : StateBase(watcher, callback),
    366         handle_(handle) {
    367     DCHECK(MessagePumpMojo::IsCurrent());
    368 
    369     MessagePumpMojo::current()->AddHandler(
    370         this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
    371   }
    372 
    373   ~SameThreadWatchingState() override {
    374     if (!got_ready())
    375       MessagePumpMojo::current()->RemoveHandler(handle_);
    376   }
    377 
    378  private:
    379   // MessagePumpMojoHandler overrides:
    380   void OnHandleReady(const Handle& handle) override {
    381     StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
    382   }
    383 
    384   void OnHandleError(const Handle& handle, MojoResult result) override {
    385     StopWatchingAndNotifyReady(handle, result);
    386   }
    387 
    388   void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
    389     DCHECK_EQ(handle.value(), handle_.value());
    390     MessagePumpMojo::current()->RemoveHandler(handle_);
    391     NotifyHandleReady(result);
    392   }
    393 
    394   Handle handle_;
    395 
    396   DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
    397 };
    398 
    399 // If the thread on which HandleWatcher is used runs a message pump different
    400 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
    401 // handle on the handle watcher thread.
    402 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
    403  public:
    404   SecondaryThreadWatchingState(HandleWatcher* watcher,
    405                                const Handle& handle,
    406                                MojoHandleSignals handle_signals,
    407                                MojoDeadline deadline,
    408                                const base::Callback<void(MojoResult)>& callback)
    409       : StateBase(watcher, callback),
    410         weak_factory_(this) {
    411     watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
    412         handle,
    413         handle_signals,
    414         MojoDeadlineToTimeTicks(deadline),
    415         base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
    416                    weak_factory_.GetWeakPtr()));
    417   }
    418 
    419   ~SecondaryThreadWatchingState() override {
    420     // If we've been notified the handle is ready (|got_ready()| is true) then
    421     // the watch has been implicitly removed by
    422     // WatcherThreadManager/MessagePumpMojo and we don't have to call
    423     // StopWatching(). To do so would needlessly entail posting a task and
    424     // blocking until the background thread services it.
    425     if (!got_ready())
    426       WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
    427   }
    428 
    429  private:
    430   WatcherID watcher_id_;
    431 
    432   // Used to weakly bind |this| to the WatcherThreadManager.
    433   base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
    434 
    435   DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
    436 };
    437 
    438 // HandleWatcher ---------------------------------------------------------------
    439 
    440 HandleWatcher::HandleWatcher() {
    441 }
    442 
    443 HandleWatcher::~HandleWatcher() {
    444 }
    445 
    446 void HandleWatcher::Start(const Handle& handle,
    447                           MojoHandleSignals handle_signals,
    448                           MojoDeadline deadline,
    449                           const base::Callback<void(MojoResult)>& callback) {
    450   DCHECK(handle.is_valid());
    451   DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
    452 
    453   // Need to clear the state before creating a new one.
    454   state_.reset();
    455   if (MessagePumpMojo::IsCurrent()) {
    456     state_.reset(new SameThreadWatchingState(
    457         this, handle, handle_signals, deadline, callback));
    458   } else {
    459 #if !defined(OFFICIAL_BUILD)
    460     // Just for making debugging non-transferable message pipes easier. Since
    461     // they can't be sent after they're read/written/listened to,
    462     // MessagePipeDispatcher saves the callstack of when it's "bound" to a
    463     // pipe id. Triggering a read here, instead of later in the PostTask, means
    464     // we have a callstack that is useful to check if the pipe is erronously
    465     // attempted to be sent.
    466     uint32_t temp = 0;
    467     MojoReadMessage(handle.value(), nullptr, &temp, nullptr, nullptr,
    468                     MOJO_READ_MESSAGE_FLAG_NONE);
    469 #endif
    470     state_.reset(new SecondaryThreadWatchingState(
    471         this, handle, handle_signals, deadline, callback));
    472   }
    473 }
    474 
    475 void HandleWatcher::Stop() {
    476   state_.reset();
    477 }
    478 
    479 }  // namespace common
    480 }  // namespace mojo
    481