Home | History | Annotate | Download | only in ipc
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_mojo_bootstrap.h"
      6 
      7 #include <inttypes.h>
      8 #include <stdint.h>
      9 
     10 #include <map>
     11 #include <memory>
     12 #include <set>
     13 #include <utility>
     14 #include <vector>
     15 
     16 #include "base/callback.h"
     17 #include "base/containers/queue.h"
     18 #include "base/logging.h"
     19 #include "base/macros.h"
     20 #include "base/memory/ptr_util.h"
     21 #include "base/no_destructor.h"
     22 #include "base/sequenced_task_runner.h"
     23 #include "base/single_thread_task_runner.h"
     24 #include "base/strings/stringprintf.h"
     25 #include "base/synchronization/lock.h"
     26 #include "base/threading/thread_checker.h"
     27 #include "base/threading/thread_task_runner_handle.h"
     28 #include "base/trace_event/memory_allocator_dump.h"
     29 #include "base/trace_event/memory_dump_manager.h"
     30 #include "base/trace_event/memory_dump_provider.h"
     31 #include "ipc/ipc_channel.h"
     32 #include "mojo/public/cpp/bindings/associated_group.h"
     33 #include "mojo/public/cpp/bindings/associated_group_controller.h"
     34 #include "mojo/public/cpp/bindings/connector.h"
     35 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
     36 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
     37 #include "mojo/public/cpp/bindings/interface_id.h"
     38 #include "mojo/public/cpp/bindings/message.h"
     39 #include "mojo/public/cpp/bindings/message_header_validator.h"
     40 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
     41 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
     42 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
     43 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
     44 
     45 namespace IPC {
     46 
     47 namespace {
     48 
     49 class ChannelAssociatedGroupController;
     50 
     51 // Used to track some internal Channel state in pursuit of message leaks.
     52 //
     53 // TODO(https://crbug.com/813045): Remove this.
     54 class ControllerMemoryDumpProvider
     55     : public base::trace_event::MemoryDumpProvider {
     56  public:
     57   ControllerMemoryDumpProvider() {
     58     base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
     59         this, "IPCChannel", nullptr);
     60   }
     61 
     62   ~ControllerMemoryDumpProvider() override {
     63     base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
     64         this);
     65   }
     66 
     67   void AddController(ChannelAssociatedGroupController* controller) {
     68     base::AutoLock lock(lock_);
     69     controllers_.insert(controller);
     70   }
     71 
     72   void RemoveController(ChannelAssociatedGroupController* controller) {
     73     base::AutoLock lock(lock_);
     74     controllers_.erase(controller);
     75   }
     76 
     77   // base::trace_event::MemoryDumpProvider:
     78   bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
     79                     base::trace_event::ProcessMemoryDump* pmd) override;
     80 
     81  private:
     82   base::Lock lock_;
     83   std::set<ChannelAssociatedGroupController*> controllers_;
     84 
     85   DISALLOW_COPY_AND_ASSIGN(ControllerMemoryDumpProvider);
     86 };
     87 
     88 ControllerMemoryDumpProvider& GetMemoryDumpProvider() {
     89   static base::NoDestructor<ControllerMemoryDumpProvider> provider;
     90   return *provider;
     91 }
     92 
     93 class ChannelAssociatedGroupController
     94     : public mojo::AssociatedGroupController,
     95       public mojo::MessageReceiver,
     96       public mojo::PipeControlMessageHandlerDelegate {
     97  public:
     98   ChannelAssociatedGroupController(
     99       bool set_interface_id_namespace_bit,
    100       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    101       const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
    102       : task_runner_(ipc_task_runner),
    103         proxy_task_runner_(proxy_task_runner),
    104         set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
    105         filters_(this),
    106         control_message_handler_(this),
    107         control_message_proxy_thunk_(this),
    108         control_message_proxy_(&control_message_proxy_thunk_) {
    109     thread_checker_.DetachFromThread();
    110     control_message_handler_.SetDescription(
    111         "IPC::mojom::Bootstrap [master] PipeControlMessageHandler");
    112     filters_.Append<mojo::MessageHeaderValidator>(
    113         "IPC::mojom::Bootstrap [master] MessageHeaderValidator");
    114 
    115     GetMemoryDumpProvider().AddController(this);
    116   }
    117 
    118   size_t GetQueuedMessageCount() {
    119     base::AutoLock lock(outgoing_messages_lock_);
    120     return outgoing_messages_.size();
    121   }
    122 
    123   void Bind(mojo::ScopedMessagePipeHandle handle) {
    124     DCHECK(thread_checker_.CalledOnValidThread());
    125     DCHECK(task_runner_->BelongsToCurrentThread());
    126 
    127     connector_.reset(new mojo::Connector(
    128         std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
    129         task_runner_));
    130     connector_->set_incoming_receiver(&filters_);
    131     connector_->set_connection_error_handler(
    132         base::Bind(&ChannelAssociatedGroupController::OnPipeError,
    133                    base::Unretained(this)));
    134     connector_->set_enforce_errors_from_incoming_receiver(false);
    135     connector_->SetWatcherHeapProfilerTag("IPC Channel");
    136   }
    137 
    138   void Pause() {
    139     DCHECK(!paused_);
    140     paused_ = true;
    141   }
    142 
    143   void Unpause() {
    144     DCHECK(paused_);
    145     paused_ = false;
    146   }
    147 
    148   void FlushOutgoingMessages() {
    149     std::vector<mojo::Message> outgoing_messages;
    150     {
    151       base::AutoLock lock(outgoing_messages_lock_);
    152       std::swap(outgoing_messages, outgoing_messages_);
    153     }
    154     for (auto& message : outgoing_messages)
    155       SendMessage(&message);
    156   }
    157 
    158   void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
    159                               mojom::ChannelAssociatedRequest* receiver) {
    160     mojo::InterfaceId sender_id, receiver_id;
    161     if (set_interface_id_namespace_bit_) {
    162       sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
    163       receiver_id = 1;
    164     } else {
    165       sender_id = 1;
    166       receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
    167     }
    168 
    169     {
    170       base::AutoLock locker(lock_);
    171       Endpoint* sender_endpoint = new Endpoint(this, sender_id);
    172       Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
    173       endpoints_.insert({ sender_id, sender_endpoint });
    174       endpoints_.insert({ receiver_id, receiver_endpoint });
    175       sender_endpoint->set_handle_created();
    176       receiver_endpoint->set_handle_created();
    177     }
    178 
    179     mojo::ScopedInterfaceEndpointHandle sender_handle =
    180         CreateScopedInterfaceEndpointHandle(sender_id);
    181     mojo::ScopedInterfaceEndpointHandle receiver_handle =
    182         CreateScopedInterfaceEndpointHandle(receiver_id);
    183 
    184     sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0));
    185     *receiver = mojom::ChannelAssociatedRequest(std::move(receiver_handle));
    186   }
    187 
    188   void ShutDown() {
    189     DCHECK(thread_checker_.CalledOnValidThread());
    190     shut_down_ = true;
    191     connector_->CloseMessagePipe();
    192     OnPipeError();
    193     connector_.reset();
    194 
    195     base::AutoLock lock(outgoing_messages_lock_);
    196     outgoing_messages_.clear();
    197   }
    198 
    199   // mojo::AssociatedGroupController:
    200   mojo::InterfaceId AssociateInterface(
    201       mojo::ScopedInterfaceEndpointHandle handle_to_send) override {
    202     if (!handle_to_send.pending_association())
    203       return mojo::kInvalidInterfaceId;
    204 
    205     uint32_t id = 0;
    206     {
    207       base::AutoLock locker(lock_);
    208       do {
    209         if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
    210           next_interface_id_ = 2;
    211         id = next_interface_id_++;
    212         if (set_interface_id_namespace_bit_)
    213           id |= mojo::kInterfaceIdNamespaceMask;
    214       } while (ContainsKey(endpoints_, id));
    215 
    216       Endpoint* endpoint = new Endpoint(this, id);
    217       if (encountered_error_)
    218         endpoint->set_peer_closed();
    219       endpoint->set_handle_created();
    220       endpoints_.insert({id, endpoint});
    221     }
    222 
    223     if (!NotifyAssociation(&handle_to_send, id)) {
    224       // The peer handle of |handle_to_send|, which is supposed to join this
    225       // associated group, has been closed.
    226       {
    227         base::AutoLock locker(lock_);
    228         Endpoint* endpoint = FindEndpoint(id);
    229         if (endpoint)
    230           MarkClosedAndMaybeRemove(endpoint);
    231       }
    232 
    233       control_message_proxy_.NotifyPeerEndpointClosed(
    234           id, handle_to_send.disconnect_reason());
    235     }
    236     return id;
    237   }
    238 
    239   mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
    240       mojo::InterfaceId id) override {
    241     if (!mojo::IsValidInterfaceId(id))
    242       return mojo::ScopedInterfaceEndpointHandle();
    243 
    244     // Unless it is the master ID, |id| is from the remote side and therefore
    245     // its namespace bit is supposed to be different than the value that this
    246     // router would use.
    247     if (!mojo::IsMasterInterfaceId(id) &&
    248         set_interface_id_namespace_bit_ ==
    249             mojo::HasInterfaceIdNamespaceBitSet(id)) {
    250       return mojo::ScopedInterfaceEndpointHandle();
    251     }
    252 
    253     base::AutoLock locker(lock_);
    254     bool inserted = false;
    255     Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
    256     if (inserted) {
    257       DCHECK(!endpoint->handle_created());
    258       if (encountered_error_)
    259         endpoint->set_peer_closed();
    260     } else {
    261       if (endpoint->handle_created())
    262         return mojo::ScopedInterfaceEndpointHandle();
    263     }
    264 
    265     endpoint->set_handle_created();
    266     return CreateScopedInterfaceEndpointHandle(id);
    267   }
    268 
    269   void CloseEndpointHandle(
    270       mojo::InterfaceId id,
    271       const base::Optional<mojo::DisconnectReason>& reason) override {
    272     if (!mojo::IsValidInterfaceId(id))
    273       return;
    274     {
    275       base::AutoLock locker(lock_);
    276       DCHECK(ContainsKey(endpoints_, id));
    277       Endpoint* endpoint = endpoints_[id].get();
    278       DCHECK(!endpoint->client());
    279       DCHECK(!endpoint->closed());
    280       MarkClosedAndMaybeRemove(endpoint);
    281     }
    282 
    283     if (!mojo::IsMasterInterfaceId(id) || reason)
    284       control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
    285   }
    286 
    287   mojo::InterfaceEndpointController* AttachEndpointClient(
    288       const mojo::ScopedInterfaceEndpointHandle& handle,
    289       mojo::InterfaceEndpointClient* client,
    290       scoped_refptr<base::SequencedTaskRunner> runner) override {
    291     const mojo::InterfaceId id = handle.id();
    292 
    293     DCHECK(mojo::IsValidInterfaceId(id));
    294     DCHECK(client);
    295 
    296     base::AutoLock locker(lock_);
    297     DCHECK(ContainsKey(endpoints_, id));
    298 
    299     Endpoint* endpoint = endpoints_[id].get();
    300     endpoint->AttachClient(client, std::move(runner));
    301 
    302     if (endpoint->peer_closed())
    303       NotifyEndpointOfError(endpoint, true /* force_async */);
    304 
    305     return endpoint;
    306   }
    307 
    308   void DetachEndpointClient(
    309       const mojo::ScopedInterfaceEndpointHandle& handle) override {
    310     const mojo::InterfaceId id = handle.id();
    311 
    312     DCHECK(mojo::IsValidInterfaceId(id));
    313 
    314     base::AutoLock locker(lock_);
    315     DCHECK(ContainsKey(endpoints_, id));
    316 
    317     Endpoint* endpoint = endpoints_[id].get();
    318     endpoint->DetachClient();
    319   }
    320 
    321   void RaiseError() override {
    322     // We ignore errors on channel endpoints, leaving the pipe open. There are
    323     // good reasons for this:
    324     //
    325     //   * We should never close a channel endpoint in either process as long as
    326     //     the child process is still alive. The child's endpoint should only be
    327     //     closed implicitly by process death, and the browser's endpoint should
    328     //     only be closed after the child process is confirmed to be dead. Crash
    329     //     reporting logic in Chrome relies on this behavior in order to do the
    330     //     right thing.
    331     //
    332     //   * There are two interesting conditions under which RaiseError() can be
    333     //     implicitly reached: an incoming message fails validation, or the
    334     //     local endpoint drops a response callback without calling it.
    335     //
    336     //   * In the validation case, we also report the message as bad, and this
    337     //     will imminently trigger the common bad-IPC path in the browser,
    338     //     causing the browser to kill the offending renderer.
    339     //
    340     //   * In the dropped response callback case, the net result of ignoring the
    341     //     issue is generally innocuous. While indicative of programmer error,
    342     //     it's not a severe failure and is already covered by separate DCHECKs.
    343     //
    344     // See https://crbug.com/861607 for additional discussion.
    345   }
    346 
    347   bool PrefersSerializedMessages() override { return true; }
    348 
    349  private:
    350   class Endpoint;
    351   class ControlMessageProxyThunk;
    352   friend class Endpoint;
    353   friend class ControlMessageProxyThunk;
    354 
    355   // MessageWrapper objects are always destroyed under the controller's lock. On
    356   // destruction, if the message it wrappers contains
    357   // ScopedInterfaceEndpointHandles (which cannot be destructed under the
    358   // controller's lock), the wrapper unlocks to clean them up.
    359   class MessageWrapper {
    360    public:
    361     MessageWrapper() = default;
    362 
    363     MessageWrapper(ChannelAssociatedGroupController* controller,
    364                    mojo::Message message)
    365         : controller_(controller), value_(std::move(message)) {}
    366 
    367     MessageWrapper(MessageWrapper&& other)
    368         : controller_(other.controller_), value_(std::move(other.value_)) {}
    369 
    370     ~MessageWrapper() {
    371       if (value_.associated_endpoint_handles()->empty())
    372         return;
    373 
    374       controller_->lock_.AssertAcquired();
    375       {
    376         base::AutoUnlock unlocker(controller_->lock_);
    377         value_.mutable_associated_endpoint_handles()->clear();
    378       }
    379     }
    380 
    381     MessageWrapper& operator=(MessageWrapper&& other) {
    382       controller_ = other.controller_;
    383       value_ = std::move(other.value_);
    384       return *this;
    385     }
    386 
    387     mojo::Message& value() { return value_; }
    388 
    389    private:
    390     ChannelAssociatedGroupController* controller_ = nullptr;
    391     mojo::Message value_;
    392 
    393     DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
    394   };
    395 
    396   class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
    397                    public mojo::InterfaceEndpointController {
    398    public:
    399     Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
    400         : controller_(controller), id_(id) {}
    401 
    402     mojo::InterfaceId id() const { return id_; }
    403 
    404     bool closed() const {
    405       controller_->lock_.AssertAcquired();
    406       return closed_;
    407     }
    408 
    409     void set_closed() {
    410       controller_->lock_.AssertAcquired();
    411       closed_ = true;
    412     }
    413 
    414     bool peer_closed() const {
    415       controller_->lock_.AssertAcquired();
    416       return peer_closed_;
    417     }
    418 
    419     void set_peer_closed() {
    420       controller_->lock_.AssertAcquired();
    421       peer_closed_ = true;
    422     }
    423 
    424     bool handle_created() const {
    425       controller_->lock_.AssertAcquired();
    426       return handle_created_;
    427     }
    428 
    429     void set_handle_created() {
    430       controller_->lock_.AssertAcquired();
    431       handle_created_ = true;
    432     }
    433 
    434     const base::Optional<mojo::DisconnectReason>& disconnect_reason() const {
    435       return disconnect_reason_;
    436     }
    437 
    438     void set_disconnect_reason(
    439         const base::Optional<mojo::DisconnectReason>& disconnect_reason) {
    440       disconnect_reason_ = disconnect_reason;
    441     }
    442 
    443     base::SequencedTaskRunner* task_runner() const {
    444       return task_runner_.get();
    445     }
    446 
    447     mojo::InterfaceEndpointClient* client() const {
    448       controller_->lock_.AssertAcquired();
    449       return client_;
    450     }
    451 
    452     void AttachClient(mojo::InterfaceEndpointClient* client,
    453                       scoped_refptr<base::SequencedTaskRunner> runner) {
    454       controller_->lock_.AssertAcquired();
    455       DCHECK(!client_);
    456       DCHECK(!closed_);
    457       DCHECK(runner->RunsTasksInCurrentSequence());
    458 
    459       task_runner_ = std::move(runner);
    460       client_ = client;
    461     }
    462 
    463     void DetachClient() {
    464       controller_->lock_.AssertAcquired();
    465       DCHECK(client_);
    466       DCHECK(task_runner_->RunsTasksInCurrentSequence());
    467       DCHECK(!closed_);
    468 
    469       task_runner_ = nullptr;
    470       client_ = nullptr;
    471       sync_watcher_.reset();
    472     }
    473 
    474     uint32_t EnqueueSyncMessage(MessageWrapper message) {
    475       controller_->lock_.AssertAcquired();
    476       uint32_t id = GenerateSyncMessageId();
    477       sync_messages_.emplace(id, std::move(message));
    478       SignalSyncMessageEvent();
    479       return id;
    480     }
    481 
    482     void SignalSyncMessageEvent() {
    483       controller_->lock_.AssertAcquired();
    484 
    485       if (sync_watcher_)
    486         sync_watcher_->SignalEvent();
    487     }
    488 
    489     MessageWrapper PopSyncMessage(uint32_t id) {
    490       controller_->lock_.AssertAcquired();
    491       if (sync_messages_.empty() || sync_messages_.front().first != id)
    492         return MessageWrapper();
    493       MessageWrapper message = std::move(sync_messages_.front().second);
    494       sync_messages_.pop();
    495       return message;
    496     }
    497 
    498     // mojo::InterfaceEndpointController:
    499     bool SendMessage(mojo::Message* message) override {
    500       DCHECK(task_runner_->RunsTasksInCurrentSequence());
    501       message->set_interface_id(id_);
    502       return controller_->SendMessage(message);
    503     }
    504 
    505     void AllowWokenUpBySyncWatchOnSameThread() override {
    506       DCHECK(task_runner_->RunsTasksInCurrentSequence());
    507 
    508       EnsureSyncWatcherExists();
    509       sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
    510     }
    511 
    512     bool SyncWatch(const bool* should_stop) override {
    513       DCHECK(task_runner_->RunsTasksInCurrentSequence());
    514 
    515       // It's not legal to make sync calls from the master endpoint's thread,
    516       // and in fact they must only happen from the proxy task runner.
    517       DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
    518       DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
    519 
    520       EnsureSyncWatcherExists();
    521       return sync_watcher_->SyncWatch(should_stop);
    522     }
    523 
    524    private:
    525     friend class base::RefCountedThreadSafe<Endpoint>;
    526 
    527     ~Endpoint() override {
    528       controller_->lock_.AssertAcquired();
    529       DCHECK(!client_);
    530       DCHECK(closed_);
    531       DCHECK(peer_closed_);
    532       DCHECK(!sync_watcher_);
    533     }
    534 
    535     void OnSyncMessageEventReady() {
    536       DCHECK(task_runner_->RunsTasksInCurrentSequence());
    537 
    538       scoped_refptr<Endpoint> keepalive(this);
    539       scoped_refptr<AssociatedGroupController> controller_keepalive(
    540           controller_);
    541       base::AutoLock locker(controller_->lock_);
    542       bool more_to_process = false;
    543       if (!sync_messages_.empty()) {
    544         MessageWrapper message_wrapper =
    545             std::move(sync_messages_.front().second);
    546         sync_messages_.pop();
    547 
    548         bool dispatch_succeeded;
    549         mojo::InterfaceEndpointClient* client = client_;
    550         {
    551           base::AutoUnlock unlocker(controller_->lock_);
    552           dispatch_succeeded =
    553               client->HandleIncomingMessage(&message_wrapper.value());
    554         }
    555 
    556         if (!sync_messages_.empty())
    557           more_to_process = true;
    558 
    559         if (!dispatch_succeeded)
    560           controller_->RaiseError();
    561       }
    562 
    563       if (!more_to_process)
    564         sync_watcher_->ResetEvent();
    565 
    566       // If there are no queued sync messages and the peer has closed, there
    567       // there won't be incoming sync messages in the future. If any
    568       // SyncWatch() calls are on the stack for this endpoint, resetting the
    569       // watcher will allow them to exit as the stack undwinds.
    570       if (!more_to_process && peer_closed_)
    571         sync_watcher_.reset();
    572     }
    573 
    574     void EnsureSyncWatcherExists() {
    575       DCHECK(task_runner_->RunsTasksInCurrentSequence());
    576       if (sync_watcher_)
    577         return;
    578 
    579       base::AutoLock locker(controller_->lock_);
    580       sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
    581           base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
    582                               base::Unretained(this)));
    583       if (peer_closed_ || !sync_messages_.empty())
    584         SignalSyncMessageEvent();
    585     }
    586 
    587     uint32_t GenerateSyncMessageId() {
    588       // Overflow is fine.
    589       uint32_t id = next_sync_message_id_++;
    590       DCHECK(sync_messages_.empty() || sync_messages_.front().first != id);
    591       return id;
    592     }
    593 
    594     ChannelAssociatedGroupController* const controller_;
    595     const mojo::InterfaceId id_;
    596 
    597     bool closed_ = false;
    598     bool peer_closed_ = false;
    599     bool handle_created_ = false;
    600     base::Optional<mojo::DisconnectReason> disconnect_reason_;
    601     mojo::InterfaceEndpointClient* client_ = nullptr;
    602     scoped_refptr<base::SequencedTaskRunner> task_runner_;
    603     std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
    604     base::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
    605     uint32_t next_sync_message_id_ = 0;
    606 
    607     DISALLOW_COPY_AND_ASSIGN(Endpoint);
    608   };
    609 
    610   class ControlMessageProxyThunk : public MessageReceiver {
    611    public:
    612     explicit ControlMessageProxyThunk(
    613         ChannelAssociatedGroupController* controller)
    614         : controller_(controller) {}
    615 
    616    private:
    617     // MessageReceiver:
    618     bool Accept(mojo::Message* message) override {
    619       return controller_->SendMessage(message);
    620     }
    621 
    622     ChannelAssociatedGroupController* controller_;
    623 
    624     DISALLOW_COPY_AND_ASSIGN(ControlMessageProxyThunk);
    625   };
    626 
    627   ~ChannelAssociatedGroupController() override {
    628     DCHECK(!connector_);
    629 
    630     base::AutoLock locker(lock_);
    631     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    632       Endpoint* endpoint = iter->second.get();
    633       ++iter;
    634 
    635       if (!endpoint->closed()) {
    636         // This happens when a NotifyPeerEndpointClosed message been received,
    637         // but the interface ID hasn't been used to create local endpoint
    638         // handle.
    639         DCHECK(!endpoint->client());
    640         DCHECK(endpoint->peer_closed());
    641         MarkClosedAndMaybeRemove(endpoint);
    642       } else {
    643         MarkPeerClosedAndMaybeRemove(endpoint);
    644       }
    645     }
    646 
    647     DCHECK(endpoints_.empty());
    648 
    649     GetMemoryDumpProvider().RemoveController(this);
    650   }
    651 
    652   bool SendMessage(mojo::Message* message) {
    653     if (task_runner_->BelongsToCurrentThread()) {
    654       DCHECK(thread_checker_.CalledOnValidThread());
    655       if (!connector_ || paused_) {
    656         if (!shut_down_) {
    657           base::AutoLock lock(outgoing_messages_lock_);
    658           outgoing_messages_.emplace_back(std::move(*message));
    659         }
    660         return true;
    661       }
    662       return connector_->Accept(message);
    663     } else {
    664       // Do a message size check here so we don't lose valuable stack
    665       // information to the task scheduler.
    666       CHECK_LE(message->data_num_bytes(), Channel::kMaximumMessageSize);
    667 
    668       // We always post tasks to the master endpoint thread when called from
    669       // other threads in order to simulate IPC::ChannelProxy::Send behavior.
    670       task_runner_->PostTask(
    671           FROM_HERE,
    672           base::Bind(
    673               &ChannelAssociatedGroupController::SendMessageOnMasterThread,
    674               this, base::Passed(message)));
    675       return true;
    676     }
    677   }
    678 
    679   void SendMessageOnMasterThread(mojo::Message message) {
    680     DCHECK(thread_checker_.CalledOnValidThread());
    681     if (!SendMessage(&message))
    682       RaiseError();
    683   }
    684 
    685   void OnPipeError() {
    686     DCHECK(thread_checker_.CalledOnValidThread());
    687 
    688     // We keep |this| alive here because it's possible for the notifications
    689     // below to release all other references.
    690     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
    691 
    692     base::AutoLock locker(lock_);
    693     encountered_error_ = true;
    694 
    695     std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
    696     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
    697       Endpoint* endpoint = iter->second.get();
    698       ++iter;
    699 
    700       if (endpoint->client())
    701         endpoints_to_notify.push_back(endpoint);
    702 
    703       MarkPeerClosedAndMaybeRemove(endpoint);
    704     }
    705 
    706     for (auto& endpoint : endpoints_to_notify) {
    707       // Because a notification may in turn detach any endpoint, we have to
    708       // check each client again here.
    709       if (endpoint->client())
    710         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    711     }
    712   }
    713 
    714   void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
    715     lock_.AssertAcquired();
    716     DCHECK(endpoint->task_runner() && endpoint->client());
    717     if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
    718       mojo::InterfaceEndpointClient* client = endpoint->client();
    719       base::Optional<mojo::DisconnectReason> reason(
    720           endpoint->disconnect_reason());
    721 
    722       base::AutoUnlock unlocker(lock_);
    723       client->NotifyError(reason);
    724     } else {
    725       endpoint->task_runner()->PostTask(
    726           FROM_HERE,
    727           base::Bind(&ChannelAssociatedGroupController::
    728                          NotifyEndpointOfErrorOnEndpointThread,
    729                      this, endpoint->id(), base::Unretained(endpoint)));
    730     }
    731   }
    732 
    733   void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
    734                                              Endpoint* endpoint) {
    735     base::AutoLock locker(lock_);
    736     auto iter = endpoints_.find(id);
    737     if (iter == endpoints_.end() || iter->second.get() != endpoint)
    738       return;
    739     if (!endpoint->client())
    740       return;
    741 
    742     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    743     NotifyEndpointOfError(endpoint, false /* force_async */);
    744   }
    745 
    746   void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
    747     lock_.AssertAcquired();
    748     endpoint->set_closed();
    749     if (endpoint->closed() && endpoint->peer_closed())
    750       endpoints_.erase(endpoint->id());
    751   }
    752 
    753   void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
    754     lock_.AssertAcquired();
    755     endpoint->set_peer_closed();
    756     endpoint->SignalSyncMessageEvent();
    757     if (endpoint->closed() && endpoint->peer_closed())
    758       endpoints_.erase(endpoint->id());
    759   }
    760 
    761   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
    762     lock_.AssertAcquired();
    763     DCHECK(!inserted || !*inserted);
    764 
    765     Endpoint* endpoint = FindEndpoint(id);
    766     if (!endpoint) {
    767       endpoint = new Endpoint(this, id);
    768       endpoints_.insert({id, endpoint});
    769       if (inserted)
    770         *inserted = true;
    771     }
    772     return endpoint;
    773   }
    774 
    775   Endpoint* FindEndpoint(mojo::InterfaceId id) {
    776     lock_.AssertAcquired();
    777     auto iter = endpoints_.find(id);
    778     return iter != endpoints_.end() ? iter->second.get() : nullptr;
    779   }
    780 
    781   // mojo::MessageReceiver:
    782   bool Accept(mojo::Message* message) override {
    783     DCHECK(thread_checker_.CalledOnValidThread());
    784 
    785     if (!message->DeserializeAssociatedEndpointHandles(this))
    786       return false;
    787 
    788     if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
    789       return control_message_handler_.Accept(message);
    790 
    791     mojo::InterfaceId id = message->interface_id();
    792     DCHECK(mojo::IsValidInterfaceId(id));
    793 
    794     base::AutoLock locker(lock_);
    795     Endpoint* endpoint = FindEndpoint(id);
    796     if (!endpoint)
    797       return true;
    798 
    799     mojo::InterfaceEndpointClient* client = endpoint->client();
    800     if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
    801       // No client has been bound yet or the client runs tasks on another
    802       // thread. We assume the other thread must always be the one on which
    803       // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
    804       //
    805       // If the client is not yet bound, it must be bound by the time this task
    806       // runs or else it's programmer error.
    807       DCHECK(proxy_task_runner_);
    808 
    809       if (message->has_flag(mojo::Message::kFlagIsSync)) {
    810         MessageWrapper message_wrapper(this, std::move(*message));
    811         // Sync messages may need to be handled by the endpoint if it's blocking
    812         // on a sync reply. We pass ownership of the message to the endpoint's
    813         // sync message queue. If the endpoint was blocking, it will dequeue the
    814         // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
    815         // call will dequeue the message and dispatch it.
    816         uint32_t message_id =
    817             endpoint->EnqueueSyncMessage(std::move(message_wrapper));
    818         proxy_task_runner_->PostTask(
    819             FROM_HERE,
    820             base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
    821                        this, id, message_id));
    822         return true;
    823       }
    824 
    825       proxy_task_runner_->PostTask(
    826           FROM_HERE,
    827           base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
    828                      this, base::Passed(message)));
    829       return true;
    830     }
    831 
    832     // We do not expect to receive sync responses on the master endpoint thread.
    833     // If it's happening, it's a bug.
    834     DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
    835            !message->has_flag(mojo::Message::kFlagIsResponse));
    836 
    837     base::AutoUnlock unlocker(lock_);
    838     return client->HandleIncomingMessage(message);
    839   }
    840 
    841   void AcceptOnProxyThread(mojo::Message message) {
    842     DCHECK(proxy_task_runner_->BelongsToCurrentThread());
    843 
    844     mojo::InterfaceId id = message.interface_id();
    845     DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
    846 
    847     base::AutoLock locker(lock_);
    848     Endpoint* endpoint = FindEndpoint(id);
    849     if (!endpoint)
    850       return;
    851 
    852     mojo::InterfaceEndpointClient* client = endpoint->client();
    853     if (!client)
    854       return;
    855 
    856     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    857 
    858     // Sync messages should never make their way to this method.
    859     DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
    860 
    861     bool result = false;
    862     {
    863       base::AutoUnlock unlocker(lock_);
    864       result = client->HandleIncomingMessage(&message);
    865     }
    866 
    867     if (!result)
    868       RaiseError();
    869   }
    870 
    871   void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
    872     DCHECK(proxy_task_runner_->BelongsToCurrentThread());
    873 
    874     base::AutoLock locker(lock_);
    875     Endpoint* endpoint = FindEndpoint(interface_id);
    876     if (!endpoint)
    877       return;
    878 
    879     // Careful, if the endpoint is detached its members are cleared. Check for
    880     // that before dereferencing.
    881     mojo::InterfaceEndpointClient* client = endpoint->client();
    882     if (!client)
    883       return;
    884 
    885     DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
    886     MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
    887 
    888     // The message must have already been dequeued by the endpoint waking up
    889     // from a sync wait. Nothing to do.
    890     if (message_wrapper.value().IsNull())
    891       return;
    892 
    893     bool result = false;
    894     {
    895       base::AutoUnlock unlocker(lock_);
    896       result = client->HandleIncomingMessage(&message_wrapper.value());
    897     }
    898 
    899     if (!result)
    900       RaiseError();
    901   }
    902 
    903   // mojo::PipeControlMessageHandlerDelegate:
    904   bool OnPeerAssociatedEndpointClosed(
    905       mojo::InterfaceId id,
    906       const base::Optional<mojo::DisconnectReason>& reason) override {
    907     DCHECK(thread_checker_.CalledOnValidThread());
    908 
    909     scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
    910     base::AutoLock locker(lock_);
    911     scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
    912     if (reason)
    913       endpoint->set_disconnect_reason(reason);
    914     if (!endpoint->peer_closed()) {
    915       if (endpoint->client())
    916         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
    917       MarkPeerClosedAndMaybeRemove(endpoint.get());
    918     }
    919 
    920     return true;
    921   }
    922 
    923   // Checked in places which must be run on the master endpoint's thread.
    924   base::ThreadChecker thread_checker_;
    925 
    926   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
    927 
    928   scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
    929   const bool set_interface_id_namespace_bit_;
    930   bool paused_ = false;
    931   std::unique_ptr<mojo::Connector> connector_;
    932   mojo::FilterChain filters_;
    933   mojo::PipeControlMessageHandler control_message_handler_;
    934   ControlMessageProxyThunk control_message_proxy_thunk_;
    935 
    936   // NOTE: It is unsafe to call into this object while holding |lock_|.
    937   mojo::PipeControlMessageProxy control_message_proxy_;
    938 
    939   // Guards access to |outgoing_messages_| only. Used to support memory dumps
    940   // which may be triggered from any thread.
    941   base::Lock outgoing_messages_lock_;
    942 
    943   // Outgoing messages that were sent before this controller was bound to a
    944   // real message pipe.
    945   std::vector<mojo::Message> outgoing_messages_;
    946 
    947   // Guards the fields below for thread-safe access.
    948   base::Lock lock_;
    949 
    950   bool encountered_error_ = false;
    951   bool shut_down_ = false;
    952 
    953   // ID #1 is reserved for the mojom::Channel interface.
    954   uint32_t next_interface_id_ = 2;
    955 
    956   std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;
    957 
    958   DISALLOW_COPY_AND_ASSIGN(ChannelAssociatedGroupController);
    959 };
    960 
    961 bool ControllerMemoryDumpProvider::OnMemoryDump(
    962     const base::trace_event::MemoryDumpArgs& args,
    963     base::trace_event::ProcessMemoryDump* pmd) {
    964   base::AutoLock lock(lock_);
    965   for (auto* controller : controllers_) {
    966     base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump(
    967         base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR,
    968                            reinterpret_cast<uintptr_t>(controller)));
    969     dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
    970                     base::trace_event::MemoryAllocatorDump::kUnitsObjects,
    971                     controller->GetQueuedMessageCount());
    972   }
    973 
    974   return true;
    975 }
    976 
    977 class MojoBootstrapImpl : public MojoBootstrap {
    978  public:
    979   MojoBootstrapImpl(
    980       mojo::ScopedMessagePipeHandle handle,
    981       const scoped_refptr<ChannelAssociatedGroupController> controller)
    982       : controller_(controller),
    983         associated_group_(controller),
    984         handle_(std::move(handle)) {}
    985 
    986   ~MojoBootstrapImpl() override {
    987     controller_->ShutDown();
    988   }
    989 
    990  private:
    991   void Connect(mojom::ChannelAssociatedPtr* sender,
    992                mojom::ChannelAssociatedRequest* receiver) override {
    993     controller_->Bind(std::move(handle_));
    994     controller_->CreateChannelEndpoints(sender, receiver);
    995   }
    996 
    997   void Pause() override {
    998     controller_->Pause();
    999   }
   1000 
   1001   void Unpause() override {
   1002     controller_->Unpause();
   1003   }
   1004 
   1005   void Flush() override {
   1006     controller_->FlushOutgoingMessages();
   1007   }
   1008 
   1009   mojo::AssociatedGroup* GetAssociatedGroup() override {
   1010     return &associated_group_;
   1011   }
   1012 
   1013   scoped_refptr<ChannelAssociatedGroupController> controller_;
   1014   mojo::AssociatedGroup associated_group_;
   1015 
   1016   mojo::ScopedMessagePipeHandle handle_;
   1017 
   1018   DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl);
   1019 };
   1020 
   1021 }  // namespace
   1022 
   1023 // static
   1024 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
   1025     mojo::ScopedMessagePipeHandle handle,
   1026     Channel::Mode mode,
   1027     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
   1028     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
   1029   return std::make_unique<MojoBootstrapImpl>(
   1030       std::move(handle),
   1031       new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
   1032                                            ipc_task_runner, proxy_task_runner));
   1033 }
   1034 
   1035 }  // namespace IPC
   1036