Home | History | Annotate | Download | only in system
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/wait_set_dispatcher.h"
      6 
      7 #include <stdint.h>
      8 
      9 #include <algorithm>
     10 #include <utility>
     11 
     12 #include "base/logging.h"
     13 #include "mojo/edk/system/awakable.h"
     14 
     15 namespace mojo {
     16 namespace edk {
     17 
     18 class WaitSetDispatcher::Waiter final : public Awakable {
     19  public:
     20   explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
     21   ~Waiter() {}
     22 
     23   // |Awakable| implementation.
     24   bool Awake(MojoResult result, uintptr_t context) override {
     25     // Note: This is called with various Mojo locks held.
     26     dispatcher_->WakeDispatcher(result, context);
     27     // Removes |this| from the dispatcher's list of waiters.
     28     return false;
     29   }
     30 
     31  private:
     32   WaitSetDispatcher* const dispatcher_;
     33 };
     34 
     35 WaitSetDispatcher::WaitState::WaitState() {}
     36 
     37 WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default;
     38 
     39 WaitSetDispatcher::WaitState::~WaitState() {}
     40 
     41 WaitSetDispatcher::WaitSetDispatcher()
     42     : waiter_(new WaitSetDispatcher::Waiter(this)) {}
     43 
     44 Dispatcher::Type WaitSetDispatcher::GetType() const {
     45   return Type::WAIT_SET;
     46 }
     47 
     48 MojoResult WaitSetDispatcher::Close() {
     49   base::AutoLock lock(lock_);
     50 
     51   if (is_closed_)
     52     return MOJO_RESULT_INVALID_ARGUMENT;
     53   is_closed_ = true;
     54 
     55   {
     56     base::AutoLock locker(awakable_lock_);
     57     awakable_list_.CancelAll();
     58   }
     59 
     60   for (const auto& entry : waiting_dispatchers_)
     61     entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
     62   waiting_dispatchers_.clear();
     63 
     64   base::AutoLock locker(awoken_lock_);
     65   awoken_queue_.clear();
     66   processed_dispatchers_.clear();
     67 
     68   return MOJO_RESULT_OK;
     69 }
     70 
     71 MojoResult WaitSetDispatcher::AddWaitingDispatcher(
     72     const scoped_refptr<Dispatcher>& dispatcher,
     73     MojoHandleSignals signals,
     74     uintptr_t context) {
     75   if (dispatcher == this)
     76     return MOJO_RESULT_INVALID_ARGUMENT;
     77 
     78   base::AutoLock lock(lock_);
     79 
     80   if (is_closed_)
     81     return MOJO_RESULT_INVALID_ARGUMENT;
     82 
     83   uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
     84   auto it = waiting_dispatchers_.find(dispatcher_handle);
     85   if (it != waiting_dispatchers_.end()) {
     86     return MOJO_RESULT_ALREADY_EXISTS;
     87   }
     88 
     89   const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
     90                                                     dispatcher_handle, nullptr);
     91   if (result == MOJO_RESULT_INVALID_ARGUMENT) {
     92     // Dispatcher is closed.
     93     return result;
     94   } else if (result != MOJO_RESULT_OK) {
     95     WakeDispatcher(result, dispatcher_handle);
     96   }
     97 
     98   WaitState state;
     99   state.dispatcher = dispatcher;
    100   state.context = context;
    101   state.signals = signals;
    102   bool inserted = waiting_dispatchers_.insert(
    103       std::make_pair(dispatcher_handle, state)).second;
    104   DCHECK(inserted);
    105 
    106   return MOJO_RESULT_OK;
    107 }
    108 
    109 MojoResult WaitSetDispatcher::RemoveWaitingDispatcher(
    110     const scoped_refptr<Dispatcher>& dispatcher) {
    111   uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
    112 
    113   base::AutoLock lock(lock_);
    114   if (is_closed_)
    115     return MOJO_RESULT_INVALID_ARGUMENT;
    116 
    117   auto it = waiting_dispatchers_.find(dispatcher_handle);
    118   if (it == waiting_dispatchers_.end())
    119     return MOJO_RESULT_NOT_FOUND;
    120 
    121   dispatcher->RemoveAwakable(waiter_.get(), nullptr);
    122   // At this point, it should not be possible for |waiter_| to be woken with
    123   // |dispatcher|.
    124   waiting_dispatchers_.erase(it);
    125 
    126   base::AutoLock locker(awoken_lock_);
    127   int num_erased = 0;
    128   for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
    129     if (it->first == dispatcher_handle) {
    130       it = awoken_queue_.erase(it);
    131       num_erased++;
    132     } else {
    133       ++it;
    134     }
    135   }
    136   // The dispatcher should only exist in the queue once.
    137   DCHECK_LE(num_erased, 1);
    138   processed_dispatchers_.erase(
    139       std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
    140                   dispatcher_handle),
    141       processed_dispatchers_.end());
    142 
    143   return MOJO_RESULT_OK;
    144 }
    145 
    146 MojoResult WaitSetDispatcher::GetReadyDispatchers(
    147     uint32_t* count,
    148     DispatcherVector* dispatchers,
    149     MojoResult* results,
    150     uintptr_t* contexts) {
    151   base::AutoLock lock(lock_);
    152 
    153   if (is_closed_)
    154     return MOJO_RESULT_INVALID_ARGUMENT;
    155 
    156   dispatchers->clear();
    157 
    158   // Re-queue any already retrieved dispatchers. These should be the dispatchers
    159   // that were returned on the last call to this function. This loop is
    160   // necessary to preserve the logically level-triggering behaviour of waiting
    161   // in Mojo. In particular, if no action is taken on a signal, that signal
    162   // continues to be satisfied, and therefore a |MojoWait()| on that
    163   // handle/signal continues to return immediately.
    164   std::deque<uintptr_t> pending;
    165   {
    166     base::AutoLock locker(awoken_lock_);
    167     pending.swap(processed_dispatchers_);
    168   }
    169   for (uintptr_t d : pending) {
    170     auto it = waiting_dispatchers_.find(d);
    171     // Anything in |processed_dispatchers_| should also be in
    172     // |waiting_dispatchers_| since dispatchers are removed from both in
    173     // |RemoveWaitingDispatcherImplNoLock()|.
    174     DCHECK(it != waiting_dispatchers_.end());
    175 
    176     // |awoken_mutex_| cannot be held here because
    177     // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
    178     // mutex is held while running |WakeDispatcher()| below, which needs to
    179     // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
    180     // a deadlock.
    181     const MojoResult result = it->second.dispatcher->AddAwakable(
    182         waiter_.get(), it->second.signals, d, nullptr);
    183 
    184     if (result == MOJO_RESULT_INVALID_ARGUMENT) {
    185       // Dispatcher is closed. Implicitly remove it from the wait set since
    186       // it may be impossible to remove using |MojoRemoveHandle()|.
    187       waiting_dispatchers_.erase(it);
    188     } else if (result != MOJO_RESULT_OK) {
    189       WakeDispatcher(result, d);
    190     }
    191   }
    192 
    193   const uint32_t max_woken = *count;
    194   uint32_t num_woken = 0;
    195 
    196   base::AutoLock locker(awoken_lock_);
    197   while (!awoken_queue_.empty() && num_woken < max_woken) {
    198     uintptr_t d = awoken_queue_.front().first;
    199     MojoResult result = awoken_queue_.front().second;
    200     awoken_queue_.pop_front();
    201 
    202     auto it = waiting_dispatchers_.find(d);
    203     DCHECK(it != waiting_dispatchers_.end());
    204 
    205     results[num_woken] = result;
    206     dispatchers->push_back(it->second.dispatcher);
    207     if (contexts)
    208       contexts[num_woken] = it->second.context;
    209 
    210     if (result != MOJO_RESULT_CANCELLED) {
    211       processed_dispatchers_.push_back(d);
    212     } else {
    213       // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
    214       // Return it, but also implcitly remove it from the wait set.
    215       waiting_dispatchers_.erase(it);
    216     }
    217 
    218     num_woken++;
    219   }
    220 
    221   *count = num_woken;
    222   if (!num_woken)
    223     return MOJO_RESULT_SHOULD_WAIT;
    224 
    225   return MOJO_RESULT_OK;
    226 }
    227 
    228 HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const {
    229   base::AutoLock lock(lock_);
    230   return GetHandleSignalsStateNoLock();
    231 }
    232 
    233 HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const {
    234   lock_.AssertAcquired();
    235   if (is_closed_)
    236     return HandleSignalsState();
    237 
    238   HandleSignalsState rv;
    239   rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
    240   base::AutoLock locker(awoken_lock_);
    241   if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
    242     rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
    243   return rv;
    244 }
    245 
    246 MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable,
    247                                           MojoHandleSignals signals,
    248                                           uintptr_t context,
    249                                           HandleSignalsState* signals_state) {
    250   base::AutoLock lock(lock_);
    251   // |awakable_lock_| is acquired here instead of immediately before adding to
    252   // |awakable_list_| because we need to check the signals state and add to
    253   // |awakable_list_| as an atomic operation. If the pair isn't atomic, it is
    254   // possible for the signals state to change after it is checked, but before
    255   // the awakable is added. In that case, the added awakable won't be signalled.
    256   base::AutoLock awakable_locker(awakable_lock_);
    257   HandleSignalsState state(GetHandleSignalsStateNoLock());
    258   if (state.satisfies(signals)) {
    259     if (signals_state)
    260       *signals_state = state;
    261     return MOJO_RESULT_ALREADY_EXISTS;
    262   }
    263   if (!state.can_satisfy(signals)) {
    264     if (signals_state)
    265       *signals_state = state;
    266     return MOJO_RESULT_FAILED_PRECONDITION;
    267   }
    268 
    269   awakable_list_.Add(awakable, signals, context);
    270   return MOJO_RESULT_OK;
    271 }
    272 
    273 void WaitSetDispatcher::RemoveAwakable(Awakable* awakable,
    274                                        HandleSignalsState* signals_state) {
    275   {
    276     base::AutoLock locker(awakable_lock_);
    277     awakable_list_.Remove(awakable);
    278   }
    279   if (signals_state)
    280     *signals_state = GetHandleSignalsState();
    281 }
    282 
    283 bool WaitSetDispatcher::BeginTransit() {
    284   // You can't transfer wait sets!
    285   return false;
    286 }
    287 
    288 WaitSetDispatcher::~WaitSetDispatcher() {
    289   DCHECK(waiting_dispatchers_.empty());
    290   DCHECK(awoken_queue_.empty());
    291   DCHECK(processed_dispatchers_.empty());
    292 }
    293 
    294 void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
    295   {
    296     base::AutoLock locker(awoken_lock_);
    297 
    298     if (result == MOJO_RESULT_ALREADY_EXISTS)
    299       result = MOJO_RESULT_OK;
    300 
    301     awoken_queue_.push_back(std::make_pair(context, result));
    302   }
    303 
    304   base::AutoLock locker(awakable_lock_);
    305   HandleSignalsState signals_state;
    306   signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
    307   signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
    308   awakable_list_.AwakeForStateChange(signals_state);
    309 }
    310 
    311 }  // namespace edk
    312 }  // namespace mojo
    313