Home | History | Annotate | Download | only in lib
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
      6 
      7 #include <stdint.h>
      8 
      9 #include <utility>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/macros.h"
     14 #include "base/memory/ptr_util.h"
     15 #include "base/single_thread_task_runner.h"
     16 #include "base/stl_util.h"
     17 #include "base/threading/thread_task_runner_handle.h"
     18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
     19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
     20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
     21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
     22 
     23 namespace mojo {
     24 namespace internal {
     25 
     26 // InterfaceEndpoint stores the information of an interface endpoint registered
     27 // with the router.
     28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
     29 // this object.
     30 class MultiplexRouter::InterfaceEndpoint
     31     : public base::RefCounted<InterfaceEndpoint>,
     32       public InterfaceEndpointController {
     33  public:
     34   InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
     35       : router_(router),
     36         id_(id),
     37         closed_(false),
     38         peer_closed_(false),
     39         handle_created_(false),
     40         client_(nullptr),
     41         event_signalled_(false) {}
     42 
     43   // ---------------------------------------------------------------------------
     44   // The following public methods are safe to call from any threads without
     45   // locking.
     46 
     47   InterfaceId id() const { return id_; }
     48 
     49   // ---------------------------------------------------------------------------
     50   // The following public methods are called under the router's lock.
     51 
     52   bool closed() const { return closed_; }
     53   void set_closed() {
     54     router_->AssertLockAcquired();
     55     closed_ = true;
     56   }
     57 
     58   bool peer_closed() const { return peer_closed_; }
     59   void set_peer_closed() {
     60     router_->AssertLockAcquired();
     61     peer_closed_ = true;
     62   }
     63 
     64   bool handle_created() const { return handle_created_; }
     65   void set_handle_created() {
     66     router_->AssertLockAcquired();
     67     handle_created_ = true;
     68   }
     69 
     70   const base::Optional<DisconnectReason>& disconnect_reason() const {
     71     return disconnect_reason_;
     72   }
     73   void set_disconnect_reason(
     74       const base::Optional<DisconnectReason>& disconnect_reason) {
     75     router_->AssertLockAcquired();
     76     disconnect_reason_ = disconnect_reason;
     77   }
     78 
     79   base::SingleThreadTaskRunner* task_runner() const {
     80     return task_runner_.get();
     81   }
     82 
     83   InterfaceEndpointClient* client() const { return client_; }
     84 
     85   void AttachClient(InterfaceEndpointClient* client,
     86                     scoped_refptr<base::SingleThreadTaskRunner> runner) {
     87     router_->AssertLockAcquired();
     88     DCHECK(!client_);
     89     DCHECK(!closed_);
     90     DCHECK(runner->BelongsToCurrentThread());
     91 
     92     task_runner_ = std::move(runner);
     93     client_ = client;
     94   }
     95 
     96   // This method must be called on the same thread as the corresponding
     97   // AttachClient() call.
     98   void DetachClient() {
     99     router_->AssertLockAcquired();
    100     DCHECK(client_);
    101     DCHECK(task_runner_->BelongsToCurrentThread());
    102     DCHECK(!closed_);
    103 
    104     task_runner_ = nullptr;
    105     client_ = nullptr;
    106     sync_watcher_.reset();
    107   }
    108 
    109   void SignalSyncMessageEvent() {
    110     router_->AssertLockAcquired();
    111     if (event_signalled_)
    112       return;
    113 
    114     event_signalled_ = true;
    115     if (!sync_message_event_sender_.is_valid())
    116       return;
    117 
    118     MojoResult result =
    119         WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
    120                         0, MOJO_WRITE_MESSAGE_FLAG_NONE);
    121     DCHECK_EQ(MOJO_RESULT_OK, result);
    122   }
    123 
    124   void ResetSyncMessageSignal() {
    125     router_->AssertLockAcquired();
    126 
    127     if (!event_signalled_)
    128       return;
    129 
    130     event_signalled_ = false;
    131     if (!sync_message_event_receiver_.is_valid())
    132       return;
    133 
    134     MojoResult result =
    135         ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
    136                        nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
    137     DCHECK_EQ(MOJO_RESULT_OK, result);
    138   }
    139 
    140   // ---------------------------------------------------------------------------
    141   // The following public methods (i.e., InterfaceEndpointController
    142   // implementation) are called by the client on the same thread as the
    143   // AttachClient() call. They are called outside of the router's lock.
    144 
    145   bool SendMessage(Message* message) override {
    146     DCHECK(task_runner_->BelongsToCurrentThread());
    147     message->set_interface_id(id_);
    148     return router_->connector_.Accept(message);
    149   }
    150 
    151   void AllowWokenUpBySyncWatchOnSameThread() override {
    152     DCHECK(task_runner_->BelongsToCurrentThread());
    153 
    154     EnsureSyncWatcherExists();
    155     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
    156   }
    157 
    158   bool SyncWatch(const bool* should_stop) override {
    159     DCHECK(task_runner_->BelongsToCurrentThread());
    160 
    161     EnsureSyncWatcherExists();
    162     return sync_watcher_->SyncWatch(should_stop);
    163   }
    164 
    165  private:
    166   friend class base::RefCounted<InterfaceEndpoint>;
    167 
    168   ~InterfaceEndpoint() override {
    169     router_->AssertLockAcquired();
    170 
    171     DCHECK(!client_);
    172     DCHECK(closed_);
    173     DCHECK(peer_closed_);
    174     DCHECK(!sync_watcher_);
    175   }
    176 
    177   void OnHandleReady(MojoResult result) {
    178     DCHECK(task_runner_->BelongsToCurrentThread());
    179     scoped_refptr<MultiplexRouter> router_protector(router_);
    180 
    181     // Because we never close |sync_message_event_{sender,receiver}_| before
    182     // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
    183     DCHECK_EQ(MOJO_RESULT_OK, result);
    184 
    185     MayAutoLock locker(&router_->lock_);
    186     scoped_refptr<InterfaceEndpoint> self_protector(this);
    187 
    188     bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
    189 
    190     if (!more_to_process)
    191       ResetSyncMessageSignal();
    192 
    193     // Currently there are no queued sync messages and the peer has closed so
    194     // there won't be incoming sync messages in the future.
    195     if (!more_to_process && peer_closed_) {
    196       // If a SyncWatch() call (or multiple ones) of this interface endpoint is
    197       // on the call stack, resetting the sync watcher will allow it to exit
    198       // when the call stack unwinds to that frame.
    199       sync_watcher_.reset();
    200     }
    201   }
    202 
    203   void EnsureSyncWatcherExists() {
    204     DCHECK(task_runner_->BelongsToCurrentThread());
    205     if (sync_watcher_)
    206       return;
    207 
    208     {
    209       MayAutoLock locker(&router_->lock_);
    210 
    211       if (!sync_message_event_sender_.is_valid()) {
    212         MojoResult result =
    213             CreateMessagePipe(nullptr, &sync_message_event_sender_,
    214                               &sync_message_event_receiver_);
    215         DCHECK_EQ(MOJO_RESULT_OK, result);
    216 
    217         if (event_signalled_) {
    218           // Reset the flag so that SignalSyncMessageEvent() will actually
    219           // signal using the newly-created message pipe.
    220           event_signalled_ = false;
    221           SignalSyncMessageEvent();
    222         }
    223       }
    224     }
    225 
    226     sync_watcher_.reset(new SyncHandleWatcher(
    227         sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
    228         base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
    229   }
    230 
    231   // ---------------------------------------------------------------------------
    232   // The following members are safe to access from any threads.
    233 
    234   MultiplexRouter* const router_;
    235   const InterfaceId id_;
    236 
    237   // ---------------------------------------------------------------------------
    238   // The following members are accessed under the router's lock.
    239 
    240   // Whether the endpoint has been closed.
    241   bool closed_;
    242   // Whether the peer endpoint has been closed.
    243   bool peer_closed_;
    244 
    245   // Whether there is already a ScopedInterfaceEndpointHandle created for this
    246   // endpoint.
    247   bool handle_created_;
    248 
    249   base::Optional<DisconnectReason> disconnect_reason_;
    250 
    251   // The task runner on which |client_|'s methods can be called.
    252   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
    253   // Not owned. It is null if no client is attached to this endpoint.
    254   InterfaceEndpointClient* client_;
    255 
    256   // A message pipe used as an event to signal that sync messages are available.
    257   // The message pipe handles are initialized under the router's lock and remain
    258   // unchanged afterwards. They may be accessed outside of the router's lock
    259   // later.
    260   ScopedMessagePipeHandle sync_message_event_sender_;
    261   ScopedMessagePipeHandle sync_message_event_receiver_;
    262   bool event_signalled_;
    263 
    264   // ---------------------------------------------------------------------------
    265   // The following members are only valid while a client is attached. They are
    266   // used exclusively on the client's thread. They may be accessed outside of
    267   // the router's lock.
    268 
    269   std::unique_ptr<SyncHandleWatcher> sync_watcher_;
    270 
    271   DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
    272 };
    273 
    274 // MessageWrapper objects are always destroyed under the router's lock. On
    275 // destruction, if the message it wrappers contains
    276 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
    277 // router's lock), the wrapper unlocks to clean them up.
    278 class MultiplexRouter::MessageWrapper {
    279  public:
    280   MessageWrapper() = default;
    281 
    282   MessageWrapper(MultiplexRouter* router, Message message)
    283       : router_(router), value_(std::move(message)) {}
    284 
    285   MessageWrapper(MessageWrapper&& other)
    286       : router_(other.router_), value_(std::move(other.value_)) {}
    287 
    288   ~MessageWrapper() {
    289     if (value_.associated_endpoint_handles()->empty())
    290       return;
    291 
    292     router_->AssertLockAcquired();
    293     {
    294       MayAutoUnlock unlocker(&router_->lock_);
    295       value_.mutable_associated_endpoint_handles()->clear();
    296     }
    297   }
    298 
    299   MessageWrapper& operator=(MessageWrapper&& other) {
    300     router_ = other.router_;
    301     value_ = std::move(other.value_);
    302     return *this;
    303   }
    304 
    305   Message& value() { return value_; }
    306 
    307  private:
    308   MultiplexRouter* router_ = nullptr;
    309   Message value_;
    310 
    311   DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
    312 };
    313 
    314 struct MultiplexRouter::Task {
    315  public:
    316   // Doesn't take ownership of |message| but takes its contents.
    317   static std::unique_ptr<Task> CreateMessageTask(
    318       MessageWrapper message_wrapper) {
    319     Task* task = new Task(MESSAGE);
    320     task->message_wrapper = std::move(message_wrapper);
    321     return base::WrapUnique(task);
    322   }
    323   static std::unique_ptr<Task> CreateNotifyErrorTask(
    324       InterfaceEndpoint* endpoint) {
    325     Task* task = new Task(NOTIFY_ERROR);
    326     task->endpoint_to_notify = endpoint;
    327     return base::WrapUnique(task);
    328   }
    329 
    330   ~Task() {}
    331 
    332   bool IsMessageTask() const { return type == MESSAGE; }
    333   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
    334 
    335   MessageWrapper message_wrapper;
    336   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
    337 
    338   enum Type { MESSAGE, NOTIFY_ERROR };
    339   Type type;
    340 
    341  private:
    342   explicit Task(Type in_type) : type(in_type) {}
    343 
    344   DISALLOW_COPY_AND_ASSIGN(Task);
    345 };
    346 
    347 MultiplexRouter::MultiplexRouter(
    348     ScopedMessagePipeHandle message_pipe,
    349     Config config,
    350     bool set_interface_id_namesapce_bit,
    351     scoped_refptr<base::SingleThreadTaskRunner> runner)
    352     : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
    353       task_runner_(runner),
    354       header_validator_(nullptr),
    355       filters_(this),
    356       connector_(std::move(message_pipe),
    357                  config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
    358                                            : Connector::SINGLE_THREADED_SEND,
    359                  std::move(runner)),
    360       control_message_handler_(this),
    361       control_message_proxy_(&connector_),
    362       next_interface_id_value_(1),
    363       posted_to_process_tasks_(false),
    364       encountered_error_(false),
    365       paused_(false),
    366       testing_mode_(false) {
    367   DCHECK(task_runner_->BelongsToCurrentThread());
    368 
    369   if (config == MULTI_INTERFACE)
    370     lock_.emplace();
    371 
    372   if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
    373       config == MULTI_INTERFACE) {
    374     // Always participate in sync handle watching in multi-interface mode,
    375     // because even if it doesn't expect sync requests during sync handle
    376     // watching, it may still need to dispatch messages to associated endpoints
    377     // on a different thread.
    378     connector_.AllowWokenUpBySyncWatchOnSameThread();
    379   }
    380   connector_.set_incoming_receiver(&filters_);
    381   connector_.set_connection_error_handler(
    382       base::Bind(&MultiplexRouter::OnPipeConnectionError,
    383                  base::Unretained(this)));
    384 
    385   std::unique_ptr<MessageHeaderValidator> header_validator =
    386       base::MakeUnique<MessageHeaderValidator>();
    387   header_validator_ = header_validator.get();
    388   filters_.Append(std::move(header_validator));
    389 }
    390 
    391 MultiplexRouter::~MultiplexRouter() {
    392   MayAutoLock locker(&lock_);
    393 
    394   sync_message_tasks_.clear();
    395   tasks_.clear();
    396 
    397   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    398     InterfaceEndpoint* endpoint = iter->second.get();
    399     // Increment the iterator before calling UpdateEndpointStateMayRemove()
    400     // because it may remove the corresponding value from the map.
    401     ++iter;
    402 
    403     if (!endpoint->closed()) {
    404       // This happens when a NotifyPeerEndpointClosed message been received, but
    405       // the interface ID hasn't been used to create local endpoint handle.
    406       DCHECK(!endpoint->client());
    407       DCHECK(endpoint->peer_closed());
    408       UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    409     } else {
    410       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    411     }
    412   }
    413 
    414   DCHECK(endpoints_.empty());
    415 }
    416 
    417 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
    418   DCHECK(thread_checker_.CalledOnValidThread());
    419   header_validator_->SetDescription(
    420       std::string(name) + " [master] MessageHeaderValidator");
    421   control_message_handler_.SetDescription(
    422       std::string(name) + " [master] PipeControlMessageHandler");
    423   connector_.SetWatcherHeapProfilerTag(name);
    424 }
    425 
    426 InterfaceId MultiplexRouter::AssociateInterface(
    427     ScopedInterfaceEndpointHandle handle_to_send) {
    428   if (!handle_to_send.pending_association())
    429     return kInvalidInterfaceId;
    430 
    431   uint32_t id = 0;
    432   {
    433     MayAutoLock locker(&lock_);
    434     do {
    435       if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
    436         next_interface_id_value_ = 1;
    437       id = next_interface_id_value_++;
    438       if (set_interface_id_namespace_bit_)
    439         id |= kInterfaceIdNamespaceMask;
    440     } while (base::ContainsKey(endpoints_, id));
    441 
    442     InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
    443     endpoints_[id] = endpoint;
    444     if (encountered_error_)
    445       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    446     endpoint->set_handle_created();
    447   }
    448 
    449   if (!NotifyAssociation(&handle_to_send, id)) {
    450     // The peer handle of |handle_to_send|, which is supposed to join this
    451     // associated group, has been closed.
    452     {
    453       MayAutoLock locker(&lock_);
    454       InterfaceEndpoint* endpoint = FindEndpoint(id);
    455       if (endpoint)
    456         UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    457     }
    458 
    459     control_message_proxy_.NotifyPeerEndpointClosed(
    460         id, handle_to_send.disconnect_reason());
    461   }
    462   return id;
    463 }
    464 
    465 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
    466     InterfaceId id) {
    467   if (!IsValidInterfaceId(id))
    468     return ScopedInterfaceEndpointHandle();
    469 
    470   MayAutoLock locker(&lock_);
    471   bool inserted = false;
    472   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    473   if (inserted) {
    474     DCHECK(!endpoint->handle_created());
    475 
    476     if (encountered_error_)
    477       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    478   } else {
    479     // If the endpoint already exist, it is because we have received a
    480     // notification that the peer endpoint has closed.
    481     CHECK(!endpoint->closed());
    482     CHECK(endpoint->peer_closed());
    483 
    484     if (endpoint->handle_created())
    485       return ScopedInterfaceEndpointHandle();
    486   }
    487 
    488   endpoint->set_handle_created();
    489   return CreateScopedInterfaceEndpointHandle(id);
    490 }
    491 
    492 void MultiplexRouter::CloseEndpointHandle(
    493     InterfaceId id,
    494     const base::Optional<DisconnectReason>& reason) {
    495   if (!IsValidInterfaceId(id))
    496     return;
    497 
    498   MayAutoLock locker(&lock_);
    499   DCHECK(base::ContainsKey(endpoints_, id));
    500   InterfaceEndpoint* endpoint = endpoints_[id].get();
    501   DCHECK(!endpoint->client());
    502   DCHECK(!endpoint->closed());
    503   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    504 
    505   if (!IsMasterInterfaceId(id) || reason) {
    506     MayAutoUnlock unlocker(&lock_);
    507     control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
    508   }
    509 
    510   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    511 }
    512 
    513 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
    514     const ScopedInterfaceEndpointHandle& handle,
    515     InterfaceEndpointClient* client,
    516     scoped_refptr<base::SingleThreadTaskRunner> runner) {
    517   const InterfaceId id = handle.id();
    518 
    519   DCHECK(IsValidInterfaceId(id));
    520   DCHECK(client);
    521 
    522   MayAutoLock locker(&lock_);
    523   DCHECK(base::ContainsKey(endpoints_, id));
    524 
    525   InterfaceEndpoint* endpoint = endpoints_[id].get();
    526   endpoint->AttachClient(client, std::move(runner));
    527 
    528   if (endpoint->peer_closed())
    529     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    530   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    531 
    532   return endpoint;
    533 }
    534 
    535 void MultiplexRouter::DetachEndpointClient(
    536     const ScopedInterfaceEndpointHandle& handle) {
    537   const InterfaceId id = handle.id();
    538 
    539   DCHECK(IsValidInterfaceId(id));
    540 
    541   MayAutoLock locker(&lock_);
    542   DCHECK(base::ContainsKey(endpoints_, id));
    543 
    544   InterfaceEndpoint* endpoint = endpoints_[id].get();
    545   endpoint->DetachClient();
    546 }
    547 
    548 void MultiplexRouter::RaiseError() {
    549   if (task_runner_->BelongsToCurrentThread()) {
    550     connector_.RaiseError();
    551   } else {
    552     task_runner_->PostTask(FROM_HERE,
    553                            base::Bind(&MultiplexRouter::RaiseError, this));
    554   }
    555 }
    556 
    557 void MultiplexRouter::CloseMessagePipe() {
    558   DCHECK(thread_checker_.CalledOnValidThread());
    559   connector_.CloseMessagePipe();
    560   // CloseMessagePipe() above won't trigger connection error handler.
    561   // Explicitly call OnPipeConnectionError() so that associated endpoints will
    562   // get notified.
    563   OnPipeConnectionError();
    564 }
    565 
    566 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
    567   DCHECK(thread_checker_.CalledOnValidThread());
    568   connector_.PauseIncomingMethodCallProcessing();
    569 
    570   MayAutoLock locker(&lock_);
    571   paused_ = true;
    572 
    573   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
    574     iter->second->ResetSyncMessageSignal();
    575 }
    576 
    577 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
    578   DCHECK(thread_checker_.CalledOnValidThread());
    579   connector_.ResumeIncomingMethodCallProcessing();
    580 
    581   MayAutoLock locker(&lock_);
    582   paused_ = false;
    583 
    584   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
    585     auto sync_iter = sync_message_tasks_.find(iter->first);
    586     if (iter->second->peer_closed() ||
    587         (sync_iter != sync_message_tasks_.end() &&
    588          !sync_iter->second.empty())) {
    589       iter->second->SignalSyncMessageEvent();
    590     }
    591   }
    592 
    593   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    594 }
    595 
    596 bool MultiplexRouter::HasAssociatedEndpoints() const {
    597   DCHECK(thread_checker_.CalledOnValidThread());
    598   MayAutoLock locker(&lock_);
    599 
    600   if (endpoints_.size() > 1)
    601     return true;
    602   if (endpoints_.size() == 0)
    603     return false;
    604 
    605   return !base::ContainsKey(endpoints_, kMasterInterfaceId);
    606 }
    607 
    608 void MultiplexRouter::EnableTestingMode() {
    609   DCHECK(thread_checker_.CalledOnValidThread());
    610   MayAutoLock locker(&lock_);
    611 
    612   testing_mode_ = true;
    613   connector_.set_enforce_errors_from_incoming_receiver(false);
    614 }
    615 
    616 bool MultiplexRouter::Accept(Message* message) {
    617   DCHECK(thread_checker_.CalledOnValidThread());
    618 
    619   if (!message->DeserializeAssociatedEndpointHandles(this))
    620     return false;
    621 
    622   scoped_refptr<MultiplexRouter> protector(this);
    623   MayAutoLock locker(&lock_);
    624 
    625   DCHECK(!paused_);
    626 
    627   ClientCallBehavior client_call_behavior =
    628       connector_.during_sync_handle_watcher_callback()
    629           ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
    630           : ALLOW_DIRECT_CLIENT_CALLS;
    631 
    632   bool processed =
    633       tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
    634                                                connector_.task_runner());
    635 
    636   if (!processed) {
    637     // Either the task queue is not empty or we cannot process the message
    638     // directly. In both cases, there is no need to call ProcessTasks().
    639     tasks_.push_back(
    640         Task::CreateMessageTask(MessageWrapper(this, std::move(*message))));
    641     Task* task = tasks_.back().get();
    642 
    643     if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
    644       InterfaceId id = task->message_wrapper.value().interface_id();
    645       sync_message_tasks_[id].push_back(task);
    646       InterfaceEndpoint* endpoint = FindEndpoint(id);
    647       if (endpoint)
    648         endpoint->SignalSyncMessageEvent();
    649     }
    650   } else if (!tasks_.empty()) {
    651     // Processing the message may result in new tasks (for error notification)
    652     // being added to the queue. In this case, we have to attempt to process the
    653     // tasks.
    654     ProcessTasks(client_call_behavior, connector_.task_runner());
    655   }
    656 
    657   // Always return true. If we see errors during message processing, we will
    658   // explicitly call Connector::RaiseError() to disconnect the message pipe.
    659   return true;
    660 }
    661 
    662 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
    663     InterfaceId id,
    664     const base::Optional<DisconnectReason>& reason) {
    665   DCHECK(!IsMasterInterfaceId(id) || reason);
    666 
    667   MayAutoLock locker(&lock_);
    668   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
    669 
    670   if (reason)
    671     endpoint->set_disconnect_reason(reason);
    672 
    673   // It is possible that this endpoint has been set as peer closed. That is
    674   // because when the message pipe is closed, all the endpoints are updated with
    675   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
    676   // as long as there are refs keeping the router alive. If there is a
    677   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
    678   // here and see that the endpoint has been marked as peer closed.
    679   if (!endpoint->peer_closed()) {
    680     if (endpoint->client())
    681       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    682     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    683   }
    684 
    685   // No need to trigger a ProcessTasks() because it is already on the stack.
    686 
    687   return true;
    688 }
    689 
    690 void MultiplexRouter::OnPipeConnectionError() {
    691   DCHECK(thread_checker_.CalledOnValidThread());
    692 
    693   scoped_refptr<MultiplexRouter> protector(this);
    694   MayAutoLock locker(&lock_);
    695 
    696   encountered_error_ = true;
    697 
    698   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    699     InterfaceEndpoint* endpoint = iter->second.get();
    700     // Increment the iterator before calling UpdateEndpointStateMayRemove()
    701     // because it may remove the corresponding value from the map.
    702     ++iter;
    703 
    704     if (endpoint->client())
    705       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    706 
    707     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    708   }
    709 
    710   ProcessTasks(connector_.during_sync_handle_watcher_callback()
    711                    ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
    712                    : ALLOW_DIRECT_CLIENT_CALLS,
    713                connector_.task_runner());
    714 }
    715 
    716 void MultiplexRouter::ProcessTasks(
    717     ClientCallBehavior client_call_behavior,
    718     base::SingleThreadTaskRunner* current_task_runner) {
    719   AssertLockAcquired();
    720 
    721   if (posted_to_process_tasks_)
    722     return;
    723 
    724   while (!tasks_.empty() && !paused_) {
    725     std::unique_ptr<Task> task(std::move(tasks_.front()));
    726     tasks_.pop_front();
    727 
    728     InterfaceId id = kInvalidInterfaceId;
    729     bool sync_message =
    730         task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
    731         task->message_wrapper.value().has_flag(Message::kFlagIsSync);
    732     if (sync_message) {
    733       id = task->message_wrapper.value().interface_id();
    734       auto& sync_message_queue = sync_message_tasks_[id];
    735       DCHECK_EQ(task.get(), sync_message_queue.front());
    736       sync_message_queue.pop_front();
    737     }
    738 
    739     bool processed =
    740         task->IsNotifyErrorTask()
    741             ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
    742                                      current_task_runner)
    743             : ProcessIncomingMessage(&task->message_wrapper.value(),
    744                                      client_call_behavior, current_task_runner);
    745 
    746     if (!processed) {
    747       if (sync_message) {
    748         auto& sync_message_queue = sync_message_tasks_[id];
    749         sync_message_queue.push_front(task.get());
    750       }
    751       tasks_.push_front(std::move(task));
    752       break;
    753     } else {
    754       if (sync_message) {
    755         auto iter = sync_message_tasks_.find(id);
    756         if (iter != sync_message_tasks_.end() && iter->second.empty())
    757           sync_message_tasks_.erase(iter);
    758       }
    759     }
    760   }
    761 }
    762 
    763 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
    764   AssertLockAcquired();
    765 
    766   auto iter = sync_message_tasks_.find(id);
    767   if (iter == sync_message_tasks_.end())
    768     return false;
    769 
    770   if (paused_)
    771     return true;
    772 
    773   MultiplexRouter::Task* task = iter->second.front();
    774   iter->second.pop_front();
    775 
    776   DCHECK(task->IsMessageTask());
    777   MessageWrapper message_wrapper = std::move(task->message_wrapper);
    778 
    779   // Note: after this call, |task| and |iter| may be invalidated.
    780   bool processed = ProcessIncomingMessage(
    781       &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
    782       nullptr);
    783   DCHECK(processed);
    784 
    785   iter = sync_message_tasks_.find(id);
    786   if (iter == sync_message_tasks_.end())
    787     return false;
    788 
    789   if (iter->second.empty()) {
    790     sync_message_tasks_.erase(iter);
    791     return false;
    792   }
    793 
    794   return true;
    795 }
    796 
    797 bool MultiplexRouter::ProcessNotifyErrorTask(
    798     Task* task,
    799     ClientCallBehavior client_call_behavior,
    800     base::SingleThreadTaskRunner* current_task_runner) {
    801   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
    802   DCHECK(!paused_);
    803 
    804   AssertLockAcquired();
    805   InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
    806   if (!endpoint->client())
    807     return true;
    808 
    809   if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
    810       endpoint->task_runner() != current_task_runner) {
    811     MaybePostToProcessTasks(endpoint->task_runner());
    812     return false;
    813   }
    814 
    815   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
    816 
    817   InterfaceEndpointClient* client = endpoint->client();
    818   base::Optional<DisconnectReason> disconnect_reason(
    819       endpoint->disconnect_reason());
    820 
    821   {
    822     // We must unlock before calling into |client| because it may call this
    823     // object within NotifyError(). Holding the lock will lead to deadlock.
    824     //
    825     // It is safe to call into |client| without the lock. Because |client| is
    826     // always accessed on the same thread, including DetachEndpointClient().
    827     MayAutoUnlock unlocker(&lock_);
    828     client->NotifyError(disconnect_reason);
    829   }
    830   return true;
    831 }
    832 
    833 bool MultiplexRouter::ProcessIncomingMessage(
    834     Message* message,
    835     ClientCallBehavior client_call_behavior,
    836     base::SingleThreadTaskRunner* current_task_runner) {
    837   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
    838   DCHECK(!paused_);
    839   DCHECK(message);
    840   AssertLockAcquired();
    841 
    842   if (message->IsNull()) {
    843     // This is a sync message and has been processed during sync handle
    844     // watching.
    845     return true;
    846   }
    847 
    848   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
    849     bool result = false;
    850 
    851     {
    852       MayAutoUnlock unlocker(&lock_);
    853       result = control_message_handler_.Accept(message);
    854     }
    855 
    856     if (!result)
    857       RaiseErrorInNonTestingMode();
    858 
    859     return true;
    860   }
    861 
    862   InterfaceId id = message->interface_id();
    863   DCHECK(IsValidInterfaceId(id));
    864 
    865   InterfaceEndpoint* endpoint = FindEndpoint(id);
    866   if (!endpoint || endpoint->closed())
    867     return true;
    868 
    869   if (!endpoint->client()) {
    870     // We need to wait until a client is attached in order to dispatch further
    871     // messages.
    872     return false;
    873   }
    874 
    875   bool can_direct_call;
    876   if (message->has_flag(Message::kFlagIsSync)) {
    877     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
    878                       endpoint->task_runner()->BelongsToCurrentThread();
    879   } else {
    880     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
    881                       endpoint->task_runner() == current_task_runner;
    882   }
    883 
    884   if (!can_direct_call) {
    885     MaybePostToProcessTasks(endpoint->task_runner());
    886     return false;
    887   }
    888 
    889   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
    890 
    891   InterfaceEndpointClient* client = endpoint->client();
    892   bool result = false;
    893   {
    894     // We must unlock before calling into |client| because it may call this
    895     // object within HandleIncomingMessage(). Holding the lock will lead to
    896     // deadlock.
    897     //
    898     // It is safe to call into |client| without the lock. Because |client| is
    899     // always accessed on the same thread, including DetachEndpointClient().
    900     MayAutoUnlock unlocker(&lock_);
    901     result = client->HandleIncomingMessage(message);
    902   }
    903   if (!result)
    904     RaiseErrorInNonTestingMode();
    905 
    906   return true;
    907 }
    908 
    909 void MultiplexRouter::MaybePostToProcessTasks(
    910     base::SingleThreadTaskRunner* task_runner) {
    911   AssertLockAcquired();
    912   if (posted_to_process_tasks_)
    913     return;
    914 
    915   posted_to_process_tasks_ = true;
    916   posted_to_task_runner_ = task_runner;
    917   task_runner->PostTask(
    918       FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    919 }
    920 
    921 void MultiplexRouter::LockAndCallProcessTasks() {
    922   // There is no need to hold a ref to this class in this case because this is
    923   // always called using base::Bind(), which holds a ref.
    924   MayAutoLock locker(&lock_);
    925   posted_to_process_tasks_ = false;
    926   scoped_refptr<base::SingleThreadTaskRunner> runner(
    927       std::move(posted_to_task_runner_));
    928   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
    929 }
    930 
    931 void MultiplexRouter::UpdateEndpointStateMayRemove(
    932     InterfaceEndpoint* endpoint,
    933     EndpointStateUpdateType type) {
    934   if (type == ENDPOINT_CLOSED) {
    935     endpoint->set_closed();
    936   } else {
    937     endpoint->set_peer_closed();
    938     // If the interface endpoint is performing a sync watch, this makes sure
    939     // it is notified and eventually exits the sync watch.
    940     endpoint->SignalSyncMessageEvent();
    941   }
    942   if (endpoint->closed() && endpoint->peer_closed())
    943     endpoints_.erase(endpoint->id());
    944 }
    945 
    946 void MultiplexRouter::RaiseErrorInNonTestingMode() {
    947   AssertLockAcquired();
    948   if (!testing_mode_)
    949     RaiseError();
    950 }
    951 
    952 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
    953     InterfaceId id,
    954     bool* inserted) {
    955   AssertLockAcquired();
    956   // Either |inserted| is nullptr or it points to a boolean initialized as
    957   // false.
    958   DCHECK(!inserted || !*inserted);
    959 
    960   InterfaceEndpoint* endpoint = FindEndpoint(id);
    961   if (!endpoint) {
    962     endpoint = new InterfaceEndpoint(this, id);
    963     endpoints_[id] = endpoint;
    964     if (inserted)
    965       *inserted = true;
    966   }
    967 
    968   return endpoint;
    969 }
    970 
    971 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
    972     InterfaceId id) {
    973   AssertLockAcquired();
    974   auto iter = endpoints_.find(id);
    975   return iter != endpoints_.end() ? iter->second.get() : nullptr;
    976 }
    977 
    978 void MultiplexRouter::AssertLockAcquired() {
    979 #if DCHECK_IS_ON()
    980   if (lock_)
    981     lock_->AssertAcquired();
    982 #endif
    983 }
    984 
    985 }  // namespace internal
    986 }  // namespace mojo
    987