Home | History | Annotate | Download | only in lib
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
      6 
      7 #include <stdint.h>
      8 
      9 #include <utility>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/macros.h"
     14 #include "base/memory/ptr_util.h"
     15 #include "base/single_thread_task_runner.h"
     16 #include "base/stl_util.h"
     17 #include "base/threading/thread_task_runner_handle.h"
     18 #include "mojo/public/cpp/bindings/associated_group.h"
     19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
     20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
     21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
     22 
     23 namespace mojo {
     24 namespace internal {
     25 
     26 // InterfaceEndpoint stores the information of an interface endpoint registered
     27 // with the router.
     28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
     29 // this object.
     30 class MultiplexRouter::InterfaceEndpoint
     31     : public base::RefCounted<InterfaceEndpoint>,
     32       public InterfaceEndpointController {
     33  public:
     34   InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
     35       : router_(router),
     36         id_(id),
     37         closed_(false),
     38         peer_closed_(false),
     39         client_(nullptr),
     40         event_signalled_(false) {}
     41 
     42   // ---------------------------------------------------------------------------
     43   // The following public methods are safe to call from any threads without
     44   // locking.
     45 
     46   InterfaceId id() const { return id_; }
     47 
     48   // ---------------------------------------------------------------------------
     49   // The following public methods are called under the router's lock.
     50 
     51   bool closed() const { return closed_; }
     52   void set_closed() {
     53     router_->lock_.AssertAcquired();
     54     closed_ = true;
     55   }
     56 
     57   bool peer_closed() const { return peer_closed_; }
     58   void set_peer_closed() {
     59     router_->lock_.AssertAcquired();
     60     peer_closed_ = true;
     61   }
     62 
     63   base::SingleThreadTaskRunner* task_runner() const {
     64     return task_runner_.get();
     65   }
     66 
     67   InterfaceEndpointClient* client() const { return client_; }
     68 
     69   void AttachClient(InterfaceEndpointClient* client,
     70                     scoped_refptr<base::SingleThreadTaskRunner> runner) {
     71     router_->lock_.AssertAcquired();
     72     DCHECK(!client_);
     73     DCHECK(!closed_);
     74     DCHECK(runner->BelongsToCurrentThread());
     75 
     76     task_runner_ = std::move(runner);
     77     client_ = client;
     78   }
     79 
     80   // This method must be called on the same thread as the corresponding
     81   // AttachClient() call.
     82   void DetachClient() {
     83     router_->lock_.AssertAcquired();
     84     DCHECK(client_);
     85     DCHECK(task_runner_->BelongsToCurrentThread());
     86     DCHECK(!closed_);
     87 
     88     task_runner_ = nullptr;
     89     client_ = nullptr;
     90     sync_watcher_.reset();
     91   }
     92 
     93   void SignalSyncMessageEvent() {
     94     router_->lock_.AssertAcquired();
     95     if (event_signalled_)
     96       return;
     97 
     98     EnsureEventMessagePipeExists();
     99     event_signalled_ = true;
    100     MojoResult result =
    101         WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
    102                         0, MOJO_WRITE_MESSAGE_FLAG_NONE);
    103     DCHECK_EQ(MOJO_RESULT_OK, result);
    104   }
    105 
    106   // ---------------------------------------------------------------------------
    107   // The following public methods (i.e., InterfaceEndpointController
    108   // implementation) are called by the client on the same thread as the
    109   // AttachClient() call. They are called outside of the router's lock.
    110 
    111   bool SendMessage(Message* message) override {
    112     DCHECK(task_runner_->BelongsToCurrentThread());
    113     message->set_interface_id(id_);
    114     return router_->connector_.Accept(message);
    115   }
    116 
    117   void AllowWokenUpBySyncWatchOnSameThread() override {
    118     DCHECK(task_runner_->BelongsToCurrentThread());
    119 
    120     EnsureSyncWatcherExists();
    121     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
    122   }
    123 
    124   bool SyncWatch(const bool* should_stop) override {
    125     DCHECK(task_runner_->BelongsToCurrentThread());
    126 
    127     EnsureSyncWatcherExists();
    128     return sync_watcher_->SyncWatch(should_stop);
    129   }
    130 
    131  private:
    132   friend class base::RefCounted<InterfaceEndpoint>;
    133 
    134   ~InterfaceEndpoint() override {
    135     router_->lock_.AssertAcquired();
    136 
    137     DCHECK(!client_);
    138     DCHECK(closed_);
    139     DCHECK(peer_closed_);
    140     DCHECK(!sync_watcher_);
    141   }
    142 
    143   void OnHandleReady(MojoResult result) {
    144     DCHECK(task_runner_->BelongsToCurrentThread());
    145     scoped_refptr<InterfaceEndpoint> self_protector(this);
    146     scoped_refptr<MultiplexRouter> router_protector(router_);
    147 
    148     // Because we never close |sync_message_event_{sender,receiver}_| before
    149     // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
    150     DCHECK_EQ(MOJO_RESULT_OK, result);
    151     bool reset_sync_watcher = false;
    152     {
    153       base::AutoLock locker(router_->lock_);
    154 
    155       bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
    156 
    157       if (!more_to_process)
    158         ResetSyncMessageSignal();
    159 
    160       // Currently there are no queued sync messages and the peer has closed so
    161       // there won't be incoming sync messages in the future.
    162       reset_sync_watcher = !more_to_process && peer_closed_;
    163     }
    164     if (reset_sync_watcher) {
    165       // If a SyncWatch() call (or multiple ones) of this interface endpoint is
    166       // on the call stack, resetting the sync watcher will allow it to exit
    167       // when the call stack unwinds to that frame.
    168       sync_watcher_.reset();
    169     }
    170   }
    171 
    172   void EnsureSyncWatcherExists() {
    173     DCHECK(task_runner_->BelongsToCurrentThread());
    174     if (sync_watcher_)
    175       return;
    176 
    177     {
    178       base::AutoLock locker(router_->lock_);
    179       EnsureEventMessagePipeExists();
    180 
    181       auto iter = router_->sync_message_tasks_.find(id_);
    182       if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
    183         SignalSyncMessageEvent();
    184     }
    185 
    186     sync_watcher_.reset(new SyncHandleWatcher(
    187         sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
    188         base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
    189   }
    190 
    191   void EnsureEventMessagePipeExists() {
    192     router_->lock_.AssertAcquired();
    193 
    194     if (sync_message_event_receiver_.is_valid())
    195       return;
    196 
    197     MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
    198                                           &sync_message_event_receiver_);
    199     DCHECK_EQ(MOJO_RESULT_OK, result);
    200   }
    201 
    202   void ResetSyncMessageSignal() {
    203     router_->lock_.AssertAcquired();
    204 
    205     if (!event_signalled_)
    206       return;
    207 
    208     DCHECK(sync_message_event_receiver_.is_valid());
    209     MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
    210                                        nullptr, nullptr, nullptr, nullptr,
    211                                        MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
    212     DCHECK_EQ(MOJO_RESULT_OK, result);
    213     event_signalled_ = false;
    214   }
    215 
    216   // ---------------------------------------------------------------------------
    217   // The following members are safe to access from any threads.
    218 
    219   MultiplexRouter* const router_;
    220   const InterfaceId id_;
    221 
    222   // ---------------------------------------------------------------------------
    223   // The following members are accessed under the router's lock.
    224 
    225   // Whether the endpoint has been closed.
    226   bool closed_;
    227   // Whether the peer endpoint has been closed.
    228   bool peer_closed_;
    229 
    230   // The task runner on which |client_|'s methods can be called.
    231   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
    232   // Not owned. It is null if no client is attached to this endpoint.
    233   InterfaceEndpointClient* client_;
    234 
    235   // A message pipe used as an event to signal that sync messages are available.
    236   // The message pipe handles are initialized under the router's lock and remain
    237   // unchanged afterwards. They may be accessed outside of the router's lock
    238   // later.
    239   ScopedMessagePipeHandle sync_message_event_sender_;
    240   ScopedMessagePipeHandle sync_message_event_receiver_;
    241   bool event_signalled_;
    242 
    243   // ---------------------------------------------------------------------------
    244   // The following members are only valid while a client is attached. They are
    245   // used exclusively on the client's thread. They may be accessed outside of
    246   // the router's lock.
    247 
    248   std::unique_ptr<SyncHandleWatcher> sync_watcher_;
    249 
    250   DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
    251 };
    252 
    253 struct MultiplexRouter::Task {
    254  public:
    255   // Doesn't take ownership of |message| but takes its contents.
    256   static std::unique_ptr<Task> CreateMessageTask(Message* message) {
    257     Task* task = new Task(MESSAGE);
    258     task->message.reset(new Message);
    259     message->MoveTo(task->message.get());
    260     return base::WrapUnique(task);
    261   }
    262   static std::unique_ptr<Task> CreateNotifyErrorTask(
    263       InterfaceEndpoint* endpoint) {
    264     Task* task = new Task(NOTIFY_ERROR);
    265     task->endpoint_to_notify = endpoint;
    266     return base::WrapUnique(task);
    267   }
    268 
    269   ~Task() {}
    270 
    271   bool IsMessageTask() const { return type == MESSAGE; }
    272   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
    273 
    274   std::unique_ptr<Message> message;
    275   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
    276 
    277   enum Type { MESSAGE, NOTIFY_ERROR };
    278   Type type;
    279 
    280  private:
    281   explicit Task(Type in_type) : type(in_type) {}
    282 };
    283 
    284 MultiplexRouter::MultiplexRouter(
    285     bool set_interface_id_namesapce_bit,
    286     ScopedMessagePipeHandle message_pipe,
    287     scoped_refptr<base::SingleThreadTaskRunner> runner)
    288     : AssociatedGroupController(base::ThreadTaskRunnerHandle::Get()),
    289       set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
    290       header_validator_(this),
    291       connector_(std::move(message_pipe),
    292                  Connector::MULTI_THREADED_SEND,
    293                  std::move(runner)),
    294       control_message_handler_(this),
    295       control_message_proxy_(&connector_),
    296       next_interface_id_value_(1),
    297       posted_to_process_tasks_(false),
    298       encountered_error_(false),
    299       testing_mode_(false) {
    300   // Always participate in sync handle watching, because even if it doesn't
    301   // expect sync requests during sync handle watching, it may still need to
    302   // dispatch messages to associated endpoints on a different thread.
    303   connector_.AllowWokenUpBySyncWatchOnSameThread();
    304   connector_.set_incoming_receiver(&header_validator_);
    305   connector_.set_connection_error_handler(
    306       base::Bind(&MultiplexRouter::OnPipeConnectionError,
    307                  base::Unretained(this)));
    308 }
    309 
    310 MultiplexRouter::~MultiplexRouter() {
    311   base::AutoLock locker(lock_);
    312 
    313   sync_message_tasks_.clear();
    314   tasks_.clear();
    315 
    316   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    317     InterfaceEndpoint* endpoint = iter->second.get();
    318     // Increment the iterator before calling UpdateEndpointStateMayRemove()
    319     // because it may remove the corresponding value from the map.
    320     ++iter;
    321 
    322     DCHECK(endpoint->closed());
    323     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    324   }
    325 
    326   DCHECK(endpoints_.empty());
    327 }
    328 
    329 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) {
    330   DCHECK(thread_checker_.CalledOnValidThread());
    331   header_validator_.SetDescription(name + " [master] MessageHeaderValidator");
    332   control_message_handler_.SetDescription(
    333       name + " [master] PipeControlMessageHandler");
    334 }
    335 
    336 void MultiplexRouter::CreateEndpointHandlePair(
    337     ScopedInterfaceEndpointHandle* local_endpoint,
    338     ScopedInterfaceEndpointHandle* remote_endpoint) {
    339   base::AutoLock locker(lock_);
    340   uint32_t id = 0;
    341   do {
    342     if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
    343       next_interface_id_value_ = 1;
    344     id = next_interface_id_value_++;
    345     if (set_interface_id_namespace_bit_)
    346       id |= kInterfaceIdNamespaceMask;
    347   } while (ContainsKey(endpoints_, id));
    348 
    349   InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
    350   endpoints_[id] = endpoint;
    351   if (encountered_error_)
    352     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    353 
    354   *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
    355   *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
    356 }
    357 
    358 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
    359     InterfaceId id) {
    360   if (!IsValidInterfaceId(id))
    361     return ScopedInterfaceEndpointHandle();
    362 
    363   base::AutoLock locker(lock_);
    364   bool inserted = false;
    365   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    366   if (inserted) {
    367     if (encountered_error_)
    368       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    369   } else {
    370     // If the endpoint already exist, it is because we have received a
    371     // notification that the peer endpoint has closed.
    372     CHECK(!endpoint->closed());
    373     CHECK(endpoint->peer_closed());
    374   }
    375   return CreateScopedInterfaceEndpointHandle(id, true);
    376 }
    377 
    378 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
    379   if (!IsValidInterfaceId(id))
    380     return;
    381 
    382   base::AutoLock locker(lock_);
    383 
    384   if (!is_local) {
    385     DCHECK(ContainsKey(endpoints_, id));
    386     DCHECK(!IsMasterInterfaceId(id));
    387 
    388     // We will receive a NotifyPeerEndpointClosed message from the other side.
    389     control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
    390 
    391     return;
    392   }
    393 
    394   DCHECK(ContainsKey(endpoints_, id));
    395   InterfaceEndpoint* endpoint = endpoints_[id].get();
    396   DCHECK(!endpoint->client());
    397   DCHECK(!endpoint->closed());
    398   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    399 
    400   if (!IsMasterInterfaceId(id))
    401     control_message_proxy_.NotifyPeerEndpointClosed(id);
    402 
    403   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    404 }
    405 
    406 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
    407     const ScopedInterfaceEndpointHandle& handle,
    408     InterfaceEndpointClient* client,
    409     scoped_refptr<base::SingleThreadTaskRunner> runner) {
    410   const InterfaceId id = handle.id();
    411 
    412   DCHECK(IsValidInterfaceId(id));
    413   DCHECK(client);
    414 
    415   base::AutoLock locker(lock_);
    416   DCHECK(ContainsKey(endpoints_, id));
    417 
    418   InterfaceEndpoint* endpoint = endpoints_[id].get();
    419   endpoint->AttachClient(client, std::move(runner));
    420 
    421   if (endpoint->peer_closed())
    422     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    423   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
    424 
    425   return endpoint;
    426 }
    427 
    428 void MultiplexRouter::DetachEndpointClient(
    429     const ScopedInterfaceEndpointHandle& handle) {
    430   const InterfaceId id = handle.id();
    431 
    432   DCHECK(IsValidInterfaceId(id));
    433 
    434   base::AutoLock locker(lock_);
    435   DCHECK(ContainsKey(endpoints_, id));
    436 
    437   InterfaceEndpoint* endpoint = endpoints_[id].get();
    438   endpoint->DetachClient();
    439 }
    440 
    441 void MultiplexRouter::RaiseError() {
    442   if (task_runner_->BelongsToCurrentThread()) {
    443     connector_.RaiseError();
    444   } else {
    445     task_runner_->PostTask(FROM_HERE,
    446                            base::Bind(&MultiplexRouter::RaiseError, this));
    447   }
    448 }
    449 
    450 void MultiplexRouter::CloseMessagePipe() {
    451   DCHECK(thread_checker_.CalledOnValidThread());
    452   connector_.CloseMessagePipe();
    453   // CloseMessagePipe() above won't trigger connection error handler.
    454   // Explicitly call OnPipeConnectionError() so that associated endpoints will
    455   // get notified.
    456   OnPipeConnectionError();
    457 }
    458 
    459 bool MultiplexRouter::HasAssociatedEndpoints() const {
    460   DCHECK(thread_checker_.CalledOnValidThread());
    461   base::AutoLock locker(lock_);
    462 
    463   if (endpoints_.size() > 1)
    464     return true;
    465   if (endpoints_.size() == 0)
    466     return false;
    467 
    468   return !ContainsKey(endpoints_, kMasterInterfaceId);
    469 }
    470 
    471 void MultiplexRouter::EnableTestingMode() {
    472   DCHECK(thread_checker_.CalledOnValidThread());
    473   base::AutoLock locker(lock_);
    474 
    475   testing_mode_ = true;
    476   connector_.set_enforce_errors_from_incoming_receiver(false);
    477 }
    478 
    479 bool MultiplexRouter::Accept(Message* message) {
    480   DCHECK(thread_checker_.CalledOnValidThread());
    481 
    482   scoped_refptr<MultiplexRouter> protector(this);
    483   base::AutoLock locker(lock_);
    484 
    485   ClientCallBehavior client_call_behavior =
    486       connector_.during_sync_handle_watcher_callback()
    487           ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
    488           : ALLOW_DIRECT_CLIENT_CALLS;
    489 
    490   bool processed =
    491       tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
    492                                                connector_.task_runner());
    493 
    494   if (!processed) {
    495     // Either the task queue is not empty or we cannot process the message
    496     // directly. In both cases, there is no need to call ProcessTasks().
    497     tasks_.push_back(Task::CreateMessageTask(message));
    498     Task* task = tasks_.back().get();
    499 
    500     if (task->message->has_flag(Message::kFlagIsSync)) {
    501       InterfaceId id = task->message->interface_id();
    502       sync_message_tasks_[id].push_back(task);
    503       auto iter = endpoints_.find(id);
    504       if (iter != endpoints_.end())
    505         iter->second->SignalSyncMessageEvent();
    506     }
    507   } else if (!tasks_.empty()) {
    508     // Processing the message may result in new tasks (for error notification)
    509     // being added to the queue. In this case, we have to attempt to process the
    510     // tasks.
    511     ProcessTasks(client_call_behavior, connector_.task_runner());
    512   }
    513 
    514   // Always return true. If we see errors during message processing, we will
    515   // explicitly call Connector::RaiseError() to disconnect the message pipe.
    516   return true;
    517 }
    518 
    519 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
    520   lock_.AssertAcquired();
    521 
    522   if (IsMasterInterfaceId(id))
    523     return false;
    524 
    525   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
    526 
    527   // It is possible that this endpoint has been set as peer closed. That is
    528   // because when the message pipe is closed, all the endpoints are updated with
    529   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
    530   // as long as there are refs keeping the router alive. If there is a
    531   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
    532   // here and see that the endpoint has been marked as peer closed.
    533   if (!endpoint->peer_closed()) {
    534     if (endpoint->client())
    535       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    536     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    537   }
    538 
    539   // No need to trigger a ProcessTasks() because it is already on the stack.
    540 
    541   return true;
    542 }
    543 
    544 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
    545   lock_.AssertAcquired();
    546 
    547   if (IsMasterInterfaceId(id))
    548     return false;
    549 
    550   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
    551   DCHECK(!endpoint->closed());
    552   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    553 
    554   control_message_proxy_.NotifyPeerEndpointClosed(id);
    555 
    556   return true;
    557 }
    558 
    559 void MultiplexRouter::OnPipeConnectionError() {
    560   DCHECK(thread_checker_.CalledOnValidThread());
    561 
    562   scoped_refptr<MultiplexRouter> protector(this);
    563   base::AutoLock locker(lock_);
    564 
    565   encountered_error_ = true;
    566 
    567   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    568     InterfaceEndpoint* endpoint = iter->second.get();
    569     // Increment the iterator before calling UpdateEndpointStateMayRemove()
    570     // because it may remove the corresponding value from the map.
    571     ++iter;
    572 
    573     if (endpoint->client())
    574       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
    575 
    576     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
    577   }
    578 
    579   ProcessTasks(connector_.during_sync_handle_watcher_callback()
    580                    ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
    581                    : ALLOW_DIRECT_CLIENT_CALLS,
    582                connector_.task_runner());
    583 }
    584 
    585 void MultiplexRouter::ProcessTasks(
    586     ClientCallBehavior client_call_behavior,
    587     base::SingleThreadTaskRunner* current_task_runner) {
    588   lock_.AssertAcquired();
    589 
    590   if (posted_to_process_tasks_)
    591     return;
    592 
    593   while (!tasks_.empty()) {
    594     std::unique_ptr<Task> task(std::move(tasks_.front()));
    595     tasks_.pop_front();
    596 
    597     InterfaceId id = kInvalidInterfaceId;
    598     bool sync_message = task->IsMessageTask() && task->message &&
    599                         task->message->has_flag(Message::kFlagIsSync);
    600     if (sync_message) {
    601       id = task->message->interface_id();
    602       auto& sync_message_queue = sync_message_tasks_[id];
    603       DCHECK_EQ(task.get(), sync_message_queue.front());
    604       sync_message_queue.pop_front();
    605     }
    606 
    607     bool processed =
    608         task->IsNotifyErrorTask()
    609             ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
    610                                      current_task_runner)
    611             : ProcessIncomingMessage(task->message.get(), client_call_behavior,
    612                                      current_task_runner);
    613 
    614     if (!processed) {
    615       if (sync_message) {
    616         auto& sync_message_queue = sync_message_tasks_[id];
    617         sync_message_queue.push_front(task.get());
    618       }
    619       tasks_.push_front(std::move(task));
    620       break;
    621     } else {
    622       if (sync_message) {
    623         auto iter = sync_message_tasks_.find(id);
    624         if (iter != sync_message_tasks_.end() && iter->second.empty())
    625           sync_message_tasks_.erase(iter);
    626       }
    627     }
    628   }
    629 }
    630 
    631 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
    632   lock_.AssertAcquired();
    633 
    634   auto iter = sync_message_tasks_.find(id);
    635   if (iter == sync_message_tasks_.end())
    636     return false;
    637 
    638   MultiplexRouter::Task* task = iter->second.front();
    639   iter->second.pop_front();
    640 
    641   DCHECK(task->IsMessageTask());
    642   std::unique_ptr<Message> message(std::move(task->message));
    643 
    644   // Note: after this call, |task| and  |iter| may be invalidated.
    645   bool processed = ProcessIncomingMessage(
    646       message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
    647   DCHECK(processed);
    648 
    649   iter = sync_message_tasks_.find(id);
    650   if (iter == sync_message_tasks_.end())
    651     return false;
    652 
    653   if (iter->second.empty()) {
    654     sync_message_tasks_.erase(iter);
    655     return false;
    656   }
    657 
    658   return true;
    659 }
    660 
    661 bool MultiplexRouter::ProcessNotifyErrorTask(
    662     Task* task,
    663     ClientCallBehavior client_call_behavior,
    664     base::SingleThreadTaskRunner* current_task_runner) {
    665   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
    666   lock_.AssertAcquired();
    667   InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
    668   if (!endpoint->client())
    669     return true;
    670 
    671   if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
    672       endpoint->task_runner() != current_task_runner) {
    673     MaybePostToProcessTasks(endpoint->task_runner());
    674     return false;
    675   }
    676 
    677   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
    678 
    679   InterfaceEndpointClient* client = endpoint->client();
    680   {
    681     // We must unlock before calling into |client| because it may call this
    682     // object within NotifyError(). Holding the lock will lead to deadlock.
    683     //
    684     // It is safe to call into |client| without the lock. Because |client| is
    685     // always accessed on the same thread, including DetachEndpointClient().
    686     base::AutoUnlock unlocker(lock_);
    687     client->NotifyError();
    688   }
    689   return true;
    690 }
    691 
    692 bool MultiplexRouter::ProcessIncomingMessage(
    693     Message* message,
    694     ClientCallBehavior client_call_behavior,
    695     base::SingleThreadTaskRunner* current_task_runner) {
    696   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
    697   lock_.AssertAcquired();
    698 
    699   if (!message) {
    700     // This is a sync message and has been processed during sync handle
    701     // watching.
    702     return true;
    703   }
    704 
    705   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
    706     if (!control_message_handler_.Accept(message))
    707       RaiseErrorInNonTestingMode();
    708     return true;
    709   }
    710 
    711   InterfaceId id = message->interface_id();
    712   DCHECK(IsValidInterfaceId(id));
    713 
    714   bool inserted = false;
    715   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    716   if (inserted) {
    717     // Currently, it is legitimate to receive messages for an endpoint
    718     // that is not registered. For example, the endpoint is transferred in
    719     // a message that is discarded. Once we add support to specify all
    720     // enclosing endpoints in message header, we should be able to remove
    721     // this.
    722     UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
    723 
    724     // It is also possible that this newly-inserted endpoint is the master
    725     // endpoint. When the master InterfacePtr/Binding goes away, the message
    726     // pipe is closed and we explicitly trigger a pipe connection error. The
    727     // error updates all the endpoints, including the master endpoint, with
    728     // PEER_ENDPOINT_CLOSED and removes the master endpoint from the
    729     // registration. We continue to process remaining tasks in the queue, as
    730     // long as there are refs keeping the router alive. If there are remaining
    731     // messages for the master endpoint, we will get here.
    732     if (!IsMasterInterfaceId(id))
    733       control_message_proxy_.NotifyPeerEndpointClosed(id);
    734     return true;
    735   }
    736 
    737   if (endpoint->closed())
    738     return true;
    739 
    740   if (!endpoint->client()) {
    741     // We need to wait until a client is attached in order to dispatch further
    742     // messages.
    743     return false;
    744   }
    745 
    746   bool can_direct_call;
    747   if (message->has_flag(Message::kFlagIsSync)) {
    748     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
    749                       endpoint->task_runner()->BelongsToCurrentThread();
    750   } else {
    751     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
    752                       endpoint->task_runner() == current_task_runner;
    753   }
    754 
    755   if (!can_direct_call) {
    756     MaybePostToProcessTasks(endpoint->task_runner());
    757     return false;
    758   }
    759 
    760   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
    761 
    762   InterfaceEndpointClient* client = endpoint->client();
    763   bool result = false;
    764   {
    765     // We must unlock before calling into |client| because it may call this
    766     // object within HandleIncomingMessage(). Holding the lock will lead to
    767     // deadlock.
    768     //
    769     // It is safe to call into |client| without the lock. Because |client| is
    770     // always accessed on the same thread, including DetachEndpointClient().
    771     base::AutoUnlock unlocker(lock_);
    772     result = client->HandleIncomingMessage(message);
    773   }
    774   if (!result)
    775     RaiseErrorInNonTestingMode();
    776 
    777   return true;
    778 }
    779 
    780 void MultiplexRouter::MaybePostToProcessTasks(
    781     base::SingleThreadTaskRunner* task_runner) {
    782   lock_.AssertAcquired();
    783   if (posted_to_process_tasks_)
    784     return;
    785 
    786   posted_to_process_tasks_ = true;
    787   posted_to_task_runner_ = task_runner;
    788   task_runner->PostTask(
    789       FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    790 }
    791 
    792 void MultiplexRouter::LockAndCallProcessTasks() {
    793   // There is no need to hold a ref to this class in this case because this is
    794   // always called using base::Bind(), which holds a ref.
    795   base::AutoLock locker(lock_);
    796   posted_to_process_tasks_ = false;
    797   scoped_refptr<base::SingleThreadTaskRunner> runner(
    798       std::move(posted_to_task_runner_));
    799   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
    800 }
    801 
    802 void MultiplexRouter::UpdateEndpointStateMayRemove(
    803     InterfaceEndpoint* endpoint,
    804     EndpointStateUpdateType type) {
    805   switch (type) {
    806     case ENDPOINT_CLOSED:
    807       endpoint->set_closed();
    808       break;
    809     case PEER_ENDPOINT_CLOSED:
    810       endpoint->set_peer_closed();
    811       // If the interface endpoint is performing a sync watch, this makes sure
    812       // it is notified and eventually exits the sync watch.
    813       endpoint->SignalSyncMessageEvent();
    814       break;
    815   }
    816   if (endpoint->closed() && endpoint->peer_closed())
    817     endpoints_.erase(endpoint->id());
    818 }
    819 
    820 void MultiplexRouter::RaiseErrorInNonTestingMode() {
    821   lock_.AssertAcquired();
    822   if (!testing_mode_)
    823     RaiseError();
    824 }
    825 
    826 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
    827     InterfaceId id,
    828     bool* inserted) {
    829   lock_.AssertAcquired();
    830   // Either |inserted| is nullptr or it points to a boolean initialized as
    831   // false.
    832   DCHECK(!inserted || !*inserted);
    833 
    834   auto iter = endpoints_.find(id);
    835   InterfaceEndpoint* endpoint;
    836   if (iter == endpoints_.end()) {
    837     endpoint = new InterfaceEndpoint(this, id);
    838     endpoints_[id] = endpoint;
    839     if (inserted)
    840       *inserted = true;
    841   } else {
    842     endpoint = iter->second.get();
    843   }
    844 
    845   return endpoint;
    846 }
    847 
    848 }  // namespace internal
    849 }  // namespace mojo
    850