Home | History | Annotate | Download | only in lib
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
      6 
      7 #include <stdint.h>
      8 
      9 #include <utility>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/macros.h"
     14 #include "base/memory/ptr_util.h"
     15 #include "base/sequenced_task_runner.h"
     16 #include "base/stl_util.h"
     17 #include "base/synchronization/waitable_event.h"
     18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
     19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
     20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
     21 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
     22 
     23 namespace mojo {
     24 namespace internal {
     25 
// InterfaceEndpoint stores the information of an interface endpoint registered
// with the router.
// No one other than the router's |endpoints_| and |tasks_| should hold refs to
// this object.
//
// The object is ref-counted and doubles as the InterfaceEndpointController
// handed back to the attached client. Most state is guarded by the owning
// router's lock; setters assert that the lock is held.
class MultiplexRouter::InterfaceEndpoint
    : public base::RefCountedThreadSafe<InterfaceEndpoint>,
      public InterfaceEndpointController {
 public:
  // Creates an endpoint identified by |id| that belongs to |router|. The
  // endpoint starts out open, with no handle created and no client attached.
  InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
      : router_(router),
        id_(id),
        closed_(false),
        peer_closed_(false),
        handle_created_(false),
        client_(nullptr) {}

  // ---------------------------------------------------------------------------
  // The following public methods are safe to call from any sequence without
  // locking.

  // Returns the interface ID this endpoint was created with; it never changes.
  InterfaceId id() const { return id_; }

  // ---------------------------------------------------------------------------
  // The following public methods are called under the router's lock.

  // Whether this (local) endpoint has been closed.
  bool closed() const { return closed_; }
  // Marks the local endpoint as closed. One-way: there is no way to reopen.
  void set_closed() {
    router_->AssertLockAcquired();
    closed_ = true;
  }

  // Whether the peer endpoint has been closed.
  bool peer_closed() const { return peer_closed_; }
  // Marks the peer endpoint as closed. One-way.
  void set_peer_closed() {
    router_->AssertLockAcquired();
    peer_closed_ = true;
  }

  // Whether a ScopedInterfaceEndpointHandle has already been created for this
  // endpoint.
  bool handle_created() const { return handle_created_; }
  // Records that a handle has been created for this endpoint. One-way.
  void set_handle_created() {
    router_->AssertLockAcquired();
    handle_created_ = true;
  }

  // Returns the stored disconnect reason, if one has been set.
  const base::Optional<DisconnectReason>& disconnect_reason() const {
    return disconnect_reason_;
  }
  // Overwrites the stored disconnect reason with |disconnect_reason|.
  void set_disconnect_reason(
      const base::Optional<DisconnectReason>& disconnect_reason) {
    router_->AssertLockAcquired();
    disconnect_reason_ = disconnect_reason;
  }

  // Returns the task runner supplied to AttachClient(), or null when no client
  // is attached.
  base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }

  // Returns the attached client, or null when no client is attached.
  InterfaceEndpointClient* client() const { return client_; }

  // Attaches |client| to this endpoint and remembers |runner| as the sequence
  // on which the client is used. Must be called on |runner|'s sequence, on an
  // endpoint that is not closed and has no client yet.
  void AttachClient(InterfaceEndpointClient* client,
                    scoped_refptr<base::SequencedTaskRunner> runner) {
    router_->AssertLockAcquired();
    DCHECK(!client_);
    DCHECK(!closed_);
    DCHECK(runner->RunsTasksInCurrentSequence());

    task_runner_ = std::move(runner);
    client_ = client;
  }

  // This method must be called on the same sequence as the corresponding
  // AttachClient() call.
  // Clears the client, its task runner and any sync watcher created for it.
  void DetachClient() {
    router_->AssertLockAcquired();
    DCHECK(client_);
    DCHECK(task_runner_->RunsTasksInCurrentSequence());
    DCHECK(!closed_);

    task_runner_ = nullptr;
    client_ = nullptr;
    sync_watcher_.reset();
  }

  // Records that a sync message (or peer closure) is pending for this endpoint
  // and signals the sync watcher if one exists. A no-op when already signaled.
  void SignalSyncMessageEvent() {
    router_->AssertLockAcquired();
    if (sync_message_event_signaled_)
      return;
    sync_message_event_signaled_ = true;
    if (sync_watcher_)
      sync_watcher_->SignalEvent();
  }

  // Clears the pending-sync-message flag and resets the sync watcher's event
  // if one exists. A no-op when not currently signaled.
  void ResetSyncMessageSignal() {
    router_->AssertLockAcquired();
    if (!sync_message_event_signaled_)
      return;
    sync_message_event_signaled_ = false;
    if (sync_watcher_)
      sync_watcher_->ResetEvent();
  }

  // ---------------------------------------------------------------------------
  // The following public methods (i.e., InterfaceEndpointController
  // implementation) are called by the client on the same sequence as the
  // AttachClient() call. They are called outside of the router's lock.

  // Stamps |message| with this endpoint's interface ID and hands it to the
  // router's connector. Returns whatever the connector's Accept() returns.
  bool SendMessage(Message* message) override {
    DCHECK(task_runner_->RunsTasksInCurrentSequence());
    message->set_interface_id(id_);
    return router_->connector_.Accept(message);
  }

  // Creates the sync watcher on demand and forwards the request to it.
  void AllowWokenUpBySyncWatchOnSameThread() override {
    DCHECK(task_runner_->RunsTasksInCurrentSequence());

    EnsureSyncWatcherExists();
    sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
  }

  // Creates the sync watcher on demand and blocks in its SyncWatch() with
  // |should_stop|. Returns the watcher's result.
  bool SyncWatch(const bool* should_stop) override {
    DCHECK(task_runner_->RunsTasksInCurrentSequence());

    EnsureSyncWatcherExists();
    return sync_watcher_->SyncWatch(should_stop);
  }

 private:
  friend class base::RefCountedThreadSafe<InterfaceEndpoint>;

  // The last reference must be dropped while the router's lock is held and
  // after the client has been detached.
  ~InterfaceEndpoint() override {
    router_->AssertLockAcquired();

    DCHECK(!client_);
  }

  // Callback run by |sync_watcher_| when its event is signaled. Processes one
  // queued sync message for this endpoint via the router.
  void OnSyncEventSignaled() {
    DCHECK(task_runner_->RunsTasksInCurrentSequence());
    // Declared before |locker| so it is released only after the lock (which
    // lives inside the router) has been dropped.
    scoped_refptr<MultiplexRouter> router_protector(router_);

    MayAutoLock locker(&router_->lock_);
    // Declared after |locker| so that, if this turns out to be the last
    // reference, the endpoint is destroyed while the lock is still held, as
    // the destructor asserts.
    scoped_refptr<InterfaceEndpoint> self_protector(this);

    bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);

    if (!more_to_process)
      ResetSyncMessageSignal();

    // Currently there are no queued sync messages and the peer has closed so
    // there won't be incoming sync messages in the future.
    if (!more_to_process && peer_closed_) {
      // If a SyncWatch() call (or multiple ones) of this interface endpoint is
      // on the call stack, resetting the sync watcher will allow it to exit
      // when the call stack unwinds to that frame.
      sync_watcher_.reset();
    }
  }

  // Lazily creates |sync_watcher_|. If a sync message event was signaled
  // before the watcher existed, the new watcher is signaled right away.
  void EnsureSyncWatcherExists() {
    DCHECK(task_runner_->RunsTasksInCurrentSequence());
    // NOTE(review): this check happens before the lock is taken; it appears to
    // rely on |sync_watcher_| only being touched on this sequence - confirm.
    if (sync_watcher_)
      return;

    MayAutoLock locker(&router_->lock_);
    // base::Unretained(this): the callback is stored in |sync_watcher_|, which
    // this object owns.
    sync_watcher_ =
        std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating(
            &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this)));
    if (sync_message_event_signaled_)
      sync_watcher_->SignalEvent();
  }

  // ---------------------------------------------------------------------------
  // The following members are safe to access from any sequence.

  // The router this endpoint belongs to. Not owned.
  MultiplexRouter* const router_;
  // The interface ID of this endpoint.
  const InterfaceId id_;

  // ---------------------------------------------------------------------------
  // The following members are accessed under the router's lock.

  // Whether the endpoint has been closed.
  bool closed_;
  // Whether the peer endpoint has been closed.
  bool peer_closed_;

  // Whether there is already a ScopedInterfaceEndpointHandle created for this
  // endpoint.
  bool handle_created_;

  // Optional reason recorded via set_disconnect_reason().
  base::Optional<DisconnectReason> disconnect_reason_;

  // The task runner on which |client_|'s methods can be called.
  scoped_refptr<base::SequencedTaskRunner> task_runner_;
  // Not owned. It is null if no client is attached to this endpoint.
  InterfaceEndpointClient* client_;

  // Indicates whether the sync watcher should be signaled for this endpoint.
  bool sync_message_event_signaled_ = false;

  // Guarded by the router's lock. Used to synchronously wait on replies.
  std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_;

  DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
};
    226 
    227 // MessageWrapper objects are always destroyed under the router's lock. On
    228 // destruction, if the message it wrappers contains interface IDs, the wrapper
    229 // closes the corresponding endpoints.
    230 class MultiplexRouter::MessageWrapper {
    231  public:
    232   MessageWrapper() = default;
    233 
    234   MessageWrapper(MultiplexRouter* router, Message message)
    235       : router_(router), value_(std::move(message)) {}
    236 
    237   MessageWrapper(MessageWrapper&& other)
    238       : router_(other.router_), value_(std::move(other.value_)) {}
    239 
    240   ~MessageWrapper() {
    241     if (!router_ || value_.IsNull())
    242       return;
    243 
    244     router_->AssertLockAcquired();
    245     // Don't try to close the endpoints if at this point the router is already
    246     // half-destructed.
    247     if (!router_->being_destructed_)
    248       router_->CloseEndpointsForMessage(value_);
    249   }
    250 
    251   MessageWrapper& operator=(MessageWrapper&& other) {
    252     router_ = other.router_;
    253     value_ = std::move(other.value_);
    254     return *this;
    255   }
    256 
    257   const Message& value() const { return value_; }
    258 
    259   // Must be called outside of the router's lock.
    260   // Returns a null message if it fails to deseralize the associated endpoint
    261   // handles.
    262   Message DeserializeEndpointHandlesAndTake() {
    263     if (!value_.DeserializeAssociatedEndpointHandles(router_)) {
    264       // The previous call may have deserialized part of the associated
    265       // interface endpoint handles. They must be destroyed outside of the
    266       // router's lock, so we cannot wait until destruction of MessageWrapper.
    267       value_.Reset();
    268       return Message();
    269     }
    270     return std::move(value_);
    271   }
    272 
    273  private:
    274   MultiplexRouter* router_ = nullptr;
    275   Message value_;
    276 
    277   DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
    278 };
    279 
    280 struct MultiplexRouter::Task {
    281  public:
    282   // Doesn't take ownership of |message| but takes its contents.
    283   static std::unique_ptr<Task> CreateMessageTask(
    284       MessageWrapper message_wrapper) {
    285     Task* task = new Task(MESSAGE);
    286     task->message_wrapper = std::move(message_wrapper);
    287     return base::WrapUnique(task);
    288   }
    289   static std::unique_ptr<Task> CreateNotifyErrorTask(
    290       InterfaceEndpoint* endpoint) {
    291     Task* task = new Task(NOTIFY_ERROR);
    292     task->endpoint_to_notify = endpoint;
    293     return base::WrapUnique(task);
    294   }
    295 
    296   ~Task() {}
    297 
    298   bool IsMessageTask() const { return type == MESSAGE; }
    299   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
    300 
    301   MessageWrapper message_wrapper;
    302   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
    303 
    304   enum Type { MESSAGE, NOTIFY_ERROR };
    305   Type type;
    306 
    307  private:
    308   explicit Task(Type in_type) : type(in_type) {}
    309 
    310   DISALLOW_COPY_AND_ASSIGN(Task);
    311 };
    312 
    313 MultiplexRouter::MultiplexRouter(
    314     ScopedMessagePipeHandle message_pipe,
    315     Config config,
    316     bool set_interface_id_namesapce_bit,
    317     scoped_refptr<base::SequencedTaskRunner> runner)
    318     : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
    319       task_runner_(runner),
    320       filters_(this),
    321       connector_(std::move(message_pipe),
    322                  config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
    323                                            : Connector::SINGLE_THREADED_SEND,
    324                  std::move(runner)),
    325       control_message_handler_(this),
    326       control_message_proxy_(&connector_) {
    327   DCHECK(task_runner_->RunsTasksInCurrentSequence());
    328 
    329   if (config == MULTI_INTERFACE)
    330     lock_.emplace();
    331 
    332   if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
    333       config == MULTI_INTERFACE) {
    334     // Always participate in sync handle watching in multi-interface mode,
    335     // because even if it doesn't expect sync requests during sync handle
    336     // watching, it may still need to dispatch messages to associated endpoints
    337     // on a different sequence.
    338     connector_.AllowWokenUpBySyncWatchOnSameThread();
    339   }
    340   connector_.set_incoming_receiver(&filters_);
    341   connector_.set_connection_error_handler(base::Bind(
    342       &MultiplexRouter::OnPipeConnectionError, base::Unretained(this)));
    343 
    344   std::unique_ptr<MessageHeaderValidator> header_validator =
    345       std::make_unique<MessageHeaderValidator>();
    346   header_validator_ = header_validator.get();
    347   filters_.Append(std::move(header_validator));
    348 }
    349 
// Drops all queued tasks and registered endpoints while holding the lock.
MultiplexRouter::~MultiplexRouter() {
  MayAutoLock locker(&lock_);

  // MessageWrapper's destructor checks this flag and skips closing endpoints
  // once it is set.
  being_destructed_ = true;

  // |sync_message_tasks_| holds raw pointers to Task objects owned by
  // |tasks_|, so it is emptied first.
  sync_message_tasks_.clear();
  tasks_.clear();
  endpoints_.clear();
}
    359 
// Appends |filter| to the chain of filters that incoming messages pass
// through. Takes ownership of |filter|.
void MultiplexRouter::AddIncomingMessageFilter(
    std::unique_ptr<MessageReceiver> filter) {
  filters_.Append(std::move(filter));
}
    364 
    365 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
    366   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    367   header_validator_->SetDescription(std::string(name) +
    368                                     " [master] MessageHeaderValidator");
    369   control_message_handler_.SetDescription(
    370       std::string(name) + " [master] PipeControlMessageHandler");
    371   connector_.SetWatcherHeapProfilerTag(name);
    372 }
    373 
    374 InterfaceId MultiplexRouter::AssociateInterface(
    375     ScopedInterfaceEndpointHandle handle_to_send) {
    376   if (!handle_to_send.pending_association())
    377     return kInvalidInterfaceId;
    378 
    379   uint32_t id = 0;
    380   {
    381     MayAutoLock locker(&lock_);
    382     do {
    383       if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
    384         next_interface_id_value_ = 1;
    385       id = next_interface_id_value_++;
    386       if (set_interface_id_namespace_bit_)
    387         id |= kInterfaceIdNamespaceMask;
    388     } while (base::ContainsKey(endpoints_, id));
    389 
    390     InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
    391     endpoints_[id] = endpoint;
    392     if (encountered_error_)
    393       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    394     endpoint->set_handle_created();
    395   }
    396 
    397   if (!NotifyAssociation(&handle_to_send, id)) {
    398     // The peer handle of |handle_to_send|, which is supposed to join this
    399     // associated group, has been closed.
    400     {
    401       MayAutoLock locker(&lock_);
    402       InterfaceEndpoint* endpoint = FindEndpoint(id);
    403       if (endpoint)
    404         UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    405     }
    406 
    407     control_message_proxy_.NotifyPeerEndpointClosed(
    408         id, handle_to_send.disconnect_reason());
    409   }
    410   return id;
    411 }
    412 
    413 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
    414     InterfaceId id) {
    415   if (!IsValidInterfaceId(id))
    416     return ScopedInterfaceEndpointHandle();
    417 
    418   MayAutoLock locker(&lock_);
    419   bool inserted = false;
    420   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    421   if (inserted) {
    422     if (encountered_error_)
    423       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    424   } else {
    425     if (endpoint->handle_created() || endpoint->closed())
    426       return ScopedInterfaceEndpointHandle();
    427   }
    428 
    429   endpoint->set_handle_created();
    430   return CreateScopedInterfaceEndpointHandle(id);
    431 }
    432 
    433 void MultiplexRouter::CloseEndpointHandle(
    434     InterfaceId id,
    435     const base::Optional<DisconnectReason>& reason) {
    436   if (!IsValidInterfaceId(id))
    437     return;
    438 
    439   MayAutoLock locker(&lock_);
    440   DCHECK(base::ContainsKey(endpoints_, id));
    441   InterfaceEndpoint* endpoint = endpoints_[id].get();
    442   DCHECK(!endpoint->client());
    443   DCHECK(!endpoint->closed());
    444   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    445 
    446   if (!IsMasterInterfaceId(id) || reason) {
    447     MayAutoUnlock unlocker(&lock_);
    448     control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
    449   }
    450 
    451   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    452 }
    453 
    454 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
    455     const ScopedInterfaceEndpointHandle& handle,
    456     InterfaceEndpointClient* client,
    457     scoped_refptr<base::SequencedTaskRunner> runner) {
    458   const InterfaceId id = handle.id();
    459 
    460   DCHECK(IsValidInterfaceId(id));
    461   DCHECK(client);
    462 
    463   MayAutoLock locker(&lock_);
    464   DCHECK(base::ContainsKey(endpoints_, id));
    465 
    466   InterfaceEndpoint* endpoint = endpoints_[id].get();
    467   endpoint->AttachClient(client, std::move(runner));
    468 
    469   if (endpoint->peer_closed())
    470     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    471   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    472 
    473   return endpoint;
    474 }
    475 
    476 void MultiplexRouter::DetachEndpointClient(
    477     const ScopedInterfaceEndpointHandle& handle) {
    478   const InterfaceId id = handle.id();
    479 
    480   DCHECK(IsValidInterfaceId(id));
    481 
    482   MayAutoLock locker(&lock_);
    483   DCHECK(base::ContainsKey(endpoints_, id));
    484 
    485   InterfaceEndpoint* endpoint = endpoints_[id].get();
    486   endpoint->DetachClient();
    487 }
    488 
    489 void MultiplexRouter::RaiseError() {
    490   if (task_runner_->RunsTasksInCurrentSequence()) {
    491     connector_.RaiseError();
    492   } else {
    493     task_runner_->PostTask(FROM_HERE,
    494                            base::Bind(&MultiplexRouter::RaiseError, this));
    495   }
    496 }
    497 
// Returns the connector's answer to PrefersSerializedMessages(), queried
// while holding the router's lock.
bool MultiplexRouter::PrefersSerializedMessages() {
  MayAutoLock locker(&lock_);
  return connector_.PrefersSerializedMessages();
}
    502 
// Closes the underlying message pipe and then runs the connection-error path
// so that every registered endpoint learns that its peer is gone.
void MultiplexRouter::CloseMessagePipe() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  connector_.CloseMessagePipe();
  // CloseMessagePipe() above won't trigger connection error handler.
  // Explicitly call OnPipeConnectionError() so that associated endpoints will
  // get notified.
  OnPipeConnectionError();
}
    511 
    512 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
    513   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    514   connector_.PauseIncomingMethodCallProcessing();
    515 
    516   MayAutoLock locker(&lock_);
    517   paused_ = true;
    518 
    519   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
    520     iter->second->ResetSyncMessageSignal();
    521 }
    522 
    523 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
    524   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    525   connector_.ResumeIncomingMethodCallProcessing();
    526 
    527   MayAutoLock locker(&lock_);
    528   paused_ = false;
    529 
    530   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
    531     auto sync_iter = sync_message_tasks_.find(iter->first);
    532     if (iter->second->peer_closed() ||
    533         (sync_iter != sync_message_tasks_.end() &&
    534          !sync_iter->second.empty())) {
    535       iter->second->SignalSyncMessageEvent();
    536     }
    537   }
    538 
    539   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    540 }
    541 
    542 bool MultiplexRouter::HasAssociatedEndpoints() const {
    543   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    544   MayAutoLock locker(&lock_);
    545 
    546   if (endpoints_.size() > 1)
    547     return true;
    548   if (endpoints_.size() == 0)
    549     return false;
    550 
    551   return !base::ContainsKey(endpoints_, kMasterInterfaceId);
    552 }
    553 
// Switches the router into testing mode and tells the connector not to
// enforce errors reported by the incoming receiver.
void MultiplexRouter::EnableTestingMode() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  MayAutoLock locker(&lock_);

  testing_mode_ = true;
  connector_.set_enforce_errors_from_incoming_receiver(false);
}
    561 
// Handles an incoming |message|. The message is dispatched right away when
// the task queue is empty and direct processing succeeds; otherwise it is
// appended to |tasks_| (and to the per-interface sync queue if it is a sync
// message). Returns false only if endpoints could not be inserted for the
// interface IDs carried by the message; otherwise returns true.
bool MultiplexRouter::Accept(Message* message) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  // Insert endpoints for the payload interface IDs as soon as the message
  // arrives, instead of waiting till the message is dispatched. Consider the
  // following sequence:
  // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not
  //    dispatched because a sync call is blocking the thread.
  // 2) Sync message msg2 arrives targeting interface ID x.
  //
  // If we don't insert endpoint for interface ID x, when trying to dispatch
  // msg2 we don't know whether it is an unexpected message or it is just
  // because the message containing x hasn't been dispatched.
  if (!InsertEndpointsForMessage(*message))
    return false;

  // Declared before |locker| so the reference is released only after the lock
  // has been dropped.
  scoped_refptr<MultiplexRouter> protector(this);
  MayAutoLock locker(&lock_);

  DCHECK(!paused_);

  // While inside a sync handle watcher callback only sync messages may be
  // delivered to clients directly.
  ClientCallBehavior client_call_behavior =
      connector_.during_sync_handle_watcher_callback()
          ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
          : ALLOW_DIRECT_CLIENT_CALLS;

  MessageWrapper message_wrapper(this, std::move(*message));
  // Direct processing is only attempted when nothing is queued ahead of this
  // message.
  bool processed = tasks_.empty() && ProcessIncomingMessage(
                                         &message_wrapper, client_call_behavior,
                                         connector_.task_runner());

  if (!processed) {
    // Either the task queue is not empty or we cannot process the message
    // directly. In both cases, there is no need to call ProcessTasks().
    tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
    Task* task = tasks_.back().get();

    if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
      // Track the queued sync message per interface (as a raw pointer into
      // |tasks_|) and wake up any sync watcher of the target endpoint.
      InterfaceId id = task->message_wrapper.value().interface_id();
      sync_message_tasks_[id].push_back(task);
      InterfaceEndpoint* endpoint = FindEndpoint(id);
      if (endpoint)
        endpoint->SignalSyncMessageEvent();
    }
  } else if (!tasks_.empty()) {
    // Processing the message may result in new tasks (for error notification)
    // being added to the queue. In this case, we have to attempt to process the
    // tasks.
    ProcessTasks(client_call_behavior, connector_.task_runner());
  }

  // Always return true. If we see errors during message processing, we will
  // explicitly call Connector::RaiseError() to disconnect the message pipe.
  return true;
}
    617 
    618 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
    619     InterfaceId id,
    620     const base::Optional<DisconnectReason>& reason) {
    621   MayAutoLock locker(&lock_);
    622   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
    623 
    624   if (reason)
    625     endpoint->set_disconnect_reason(reason);
    626 
    627   // It is possible that this endpoint has been set as peer closed. That is
    628   // because when the message pipe is closed, all the endpoints are updated with
    629   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
    630   // as long as there are refs keeping the router alive. If there is a
    631   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
    632   // here and see that the endpoint has been marked as peer closed.
    633   if (!endpoint->peer_closed()) {
    634     if (endpoint->client())
    635       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    636     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    637   }
    638 
    639   // No need to trigger a ProcessTasks() because it is already on the stack.
    640 
    641   return true;
    642 }
    643 
    644 void MultiplexRouter::OnPipeConnectionError() {
    645   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    646 
    647   scoped_refptr<MultiplexRouter> protector(this);
    648   MayAutoLock locker(&lock_);
    649 
    650   encountered_error_ = true;
    651 
    652   // Calling UpdateEndpointStateMayRemove() may remove the corresponding value
    653   // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore,
    654   // copy the endpoint pointers to a vector and iterate over it instead.
    655   std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector;
    656   endpoint_vector.reserve(endpoints_.size());
    657   for (const auto& pair : endpoints_)
    658     endpoint_vector.push_back(pair.second);
    659 
    660   for (const auto& endpoint : endpoint_vector) {
    661     if (endpoint->client())
    662       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get()));
    663 
    664     UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED);
    665   }
    666 
    667   ProcessTasks(connector_.during_sync_handle_watcher_callback()
    668                    ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
    669                    : ALLOW_DIRECT_CLIENT_CALLS,
    670                connector_.task_runner());
    671 }
    672 
// Processes queued tasks in FIFO order until the queue is empty, the router
// is paused, or a task cannot be processed yet. Must be called with the lock
// held. |client_call_behavior| and |current_task_runner| are forwarded to the
// per-task processing functions.
void MultiplexRouter::ProcessTasks(
    ClientCallBehavior client_call_behavior,
    base::SequencedTaskRunner* current_task_runner) {
  AssertLockAcquired();

  // NOTE(review): presumably a posted task will drain the queue later, so
  // nothing is done here - confirm against MaybePostToProcessTasks().
  if (posted_to_process_tasks_)
    return;

  while (!tasks_.empty() && !paused_) {
    std::unique_ptr<Task> task(std::move(tasks_.front()));
    tasks_.pop_front();

    InterfaceId id = kInvalidInterfaceId;
    // A message task whose message is null is not treated as a sync message
    // here, so the per-interface sync queue is left untouched for it.
    bool sync_message =
        task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
        task->message_wrapper.value().has_flag(Message::kFlagIsSync);
    if (sync_message) {
      // Keep the per-interface sync queue in step with |tasks_|: the task
      // being processed must be at the front of that queue.
      id = task->message_wrapper.value().interface_id();
      auto& sync_message_queue = sync_message_tasks_[id];
      DCHECK_EQ(task.get(), sync_message_queue.front());
      sync_message_queue.pop_front();
    }

    bool processed =
        task->IsNotifyErrorTask()
            ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
                                     current_task_runner)
            : ProcessIncomingMessage(&task->message_wrapper,
                                     client_call_behavior, current_task_runner);

    if (!processed) {
      // Put the task back at the front of both queues and stop; later tasks
      // must not overtake it.
      if (sync_message) {
        auto& sync_message_queue = sync_message_tasks_[id];
        sync_message_queue.push_front(task.get());
      }
      tasks_.push_front(std::move(task));
      break;
    } else {
      // Drop the per-interface sync queue once it has become empty.
      if (sync_message) {
        auto iter = sync_message_tasks_.find(id);
        if (iter != sync_message_tasks_.end() && iter->second.empty())
          sync_message_tasks_.erase(iter);
      }
    }
  }
}
    719 
// Processes the oldest queued sync message for interface |id|, if any. Must
// be called with the lock held. Returns true if sync messages for |id| remain
// queued afterwards (or if the router is paused while a queue for |id|
// exists), and false otherwise.
bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
  AssertLockAcquired();

  auto iter = sync_message_tasks_.find(id);
  if (iter == sync_message_tasks_.end())
    return false;

  // Nothing is dispatched while paused, but the queue entry is still there.
  if (paused_)
    return true;

  MultiplexRouter::Task* task = iter->second.front();
  iter->second.pop_front();

  DCHECK(task->IsMessageTask());
  // Take the message out of the task. The Task object itself stays in
  // |tasks_| with a moved-from wrapper.
  MessageWrapper message_wrapper = std::move(task->message_wrapper);

  // Note: after this call, |task| and |iter| may be invalidated.
  bool processed = ProcessIncomingMessage(
      &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
  DCHECK(processed);

  // Look the queue up again since the previous iterator may be stale.
  iter = sync_message_tasks_.find(id);
  if (iter == sync_message_tasks_.end())
    return false;

  if (iter->second.empty()) {
    sync_message_tasks_.erase(iter);
    return false;
  }

  return true;
}
    752 
// Delivers the error notification described by |task| to the endpoint's
// client. Must be called with the lock held and while not paused. Returns
// true when the task is finished (the client is gone or has been notified)
// and false when the notification has to happen later, in which case
// MaybePostToProcessTasks() is called with the endpoint's task runner.
bool MultiplexRouter::ProcessNotifyErrorTask(
    Task* task,
    ClientCallBehavior client_call_behavior,
    base::SequencedTaskRunner* current_task_runner) {
  DCHECK(!current_task_runner ||
         current_task_runner->RunsTasksInCurrentSequence());
  DCHECK(!paused_);

  AssertLockAcquired();
  InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
  // With no client attached there is nobody to notify.
  if (!endpoint->client())
    return true;

  // A direct call is only made when unrestricted direct calls are allowed and
  // the caller is known to be on the endpoint's task runner.
  if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
      endpoint->task_runner() != current_task_runner) {
    MaybePostToProcessTasks(endpoint->task_runner());
    return false;
  }

  DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());

  // Copy what is needed out of the endpoint before the lock is released.
  InterfaceEndpointClient* client = endpoint->client();
  base::Optional<DisconnectReason> disconnect_reason(
      endpoint->disconnect_reason());

  {
    // We must unlock before calling into |client| because it may call this
    // object within NotifyError(). Holding the lock will lead to deadlock.
    //
    // It is safe to call into |client| without the lock. Because |client| is
    // always accessed on the same sequence, including DetachEndpointClient().
    MayAutoUnlock unlocker(&lock_);
    client->NotifyError(disconnect_reason);
  }
  return true;
}
    789 
    790 bool MultiplexRouter::ProcessIncomingMessage(
    791     MessageWrapper* message_wrapper,
    792     ClientCallBehavior client_call_behavior,
    793     base::SequencedTaskRunner* current_task_runner) {
    794   DCHECK(!current_task_runner ||
    795          current_task_runner->RunsTasksInCurrentSequence());
    796   DCHECK(!paused_);
    797   DCHECK(message_wrapper);
    798   AssertLockAcquired();
    799 
    800   const Message* message = &message_wrapper->value();
    801   if (message->IsNull()) {
    802     // This is a sync message and has been processed during sync handle
    803     // watching.
    804     return true;
    805   }
    806 
    807   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
    808     bool result = false;
    809 
    810     {
    811       MayAutoUnlock unlocker(&lock_);
    812       Message tmp_message =
    813           message_wrapper->DeserializeEndpointHandlesAndTake();
    814       result = !tmp_message.IsNull() &&
    815                control_message_handler_.Accept(&tmp_message);
    816     }
    817 
    818     if (!result)
    819       RaiseErrorInNonTestingMode();
    820 
    821     return true;
    822   }
    823 
    824   InterfaceId id = message->interface_id();
    825   DCHECK(IsValidInterfaceId(id));
    826 
    827   InterfaceEndpoint* endpoint = FindEndpoint(id);
    828   if (!endpoint || endpoint->closed())
    829     return true;
    830 
    831   if (!endpoint->client()) {
    832     // We need to wait until a client is attached in order to dispatch further
    833     // messages.
    834     return false;
    835   }
    836 
    837   bool can_direct_call;
    838   if (message->has_flag(Message::kFlagIsSync)) {
    839     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
    840                       endpoint->task_runner()->RunsTasksInCurrentSequence();
    841   } else {
    842     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
    843                       endpoint->task_runner() == current_task_runner;
    844   }
    845 
    846   if (!can_direct_call) {
    847     MaybePostToProcessTasks(endpoint->task_runner());
    848     return false;
    849   }
    850 
    851   DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    852 
    853   InterfaceEndpointClient* client = endpoint->client();
    854   bool result = false;
    855   {
    856     // We must unlock before calling into |client| because it may call this
    857     // object within HandleIncomingMessage(). Holding the lock will lead to
    858     // deadlock.
    859     //
    860     // It is safe to call into |client| without the lock. Because |client| is
    861     // always accessed on the same sequence, including DetachEndpointClient().
    862     MayAutoUnlock unlocker(&lock_);
    863     Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake();
    864     result =
    865         !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message);
    866   }
    867   if (!result)
    868     RaiseErrorInNonTestingMode();
    869 
    870   return true;
    871 }
    872 
    873 void MultiplexRouter::MaybePostToProcessTasks(
    874     base::SequencedTaskRunner* task_runner) {
    875   AssertLockAcquired();
    876   if (posted_to_process_tasks_)
    877     return;
    878 
    879   posted_to_process_tasks_ = true;
    880   posted_to_task_runner_ = task_runner;
    881   task_runner->PostTask(
    882       FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    883 }
    884 
    885 void MultiplexRouter::LockAndCallProcessTasks() {
    886   // There is no need to hold a ref to this class in this case because this is
    887   // always called using base::Bind(), which holds a ref.
    888   MayAutoLock locker(&lock_);
    889   posted_to_process_tasks_ = false;
    890   scoped_refptr<base::SequencedTaskRunner> runner(
    891       std::move(posted_to_task_runner_));
    892   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
    893 }
    894 
    895 void MultiplexRouter::UpdateEndpointStateMayRemove(
    896     InterfaceEndpoint* endpoint,
    897     EndpointStateUpdateType type) {
    898   if (type == ENDPOINT_CLOSED) {
    899     endpoint->set_closed();
    900   } else {
    901     endpoint->set_peer_closed();
    902     // If the interface endpoint is performing a sync watch, this makes sure
    903     // it is notified and eventually exits the sync watch.
    904     endpoint->SignalSyncMessageEvent();
    905   }
    906   if (endpoint->closed() && endpoint->peer_closed())
    907     endpoints_.erase(endpoint->id());
    908 }
    909 
    910 void MultiplexRouter::RaiseErrorInNonTestingMode() {
    911   AssertLockAcquired();
    912   if (!testing_mode_)
    913     RaiseError();
    914 }
    915 
    916 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
    917     InterfaceId id,
    918     bool* inserted) {
    919   AssertLockAcquired();
    920   // Either |inserted| is nullptr or it points to a boolean initialized as
    921   // false.
    922   DCHECK(!inserted || !*inserted);
    923 
    924   InterfaceEndpoint* endpoint = FindEndpoint(id);
    925   if (!endpoint) {
    926     endpoint = new InterfaceEndpoint(this, id);
    927     endpoints_[id] = endpoint;
    928     if (inserted)
    929       *inserted = true;
    930   }
    931 
    932   return endpoint;
    933 }
    934 
    935 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
    936     InterfaceId id) {
    937   AssertLockAcquired();
    938   auto iter = endpoints_.find(id);
    939   return iter != endpoints_.end() ? iter->second.get() : nullptr;
    940 }
    941 
    942 void MultiplexRouter::AssertLockAcquired() {
    943 #if DCHECK_IS_ON()
    944   if (lock_)
    945     lock_->AssertAcquired();
    946 #endif
    947 }
    948 
    949 bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) {
    950   if (!message.is_serialized())
    951     return true;
    952 
    953   uint32_t num_ids = message.payload_num_interface_ids();
    954   if (num_ids == 0)
    955     return true;
    956 
    957   const uint32_t* ids = message.payload_interface_ids();
    958 
    959   MayAutoLock locker(&lock_);
    960   for (uint32_t i = 0; i < num_ids; ++i) {
    961     // Message header validation already ensures that the IDs are valid and not
    962     // the master ID.
    963     // The IDs are from the remote side and therefore their namespace bit is
    964     // supposed to be different than the value that this router would use.
    965     if (set_interface_id_namespace_bit_ ==
    966         HasInterfaceIdNamespaceBitSet(ids[i])) {
    967       return false;
    968     }
    969 
    970     // It is possible that the endpoint already exists even when the remote side
    971     // is well-behaved: it might have notified us that the peer endpoint has
    972     // closed.
    973     bool inserted = false;
    974     InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted);
    975     if (endpoint->closed() || endpoint->handle_created())
    976       return false;
    977   }
    978 
    979   return true;
    980 }
    981 
    982 void MultiplexRouter::CloseEndpointsForMessage(const Message& message) {
    983   AssertLockAcquired();
    984 
    985   if (!message.is_serialized())
    986     return;
    987 
    988   uint32_t num_ids = message.payload_num_interface_ids();
    989   if (num_ids == 0)
    990     return;
    991 
    992   const uint32_t* ids = message.payload_interface_ids();
    993   for (uint32_t i = 0; i < num_ids; ++i) {
    994     InterfaceEndpoint* endpoint = FindEndpoint(ids[i]);
    995     // If the remote side maliciously sends the same interface ID in another
    996     // message which has been dispatched, we could get here with no endpoint
    997     // for the ID, a closed endpoint, or an endpoint with handle created.
    998     if (!endpoint || endpoint->closed() || endpoint->handle_created()) {
    999       RaiseErrorInNonTestingMode();
   1000       continue;
   1001     }
   1002 
   1003     UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
   1004     MayAutoUnlock unlocker(&lock_);
   1005     control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt);
   1006   }
   1007 
   1008   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
   1009 }
   1010 
   1011 }  // namespace internal
   1012 }  // namespace mojo
   1013