Home | History | Annotate | Download | only in system
      1 // Copyright 2017 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/system/wait_set.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 #include <map>
     10 #include <set>
     11 #include <vector>
     12 
     13 #include "base/containers/stack_container.h"
     14 #include "base/logging.h"
     15 #include "base/macros.h"
     16 #include "base/memory/ptr_util.h"
     17 #include "base/synchronization/lock.h"
     18 #include "base/synchronization/waitable_event.h"
     19 #include "mojo/public/cpp/system/trap.h"
     20 
     21 namespace mojo {
     22 
     23 class WaitSet::State : public base::RefCountedThreadSafe<State> {
     24  public:
     25   State()
     26       : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
     27                       base::WaitableEvent::InitialState::NOT_SIGNALED) {
     28     MojoResult rv = CreateTrap(&Context::OnNotification, &trap_handle_);
     29     DCHECK_EQ(MOJO_RESULT_OK, rv);
     30   }
     31 
     32   void ShutDown() {
     33     // NOTE: This may immediately invoke Notify for every context.
     34     trap_handle_.reset();
     35 
     36     cancelled_contexts_.clear();
     37   }
     38 
     39   MojoResult AddEvent(base::WaitableEvent* event) {
     40     auto result = user_events_.insert(event);
     41     if (result.second)
     42       return MOJO_RESULT_OK;
     43     return MOJO_RESULT_ALREADY_EXISTS;
     44   }
     45 
     46   MojoResult RemoveEvent(base::WaitableEvent* event) {
     47     auto it = user_events_.find(event);
     48     if (it == user_events_.end())
     49       return MOJO_RESULT_NOT_FOUND;
     50     user_events_.erase(it);
     51     return MOJO_RESULT_OK;
     52   }
     53 
     54   MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
     55     DCHECK(trap_handle_.is_valid());
     56 
     57     scoped_refptr<Context> context = new Context(this, handle);
     58 
     59     {
     60       base::AutoLock lock(lock_);
     61 
     62       if (handle_to_context_.count(handle))
     63         return MOJO_RESULT_ALREADY_EXISTS;
     64       DCHECK(!contexts_.count(context->context_value()));
     65 
     66       handle_to_context_[handle] = context;
     67       contexts_[context->context_value()] = context;
     68     }
     69 
     70     // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if
     71     // MojoAddTrigger() succeeds. Otherwise balanced immediately below.
     72     context->AddRef();
     73 
     74     // This can notify immediately if the watcher is already armed. Don't hold
     75     // |lock_| while calling it.
     76     MojoResult rv =
     77         MojoAddTrigger(trap_handle_.get().value(), handle.value(), signals,
     78                        MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
     79                        context->context_value(), nullptr);
     80     if (rv == MOJO_RESULT_INVALID_ARGUMENT) {
     81       base::AutoLock lock(lock_);
     82       handle_to_context_.erase(handle);
     83       contexts_.erase(context->context_value());
     84 
     85       // Balanced above.
     86       context->Release();
     87       return rv;
     88     }
     89     DCHECK_EQ(MOJO_RESULT_OK, rv);
     90 
     91     return rv;
     92   }
     93 
     94   MojoResult RemoveHandle(Handle handle) {
     95     DCHECK(trap_handle_.is_valid());
     96 
     97     scoped_refptr<Context> context;
     98     {
     99       base::AutoLock lock(lock_);
    100 
    101       // Always clear |cancelled_contexts_| in case it's accumulated any more
    102       // entries since the last time we ran.
    103       cancelled_contexts_.clear();
    104 
    105       auto it = handle_to_context_.find(handle);
    106       if (it == handle_to_context_.end())
    107         return MOJO_RESULT_NOT_FOUND;
    108 
    109       context = std::move(it->second);
    110       handle_to_context_.erase(it);
    111 
    112       // Ensure that we never return this handle as a ready result again. Note
    113       // that it's removal from |handle_to_context_| above ensures it will never
    114       // be added back to this map.
    115       ready_handles_.erase(handle);
    116     }
    117 
    118     // NOTE: This may enter the notification callback immediately, so don't hold
    119     // |lock_| while calling it.
    120     MojoResult rv = MojoRemoveTrigger(trap_handle_.get().value(),
    121                                       context->context_value(), nullptr);
    122 
    123     // We don't really care whether or not this succeeds. In either case, the
    124     // context was or will imminently be cancelled and moved from |contexts_|
    125     // to |cancelled_contexts_|.
    126     DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
    127 
    128     return rv;
    129   }
    130 
    131   void Wait(base::WaitableEvent** ready_event,
    132             size_t* num_ready_handles,
    133             Handle* ready_handles,
    134             MojoResult* ready_results,
    135             MojoHandleSignalsState* signals_states) {
    136     DCHECK(trap_handle_.is_valid());
    137     DCHECK(num_ready_handles);
    138     DCHECK(ready_handles);
    139     DCHECK(ready_results);
    140     {
    141       base::AutoLock lock(lock_);
    142       if (ready_handles_.empty()) {
    143         // No handles are currently in the ready set. Make sure the event is
    144         // reset and try to arm the watcher.
    145         handle_event_.Reset();
    146 
    147         DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
    148         uint32_t num_blocking_events =
    149             static_cast<uint32_t>(*num_ready_handles);
    150 
    151         base::StackVector<MojoTrapEvent, 4> blocking_events;
    152         blocking_events.container().resize(num_blocking_events);
    153         for (size_t i = 0; i < num_blocking_events; ++i) {
    154           blocking_events.container()[i].struct_size =
    155               sizeof(blocking_events.container()[i]);
    156         }
    157         MojoResult rv = MojoArmTrap(trap_handle_.get().value(), nullptr,
    158                                     &num_blocking_events,
    159                                     blocking_events.container().data());
    160 
    161         if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
    162           // Simulate the handles becoming ready. We do this in lieu of
    163           // returning the results immediately so as to avoid potentially
    164           // starving user events. i.e., we always want to call WaitMany()
    165           // below.
    166           handle_event_.Signal();
    167           for (size_t i = 0; i < num_blocking_events; ++i) {
    168             const auto& event = blocking_events.container()[i];
    169             auto it = contexts_.find(event.trigger_context);
    170             DCHECK(it != contexts_.end());
    171             ready_handles_[it->second->handle()] = {event.result,
    172                                                     event.signals_state};
    173           }
    174         } else if (rv == MOJO_RESULT_NOT_FOUND) {
    175           // Nothing to watch. If there are no user events, always signal to
    176           // avoid deadlock.
    177           if (user_events_.empty())
    178             handle_event_.Signal();
    179         } else {
    180           // Watcher must be armed now. No need to manually signal.
    181           DCHECK_EQ(MOJO_RESULT_OK, rv);
    182         }
    183       }
    184     }
    185 
    186     // Build a local contiguous array of events to wait on. These are rotated
    187     // across Wait() calls to avoid starvation, by virtue of the fact that
    188     // WaitMany guarantees left-to-right priority when multiple events are
    189     // signaled.
    190 
    191     base::StackVector<base::WaitableEvent*, 4> events;
    192     events.container().resize(user_events_.size() + 1);
    193     if (waitable_index_shift_ > user_events_.size())
    194       waitable_index_shift_ = 0;
    195 
    196     size_t dest_index = waitable_index_shift_++;
    197     events.container()[dest_index] = &handle_event_;
    198     for (auto* e : user_events_) {
    199       dest_index = (dest_index + 1) % events.container().size();
    200       events.container()[dest_index] = e;
    201     }
    202 
    203     size_t index = base::WaitableEvent::WaitMany(events.container().data(),
    204                                                  events.container().size());
    205     base::AutoLock lock(lock_);
    206 
    207     // Pop as many handles as we can out of the ready set and return them. Note
    208     // that we do this regardless of which event signaled, as there may be
    209     // ready handles in any case and they may be interesting to the caller.
    210     *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
    211     for (size_t i = 0; i < *num_ready_handles; ++i) {
    212       auto it = ready_handles_.begin();
    213       ready_handles[i] = it->first;
    214       ready_results[i] = it->second.result;
    215       if (signals_states)
    216         signals_states[i] = it->second.signals_state;
    217       ready_handles_.erase(it);
    218     }
    219 
    220     // If the caller cares, let them know which user event unblocked us, if any.
    221     if (ready_event) {
    222       if (events.container()[index] == &handle_event_)
    223         *ready_event = nullptr;
    224       else
    225         *ready_event = events.container()[index];
    226     }
    227   }
    228 
    229  private:
    230   friend class base::RefCountedThreadSafe<State>;
    231 
    232   class Context : public base::RefCountedThreadSafe<Context> {
    233    public:
    234     Context(scoped_refptr<State> state, Handle handle)
    235         : state_(state), handle_(handle) {}
    236 
    237     Handle handle() const { return handle_; }
    238 
    239     uintptr_t context_value() const {
    240       return reinterpret_cast<uintptr_t>(this);
    241     }
    242 
    243     static void OnNotification(const MojoTrapEvent* event) {
    244       reinterpret_cast<Context*>(event->trigger_context)
    245           ->Notify(event->result, event->signals_state);
    246     }
    247 
    248    private:
    249     friend class base::RefCountedThreadSafe<Context>;
    250 
    251     ~Context() {}
    252 
    253     void Notify(MojoResult result, MojoHandleSignalsState signals_state) {
    254       state_->Notify(handle_, result, signals_state, this);
    255     }
    256 
    257     const scoped_refptr<State> state_;
    258     const Handle handle_;
    259 
    260     DISALLOW_COPY_AND_ASSIGN(Context);
    261   };
    262 
    263   ~State() {}
    264 
    265   void Notify(Handle handle,
    266               MojoResult result,
    267               MojoHandleSignalsState signals_state,
    268               Context* context) {
    269     base::AutoLock lock(lock_);
    270 
    271     // This notification may have raced with RemoveHandle() from another
    272     // sequence. We only signal the WaitSet if that's not the case.
    273     if (handle_to_context_.count(handle)) {
    274       ready_handles_[handle] = {result, signals_state};
    275       handle_event_.Signal();
    276     }
    277 
    278     // Whether it's an implicit or explicit cancellation, erase from |contexts_|
    279     // and append to |cancelled_contexts_|.
    280     if (result == MOJO_RESULT_CANCELLED) {
    281       contexts_.erase(context->context_value());
    282       handle_to_context_.erase(handle);
    283 
    284       // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that
    285       // this Context's heap address is not reused too soon. For example, it
    286       // would otherwise be possible for the user to call AddHandle() from the
    287       // WaitSet's sequence immediately after this notification has fired on
    288       // another sequence, potentially reusing the same heap address for the
    289       // newly added Context; and then they may call RemoveHandle() for this
    290       // handle (not knowing its context has just been implicitly cancelled) and
    291       // cause the new Context to be incorrectly removed from |contexts_|.
    292       //
    293       // This vector is cleared on the WaitSet's own sequence every time
    294       // RemoveHandle is called.
    295       cancelled_contexts_.emplace_back(base::WrapRefCounted(context));
    296 
    297       // Balanced in State::AddHandle().
    298       context->Release();
    299     }
    300   }
    301 
    302   struct ReadyState {
    303     ReadyState() = default;
    304     ReadyState(MojoResult result, MojoHandleSignalsState signals_state)
    305         : result(result), signals_state(signals_state) {}
    306     ~ReadyState() = default;
    307 
    308     MojoResult result = MOJO_RESULT_UNKNOWN;
    309     MojoHandleSignalsState signals_state = {0, 0};
    310   };
    311 
    312   // Not guarded by lock. Must only be accessed from the WaitSet's owning
    313   // sequence.
    314   ScopedTrapHandle trap_handle_;
    315 
    316   base::Lock lock_;
    317   std::map<uintptr_t, scoped_refptr<Context>> contexts_;
    318   std::map<Handle, scoped_refptr<Context>> handle_to_context_;
    319   std::map<Handle, ReadyState> ready_handles_;
    320   std::vector<scoped_refptr<Context>> cancelled_contexts_;
    321   std::set<base::WaitableEvent*> user_events_;
    322 
    323   // Event signaled any time a handle notification is received.
    324   base::WaitableEvent handle_event_;
    325 
    326   // Offset by which to rotate the current set of waitable objects. This is used
    327   // to guard against event starvation, as base::WaitableEvent::WaitMany gives
    328   // preference to events in left-to-right order.
    329   size_t waitable_index_shift_ = 0;
    330 
    331   DISALLOW_COPY_AND_ASSIGN(State);
    332 };
    333 
    334 WaitSet::WaitSet() : state_(new State) {}
    335 
    336 WaitSet::~WaitSet() {
    337   state_->ShutDown();
    338 }
    339 
    340 MojoResult WaitSet::AddEvent(base::WaitableEvent* event) {
    341   return state_->AddEvent(event);
    342 }
    343 
    344 MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) {
    345   return state_->RemoveEvent(event);
    346 }
    347 
    348 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
    349   return state_->AddHandle(handle, signals);
    350 }
    351 
    352 MojoResult WaitSet::RemoveHandle(Handle handle) {
    353   return state_->RemoveHandle(handle);
    354 }
    355 
    356 void WaitSet::Wait(base::WaitableEvent** ready_event,
    357                    size_t* num_ready_handles,
    358                    Handle* ready_handles,
    359                    MojoResult* ready_results,
    360                    MojoHandleSignalsState* signals_states) {
    361   state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results,
    362                signals_states);
    363 }
    364 
    365 }  // namespace mojo
    366