Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/node_controller.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 
     10 #include "base/bind.h"
     11 #include "base/location.h"
     12 #include "base/logging.h"
     13 #include "base/macros.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/metrics/histogram_macros.h"
     16 #include "base/process/process_handle.h"
     17 #include "base/rand_util.h"
     18 #include "base/time/time.h"
     19 #include "base/timer/elapsed_timer.h"
     20 #include "mojo/edk/embedder/embedder_internal.h"
     21 #include "mojo/edk/embedder/named_platform_channel_pair.h"
     22 #include "mojo/edk/embedder/named_platform_handle.h"
     23 #include "mojo/edk/embedder/platform_channel_pair.h"
     24 #include "mojo/edk/system/broker.h"
     25 #include "mojo/edk/system/broker_host.h"
     26 #include "mojo/edk/system/core.h"
     27 #include "mojo/edk/system/ports_message.h"
     28 #include "mojo/edk/system/request_context.h"
     29 
     30 #if defined(OS_MACOSX) && !defined(OS_IOS)
     31 #include "mojo/edk/system/mach_port_relay.h"
     32 #endif
     33 
     34 #if !defined(OS_NACL)
     35 #include "crypto/random.h"
     36 #endif
     37 
     38 namespace mojo {
     39 namespace edk {
     40 
     41 namespace {
     42 
     43 #if defined(OS_NACL)
     44 template <typename T>
     45 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
     46 #else
     47 template <typename T>
     48 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
     49 #endif
     50 
     51 ports::NodeName GetRandomNodeName() {
     52   ports::NodeName name;
     53   GenerateRandomName(&name);
     54   return name;
     55 }
     56 
     57 void RecordPeerCount(size_t count) {
     58   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
     59 
     60   // 8k is the maximum number of file descriptors allowed in Chrome.
     61   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
     62                               static_cast<int32_t>(count),
     63                               1 /* min */,
     64                               8000 /* max */,
     65                               50 /* bucket count */);
     66 }
     67 
     68 void RecordPendingChildCount(size_t count) {
     69   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
     70 
     71   // 8k is the maximum number of file descriptors allowed in Chrome.
     72   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
     73                               static_cast<int32_t>(count),
     74                               1 /* min */,
     75                               8000 /* max */,
     76                               50 /* bucket count */);
     77 }
     78 
     79 bool ParsePortsMessage(Channel::Message* message,
     80                        void** data,
     81                        size_t* num_data_bytes,
     82                        size_t* num_header_bytes,
     83                        size_t* num_payload_bytes,
     84                        size_t* num_ports_bytes) {
     85   DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
     86          num_ports_bytes);
     87 
     88   NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
     89   if (!*num_data_bytes)
     90     return false;
     91 
     92   if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
     93                              num_payload_bytes, num_ports_bytes)) {
     94     return false;
     95   }
     96 
     97   return true;
     98 }
     99 
    100 // Used by NodeController to watch for shutdown. Since no IO can happen once
    101 // the IO thread is killed, the NodeController can cleanly drop all its peers
    102 // at that time.
    103 class ThreadDestructionObserver :
    104     public base::MessageLoop::DestructionObserver {
    105  public:
    106   static void Create(scoped_refptr<base::TaskRunner> task_runner,
    107                      const base::Closure& callback) {
    108     if (task_runner->RunsTasksOnCurrentThread()) {
    109       // Owns itself.
    110       new ThreadDestructionObserver(callback);
    111     } else {
    112       task_runner->PostTask(FROM_HERE,
    113                             base::Bind(&Create, task_runner, callback));
    114     }
    115   }
    116 
    117  private:
    118   explicit ThreadDestructionObserver(const base::Closure& callback)
    119       : callback_(callback) {
    120     base::MessageLoop::current()->AddDestructionObserver(this);
    121   }
    122 
    123   ~ThreadDestructionObserver() override {
    124     base::MessageLoop::current()->RemoveDestructionObserver(this);
    125   }
    126 
    127   // base::MessageLoop::DestructionObserver:
    128   void WillDestroyCurrentMessageLoop() override {
    129     callback_.Run();
    130     delete this;
    131   }
    132 
    133   const base::Closure callback_;
    134 
    135   DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
    136 };
    137 
    138 }  // namespace
    139 
    140 NodeController::~NodeController() {}
    141 
    142 NodeController::NodeController(Core* core)
    143     : core_(core),
    144       name_(GetRandomNodeName()),
    145       node_(new ports::Node(name_, this)) {
    146   DVLOG(1) << "Initializing node " << name_;
    147 }
    148 
    149 #if defined(OS_MACOSX) && !defined(OS_IOS)
    150 void NodeController::CreateMachPortRelay(
    151     base::PortProvider* port_provider) {
    152   base::AutoLock lock(mach_port_relay_lock_);
    153   DCHECK(!mach_port_relay_);
    154   mach_port_relay_.reset(new MachPortRelay(port_provider));
    155 }
    156 #endif
    157 
    158 void NodeController::SetIOTaskRunner(
    159     scoped_refptr<base::TaskRunner> task_runner) {
    160   io_task_runner_ = task_runner;
    161   ThreadDestructionObserver::Create(
    162       io_task_runner_,
    163       base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
    164 }
    165 
    166 void NodeController::ConnectToChild(
    167     base::ProcessHandle process_handle,
    168     ConnectionParams connection_params,
    169     const std::string& child_token,
    170     const ProcessErrorCallback& process_error_callback) {
    171   // Generate the temporary remote node name here so that it can be associated
    172   // with the embedder's child_token. If an error occurs in the child process
    173   // after it is launched, but before any reserved ports are connected, this can
    174   // be used to clean up any dangling ports.
    175   ports::NodeName node_name;
    176   GenerateRandomName(&node_name);
    177 
    178   {
    179     base::AutoLock lock(reserved_ports_lock_);
    180     bool inserted = pending_child_tokens_.insert(
    181         std::make_pair(node_name, child_token)).second;
    182     DCHECK(inserted);
    183   }
    184 
    185 #if defined(OS_WIN)
    186   // On Windows, we need to duplicate the process handle because we have no
    187   // control over its lifetime and it may become invalid by the time the posted
    188   // task runs.
    189   HANDLE dup_handle = INVALID_HANDLE_VALUE;
    190   BOOL ok = ::DuplicateHandle(
    191       base::GetCurrentProcessHandle(), process_handle,
    192       base::GetCurrentProcessHandle(), &dup_handle,
    193       0, FALSE, DUPLICATE_SAME_ACCESS);
    194   DPCHECK(ok);
    195   process_handle = dup_handle;
    196 #endif
    197 
    198   io_task_runner_->PostTask(
    199       FROM_HERE, base::Bind(&NodeController::ConnectToChildOnIOThread,
    200                             base::Unretained(this), process_handle,
    201                             base::Passed(&connection_params), node_name,
    202                             process_error_callback));
    203 }
    204 
    205 void NodeController::CloseChildPorts(const std::string& child_token) {
    206   std::vector<ports::PortRef> ports_to_close;
    207   {
    208     std::vector<std::string> port_tokens;
    209     base::AutoLock lock(reserved_ports_lock_);
    210     for (const auto& port : reserved_ports_) {
    211       if (port.second.child_token == child_token) {
    212         DVLOG(1) << "Closing reserved port " << port.second.port.name();
    213         ports_to_close.push_back(port.second.port);
    214         port_tokens.push_back(port.first);
    215       }
    216     }
    217 
    218     for (const auto& token : port_tokens)
    219       reserved_ports_.erase(token);
    220   }
    221 
    222   for (const auto& port : ports_to_close)
    223     node_->ClosePort(port);
    224 
    225   // Ensure local port closure messages are processed.
    226   AcceptIncomingMessages();
    227 }
    228 
    229 void NodeController::ClosePeerConnection(const std::string& peer_token) {
    230   io_task_runner_->PostTask(
    231       FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
    232                             base::Unretained(this), peer_token));
    233 }
    234 
    235 void NodeController::ConnectToParent(ConnectionParams connection_params) {
    236 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
    237   // Use the bootstrap channel for the broker and receive the node's channel
    238   // synchronously as the first message from the broker.
    239   base::ElapsedTimer timer;
    240   broker_.reset(new Broker(connection_params.TakeChannelHandle()));
    241   ScopedPlatformHandle platform_handle = broker_->GetParentPlatformHandle();
    242   UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
    243                       timer.Elapsed());
    244 
    245   if (!platform_handle.is_valid()) {
    246     // Most likely the browser side of the channel has already been closed and
    247     // the broker was unable to negotiate a NodeChannel pipe. In this case we
    248     // can cancel parent connection.
    249     DVLOG(1) << "Cannot connect to invalid parent channel.";
    250     CancelPendingPortMerges();
    251     return;
    252   }
    253   connection_params = ConnectionParams(std::move(platform_handle));
    254 #endif
    255 
    256   io_task_runner_->PostTask(
    257       FROM_HERE,
    258       base::Bind(&NodeController::ConnectToParentOnIOThread,
    259                  base::Unretained(this), base::Passed(&connection_params)));
    260 }
    261 
    262 void NodeController::ConnectToPeer(ConnectionParams connection_params,
    263                                    const ports::PortRef& port,
    264                                    const std::string& peer_token) {
    265   ports::NodeName node_name;
    266   GenerateRandomName(&node_name);
    267   io_task_runner_->PostTask(
    268       FROM_HERE,
    269       base::Bind(&NodeController::ConnectToPeerOnIOThread,
    270                  base::Unretained(this), base::Passed(&connection_params),
    271                  node_name, port, peer_token));
    272 }
    273 
    274 void NodeController::SetPortObserver(const ports::PortRef& port,
    275                                      scoped_refptr<PortObserver> observer) {
    276   node_->SetUserData(port, std::move(observer));
    277 }
    278 
    279 void NodeController::ClosePort(const ports::PortRef& port) {
    280   SetPortObserver(port, nullptr);
    281   int rv = node_->ClosePort(port);
    282   DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
    283 
    284   AcceptIncomingMessages();
    285 }
    286 
    287 int NodeController::SendMessage(const ports::PortRef& port,
    288                                 std::unique_ptr<PortsMessage> message) {
    289   ports::ScopedMessage ports_message(message.release());
    290   int rv = node_->SendMessage(port, std::move(ports_message));
    291 
    292   AcceptIncomingMessages();
    293   return rv;
    294 }
    295 
    296 void NodeController::ReservePort(const std::string& token,
    297                                  const ports::PortRef& port,
    298                                  const std::string& child_token) {
    299   DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
    300            << token;
    301 
    302   base::AutoLock lock(reserved_ports_lock_);
    303   auto result = reserved_ports_.insert(
    304       std::make_pair(token, ReservedPort{port, child_token}));
    305   DCHECK(result.second);
    306 }
    307 
    308 void NodeController::MergePortIntoParent(const std::string& token,
    309                                          const ports::PortRef& port) {
    310   bool was_merged = false;
    311   {
    312     // This request may be coming from within the process that reserved the
    313     // "parent" side (e.g. for Chrome single-process mode), so if this token is
    314     // reserved locally, merge locally instead.
    315     base::AutoLock lock(reserved_ports_lock_);
    316     auto it = reserved_ports_.find(token);
    317     if (it != reserved_ports_.end()) {
    318       node_->MergePorts(port, name_, it->second.port.name());
    319       reserved_ports_.erase(it);
    320       was_merged = true;
    321     }
    322   }
    323   if (was_merged) {
    324     AcceptIncomingMessages();
    325     return;
    326   }
    327 
    328   scoped_refptr<NodeChannel> parent;
    329   bool reject_merge = false;
    330   {
    331     // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
    332     // there is a race where the parent can be set, and |pending_port_merges_|
    333     // be processed between retrieving |parent| and adding the merge to
    334     // |pending_port_merges_|.
    335     base::AutoLock lock(pending_port_merges_lock_);
    336     parent = GetParentChannel();
    337     if (reject_pending_merges_) {
    338       reject_merge = true;
    339     } else if (!parent) {
    340       pending_port_merges_.push_back(std::make_pair(token, port));
    341       return;
    342     }
    343   }
    344   if (reject_merge) {
    345     node_->ClosePort(port);
    346     DVLOG(2) << "Rejecting port merge for token " << token
    347              << " due to closed parent channel.";
    348     AcceptIncomingMessages();
    349     return;
    350   }
    351 
    352   parent->RequestPortMerge(port.name(), token);
    353 }
    354 
    355 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
    356                                     const ports::PortRef& port1) {
    357   int rv = node_->MergeLocalPorts(port0, port1);
    358   AcceptIncomingMessages();
    359   return rv;
    360 }
    361 
    362 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
    363     size_t num_bytes) {
    364 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
    365   // Shared buffer creation failure is fatal, so always use the broker when we
    366   // have one. This does mean that a non-root process that has children will use
    367   // the broker for shared buffer creation even though that process is
    368   // privileged.
    369   if (broker_) {
    370     return broker_->GetSharedBuffer(num_bytes);
    371   }
    372 #endif
    373   return PlatformSharedBuffer::Create(num_bytes);
    374 }
    375 
    376 void NodeController::RequestShutdown(const base::Closure& callback) {
    377   {
    378     base::AutoLock lock(shutdown_lock_);
    379     shutdown_callback_ = callback;
    380     shutdown_callback_flag_.Set(true);
    381   }
    382 
    383   AttemptShutdownIfRequested();
    384 }
    385 
    386 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
    387                                           const std::string& error) {
    388   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
    389   if (peer)
    390     peer->NotifyBadMessage(error);
    391 }
    392 
    393 void NodeController::ConnectToChildOnIOThread(
    394     base::ProcessHandle process_handle,
    395     ConnectionParams connection_params,
    396     ports::NodeName token,
    397     const ProcessErrorCallback& process_error_callback) {
    398   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    399 
    400 #if !defined(OS_MACOSX) && !defined(OS_NACL)
    401   PlatformChannelPair node_channel;
    402   ScopedPlatformHandle server_handle = node_channel.PassServerHandle();
    403   // BrokerHost owns itself.
    404   BrokerHost* broker_host =
    405       new BrokerHost(process_handle, connection_params.TakeChannelHandle());
    406   bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle());
    407 
    408 #if defined(OS_WIN)
    409   if (!channel_ok) {
    410     // On Windows the above operation may fail if the channel is crossing a
    411     // session boundary. In that case we fall back to a named pipe.
    412     NamedPlatformChannelPair named_channel;
    413     server_handle = named_channel.PassServerHandle();
    414     broker_host->SendNamedChannel(named_channel.handle().name);
    415   }
    416 #else
    417   CHECK(channel_ok);
    418 #endif  // defined(OS_WIN)
    419 
    420   scoped_refptr<NodeChannel> channel =
    421       NodeChannel::Create(this, ConnectionParams(std::move(server_handle)),
    422                           io_task_runner_, process_error_callback);
    423 
    424 #else  // !defined(OS_MACOSX) && !defined(OS_NACL)
    425   scoped_refptr<NodeChannel> channel =
    426       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
    427                           process_error_callback);
    428 #endif  // !defined(OS_MACOSX) && !defined(OS_NACL)
    429 
    430   // We set up the child channel with a temporary name so it can be identified
    431   // as a pending child if it writes any messages to the channel. We may start
    432   // receiving messages from it (though we shouldn't) as soon as Start() is
    433   // called below.
    434 
    435   pending_children_.insert(std::make_pair(token, channel));
    436   RecordPendingChildCount(pending_children_.size());
    437 
    438   channel->SetRemoteNodeName(token);
    439   channel->SetRemoteProcessHandle(process_handle);
    440   channel->Start();
    441 
    442   channel->AcceptChild(name_, token);
    443 }
    444 
    445 void NodeController::ConnectToParentOnIOThread(
    446     ConnectionParams connection_params) {
    447   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    448 
    449   {
    450     base::AutoLock lock(parent_lock_);
    451     DCHECK(parent_name_ == ports::kInvalidNodeName);
    452 
    453     // At this point we don't know the parent's name, so we can't yet insert it
    454     // into our |peers_| map. That will happen as soon as we receive an
    455     // AcceptChild message from them.
    456     bootstrap_parent_channel_ =
    457         NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
    458                             ProcessErrorCallback());
    459     // Prevent the parent pipe handle from being closed on shutdown. Pipe
    460     // closure is used by the parent to detect the child process has exited.
    461     // Relying on message pipes to be closed is not enough because the parent
    462     // may see the message pipe closure before the child is dead, causing the
    463     // child process to be unexpectedly SIGKILL'd.
    464     bootstrap_parent_channel_->LeakHandleOnShutdown();
    465   }
    466   bootstrap_parent_channel_->Start();
    467 }
    468 
    469 void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params,
    470                                              ports::NodeName token,
    471                                              ports::PortRef port,
    472                                              const std::string& peer_token) {
    473   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    474 
    475   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
    476       this, std::move(connection_params), io_task_runner_, {});
    477   peer_connections_.insert(
    478       {token, PeerConnection{channel, port, peer_token}});
    479   peers_by_token_.insert({peer_token, token});
    480 
    481   channel->SetRemoteNodeName(token);
    482   channel->Start();
    483 
    484   channel->AcceptPeer(name_, token, port.name());
    485 }
    486 
    487 void NodeController::ClosePeerConnectionOnIOThread(
    488     const std::string& peer_token) {
    489   RequestContext request_context(RequestContext::Source::SYSTEM);
    490   auto peer = peers_by_token_.find(peer_token);
    491   // The connection may already be closed.
    492   if (peer == peers_by_token_.end())
    493     return;
    494 
    495   // |peer| may be removed so make a copy of |name|.
    496   ports::NodeName name = peer->second;
    497   DropPeer(name, nullptr);
    498 }
    499 
    500 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
    501     const ports::NodeName& name) {
    502   base::AutoLock lock(peers_lock_);
    503   auto it = peers_.find(name);
    504   if (it == peers_.end())
    505     return nullptr;
    506   return it->second;
    507 }
    508 
    509 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
    510   ports::NodeName parent_name;
    511   {
    512     base::AutoLock lock(parent_lock_);
    513     parent_name = parent_name_;
    514   }
    515   return GetPeerChannel(parent_name);
    516 }
    517 
    518 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
    519   ports::NodeName broker_name;
    520   {
    521     base::AutoLock lock(broker_lock_);
    522     broker_name = broker_name_;
    523   }
    524   return GetPeerChannel(broker_name);
    525 }
    526 
    527 void NodeController::AddPeer(const ports::NodeName& name,
    528                              scoped_refptr<NodeChannel> channel,
    529                              bool start_channel) {
    530   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    531 
    532   DCHECK(name != ports::kInvalidNodeName);
    533   DCHECK(channel);
    534 
    535   channel->SetRemoteNodeName(name);
    536 
    537   OutgoingMessageQueue pending_messages;
    538   {
    539     base::AutoLock lock(peers_lock_);
    540     if (peers_.find(name) != peers_.end()) {
    541       // This can happen normally if two nodes race to be introduced to each
    542       // other. The losing pipe will be silently closed and introduction should
    543       // not be affected.
    544       DVLOG(1) << "Ignoring duplicate peer name " << name;
    545       return;
    546     }
    547 
    548     auto result = peers_.insert(std::make_pair(name, channel));
    549     DCHECK(result.second);
    550 
    551     DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
    552 
    553     RecordPeerCount(peers_.size());
    554 
    555     auto it = pending_peer_messages_.find(name);
    556     if (it != pending_peer_messages_.end()) {
    557       std::swap(pending_messages, it->second);
    558       pending_peer_messages_.erase(it);
    559     }
    560   }
    561 
    562   if (start_channel)
    563     channel->Start();
    564 
    565   // Flush any queued message we need to deliver to this node.
    566   while (!pending_messages.empty()) {
    567     channel->PortsMessage(std::move(pending_messages.front()));
    568     pending_messages.pop();
    569   }
    570 }
    571 
    572 void NodeController::DropPeer(const ports::NodeName& name,
    573                               NodeChannel* channel) {
    574   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    575 
    576   {
    577     base::AutoLock lock(peers_lock_);
    578     auto it = peers_.find(name);
    579 
    580     if (it != peers_.end()) {
    581       ports::NodeName peer = it->first;
    582       peers_.erase(it);
    583       DVLOG(1) << "Dropped peer " << peer;
    584     }
    585 
    586     pending_peer_messages_.erase(name);
    587     pending_children_.erase(name);
    588 
    589     RecordPeerCount(peers_.size());
    590     RecordPendingChildCount(pending_children_.size());
    591   }
    592 
    593   std::vector<ports::PortRef> ports_to_close;
    594   {
    595     // Clean up any reserved ports.
    596     base::AutoLock lock(reserved_ports_lock_);
    597     auto it = pending_child_tokens_.find(name);
    598     if (it != pending_child_tokens_.end()) {
    599       const std::string& child_token = it->second;
    600 
    601       std::vector<std::string> port_tokens;
    602       for (const auto& port : reserved_ports_) {
    603         if (port.second.child_token == child_token) {
    604           DVLOG(1) << "Closing reserved port: " << port.second.port.name();
    605           ports_to_close.push_back(port.second.port);
    606           port_tokens.push_back(port.first);
    607         }
    608       }
    609 
    610       // We have to erase reserved ports in a two-step manner because the usual
    611       // manner of using the returned iterator from map::erase isn't technically
    612       // valid in C++11 (although it is in C++14).
    613       for (const auto& token : port_tokens)
    614         reserved_ports_.erase(token);
    615 
    616       pending_child_tokens_.erase(it);
    617     }
    618   }
    619 
    620   bool is_parent;
    621   {
    622     base::AutoLock lock(parent_lock_);
    623     is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
    624   }
    625 
    626   // If the error comes from the parent channel, we also need to cancel any
    627   // port merge requests, so that errors can be propagated to the message
    628   // pipes.
    629   if (is_parent)
    630     CancelPendingPortMerges();
    631 
    632   auto peer = peer_connections_.find(name);
    633   if (peer != peer_connections_.end()) {
    634     peers_by_token_.erase(peer->second.peer_token);
    635     ports_to_close.push_back(peer->second.local_port);
    636     peer_connections_.erase(peer);
    637   }
    638 
    639   for (const auto& port : ports_to_close)
    640     node_->ClosePort(port);
    641 
    642   node_->LostConnectionToNode(name);
    643 
    644   AcceptIncomingMessages();
    645 }
    646 
    647 void NodeController::SendPeerMessage(const ports::NodeName& name,
    648                                      ports::ScopedMessage message) {
    649   Channel::MessagePtr channel_message =
    650       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
    651 
    652   scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
    653 #if defined(OS_WIN)
    654   if (channel_message->has_handles()) {
    655     // If we're sending a message with handles we aren't the destination
    656     // node's parent or broker (i.e. we don't know its process handle), ask
    657     // the broker to relay for us.
    658     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    659     if (!peer || !peer->HasRemoteProcessHandle()) {
    660       if (broker) {
    661         broker->RelayPortsMessage(name, std::move(channel_message));
    662       } else {
    663         base::AutoLock lock(broker_lock_);
    664         pending_relay_messages_[name].emplace(std::move(channel_message));
    665       }
    666       return;
    667     }
    668   }
    669 #elif defined(OS_MACOSX) && !defined(OS_IOS)
    670   if (channel_message->has_mach_ports()) {
    671     // Messages containing Mach ports are always routed through the broker, even
    672     // if the broker process is the intended recipient.
    673     bool use_broker = false;
    674     {
    675       base::AutoLock lock(parent_lock_);
    676       use_broker = (bootstrap_parent_channel_ ||
    677                     parent_name_ != ports::kInvalidNodeName);
    678     }
    679     if (use_broker) {
    680       scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    681       if (broker) {
    682         broker->RelayPortsMessage(name, std::move(channel_message));
    683       } else {
    684         base::AutoLock lock(broker_lock_);
    685         pending_relay_messages_[name].emplace(std::move(channel_message));
    686       }
    687       return;
    688     }
    689   }
    690 #endif  // defined(OS_WIN)
    691 
    692   if (peer) {
    693     peer->PortsMessage(std::move(channel_message));
    694     return;
    695   }
    696 
    697   // If we don't know who the peer is, queue the message for delivery. If this
    698   // is the first message queued for the peer, we also ask the broker to
    699   // introduce us to them.
    700 
    701   bool needs_introduction = false;
    702   {
    703     base::AutoLock lock(peers_lock_);
    704     auto& queue = pending_peer_messages_[name];
    705     needs_introduction = queue.empty();
    706     queue.emplace(std::move(channel_message));
    707   }
    708 
    709   if (needs_introduction) {
    710     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    711     if (!broker) {
    712       DVLOG(1) << "Dropping message for unknown peer: " << name;
    713       return;
    714     }
    715     broker->RequestIntroduction(name);
    716   }
    717 }
    718 
    719 void NodeController::AcceptIncomingMessages() {
    720   // This is an impactically large value which should never be reached in
    721   // practice. See the CHECK below for usage.
    722   constexpr size_t kMaxAcceptedMessages = 1000000;
    723 
    724   size_t num_messages_accepted = 0;
    725   while (incoming_messages_flag_) {
    726     // TODO: We may need to be more careful to avoid starving the rest of the
    727     // thread here. Revisit this if it turns out to be a problem. One
    728     // alternative would be to schedule a task to continue pumping messages
    729     // after flushing once.
    730 
    731     messages_lock_.Acquire();
    732     if (incoming_messages_.empty()) {
    733       messages_lock_.Release();
    734       break;
    735     }
    736 
    737     // libstdc++'s deque creates an internal buffer on construction, even when
    738     // the size is 0. So avoid creating it until it is necessary.
    739     std::queue<ports::ScopedMessage> messages;
    740     std::swap(messages, incoming_messages_);
    741     incoming_messages_flag_.Set(false);
    742     messages_lock_.Release();
    743 
    744     num_messages_accepted += messages.size();
    745     while (!messages.empty()) {
    746       node_->AcceptMessage(std::move(messages.front()));
    747       messages.pop();
    748     }
    749 
    750     // This is effectively a safeguard against potential bugs which might lead
    751     // to runaway message cycles. If any such cycles arise, we'll start seeing
    752     // crash reports from this location.
    753     CHECK_LE(num_messages_accepted, kMaxAcceptedMessages);
    754   }
    755 
    756   if (num_messages_accepted >= 4) {
    757     // Note: We avoid logging this histogram for the vast majority of cases.
    758     // See https://crbug.com/685763 for more context.
    759     UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.MessagesAcceptedPerEvent",
    760                                 static_cast<int32_t>(num_messages_accepted),
    761                                 1 /* min */,
    762                                 500 /* max */,
    763                                 50 /* bucket count */);
    764   }
    765 
    766   AttemptShutdownIfRequested();
    767 }
    768 
    769 void NodeController::ProcessIncomingMessages() {
    770   RequestContext request_context(RequestContext::Source::SYSTEM);
    771 
    772   {
    773     base::AutoLock lock(messages_lock_);
    774     // Allow a new incoming messages processing task to be posted. This can't be
    775     // done after AcceptIncomingMessages() otherwise a message might be missed.
    776     // Doing it here may result in at most two tasks existing at the same time;
    777     // this running one, and one pending in the task runner.
    778     incoming_messages_task_posted_ = false;
    779   }
    780 
    781   AcceptIncomingMessages();
    782 }
    783 
    784 void NodeController::DropAllPeers() {
    785   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    786 
    787   std::vector<scoped_refptr<NodeChannel>> all_peers;
    788   {
    789     base::AutoLock lock(parent_lock_);
    790     if (bootstrap_parent_channel_) {
    791       // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
    792       // existence to determine whether or not this is the root node. Once
    793       // bootstrap_parent_channel_->ShutDown() has been called,
    794       // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
    795       // matter if it's deleted now or when |this| is deleted.
    796       // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
    797       all_peers.push_back(bootstrap_parent_channel_);
    798     }
    799   }
    800 
    801   {
    802     base::AutoLock lock(peers_lock_);
    803     for (const auto& peer : peers_)
    804       all_peers.push_back(peer.second);
    805     for (const auto& peer : pending_children_)
    806       all_peers.push_back(peer.second);
    807     peers_.clear();
    808     pending_children_.clear();
    809     pending_peer_messages_.clear();
    810     peer_connections_.clear();
    811   }
    812 
    813   for (const auto& peer : all_peers)
    814     peer->ShutDown();
    815 
    816   if (destroy_on_io_thread_shutdown_)
    817     delete this;
    818 }
    819 
    820 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
    821   GenerateRandomName(port_name);
    822 }
    823 
    824 void NodeController::AllocMessage(size_t num_header_bytes,
    825                                   ports::ScopedMessage* message) {
    826   message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
    827 }
    828 
    829 void NodeController::ForwardMessage(const ports::NodeName& node,
    830                                     ports::ScopedMessage message) {
    831   DCHECK(message);
    832   bool schedule_pump_task = false;
    833   if (node == name_) {
    834     // NOTE: We need to avoid re-entering the Node instance within
    835     // ForwardMessage. Because ForwardMessage is only ever called
    836     // (synchronously) in response to Node's ClosePort, SendMessage, or
    837     // AcceptMessage, we flush the queue after calling any of those methods.
    838     base::AutoLock lock(messages_lock_);
    839     // |io_task_runner_| may be null in tests or processes that don't require
    840     // multi-process Mojo.
    841     schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
    842         !incoming_messages_task_posted_;
    843     incoming_messages_task_posted_ |= schedule_pump_task;
    844     incoming_messages_.emplace(std::move(message));
    845     incoming_messages_flag_.Set(true);
    846   } else {
    847     SendPeerMessage(node, std::move(message));
    848   }
    849 
    850   if (schedule_pump_task) {
    851     // Normally, the queue is processed after the action that added the local
    852     // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
    853     // possible for a local message to be added as a result of a remote message,
    854     // and OnChannelMessage() doesn't process this queue (although
    855     // OnPortsMessage() does). There may also be other code paths, now or added
    856     // in the future, which cause local messages to be added but don't process
    857     // this message queue.
    858     //
    859     // Instead of adding a call to AcceptIncomingMessages() on every possible
    860     // code path, post a task to the IO thread to process the queue. If the
    861     // current call stack processes the queue, this may end up doing nothing.
    862     io_task_runner_->PostTask(
    863         FROM_HERE,
    864         base::Bind(&NodeController::ProcessIncomingMessages,
    865                    base::Unretained(this)));
    866   }
    867 }
    868 
    869 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
    870   CHECK_EQ(message->num_ports(), 0u);
    871   Channel::MessagePtr channel_message =
    872       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
    873   CHECK(!channel_message->has_handles());
    874 
    875   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    876   if (broker)
    877     broker->Broadcast(std::move(channel_message));
    878   else
    879     OnBroadcast(name_, std::move(channel_message));
    880 }
    881 
    882 void NodeController::PortStatusChanged(const ports::PortRef& port) {
    883   scoped_refptr<ports::UserData> user_data;
    884   node_->GetUserData(port, &user_data);
    885 
    886   PortObserver* observer = static_cast<PortObserver*>(user_data.get());
    887   if (observer) {
    888     observer->OnPortStatusChanged();
    889   } else {
    890     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
    891              << "doesn't have an observer.";
    892   }
    893 }
    894 
    895 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
    896                                    const ports::NodeName& parent_name,
    897                                    const ports::NodeName& token) {
    898   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    899 
    900   scoped_refptr<NodeChannel> parent;
    901   {
    902     base::AutoLock lock(parent_lock_);
    903     if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
    904       parent_name_ = parent_name;
    905       parent = bootstrap_parent_channel_;
    906     }
    907   }
    908 
    909   if (!parent) {
    910     DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
    911     DropPeer(from_node, nullptr);
    912     return;
    913   }
    914 
    915   parent->SetRemoteNodeName(parent_name);
    916   parent->AcceptParent(token, name_);
    917 
    918   // NOTE: The child does not actually add its parent as a peer until
    919   // receiving an AcceptBrokerClient message from the broker. The parent
    920   // will request that said message be sent upon receiving AcceptParent.
    921 
    922   DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
    923 }
    924 
    925 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
    926                                     const ports::NodeName& token,
    927                                     const ports::NodeName& child_name) {
    928   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    929 
    930   auto it = pending_children_.find(from_node);
    931   if (it == pending_children_.end() || token != from_node) {
    932     DLOG(ERROR) << "Received unexpected AcceptParent message from "
    933                 << from_node;
    934     DropPeer(from_node, nullptr);
    935     return;
    936   }
    937 
    938   scoped_refptr<NodeChannel> channel = it->second;
    939   pending_children_.erase(it);
    940 
    941   DCHECK(channel);
    942 
    943   DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
    944 
    945   AddPeer(child_name, channel, false /* start_channel */);
    946 
    947   // TODO(rockot/amistry): We could simplify child initialization if we could
    948   // synchronously get a new async broker channel from the broker. For now we do
    949   // it asynchronously since it's only used to facilitate handle passing, not
    950   // handle creation.
    951   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    952   if (broker) {
    953     // Inform the broker of this new child.
    954     broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
    955   } else {
    956     // If we have no broker, either we need to wait for one, or we *are* the
    957     // broker.
    958     scoped_refptr<NodeChannel> parent = GetParentChannel();
    959     if (!parent) {
    960       base::AutoLock lock(parent_lock_);
    961       parent = bootstrap_parent_channel_;
    962     }
    963 
    964     if (!parent) {
    965       // Yes, we're the broker. We can initialize the child directly.
    966       channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
    967     } else {
    968       // We aren't the broker, so wait for a broker connection.
    969       base::AutoLock lock(broker_lock_);
    970       pending_broker_clients_.push(child_name);
    971     }
    972   }
    973 }
    974 
    975 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
    976                                        const ports::NodeName& client_name,
    977                                        base::ProcessHandle process_handle) {
    978 #if defined(OS_WIN)
    979   // Scoped handle to avoid leaks on error.
    980   ScopedPlatformHandle scoped_process_handle =
    981       ScopedPlatformHandle(PlatformHandle(process_handle));
    982 #endif
    983   scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
    984   if (!sender) {
    985     DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
    986     return;
    987   }
    988 
    989   if (GetPeerChannel(client_name)) {
    990     DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
    991     DropPeer(from_node, nullptr);
    992     return;
    993   }
    994 
    995   PlatformChannelPair broker_channel;
    996   ConnectionParams connection_params(broker_channel.PassServerHandle());
    997   scoped_refptr<NodeChannel> client =
    998       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
    999                           ProcessErrorCallback());
   1000 
   1001 #if defined(OS_WIN)
   1002   // The broker must have a working handle to the client process in order to
   1003   // properly copy other handles to and from the client.
   1004   if (!scoped_process_handle.is_valid()) {
   1005     DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
   1006     return;
   1007   }
   1008   client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
   1009 #else
   1010   client->SetRemoteProcessHandle(process_handle);
   1011 #endif
   1012 
   1013   AddPeer(client_name, client, true /* start_channel */);
   1014 
   1015   DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
   1016            << " from peer " << from_node;
   1017 
   1018   sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
   1019 }
   1020 
   1021 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
   1022                                          const ports::NodeName& client_name,
   1023                                          ScopedPlatformHandle broker_channel) {
   1024   scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
   1025   if (!client) {
   1026     DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
   1027     return;
   1028   }
   1029 
   1030   // This should have come from our own broker.
   1031   if (GetBrokerChannel() != GetPeerChannel(from_node)) {
   1032     DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
   1033     return;
   1034   }
   1035 
   1036   DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
   1037 
   1038   client->AcceptBrokerClient(from_node, std::move(broker_channel));
   1039 }
   1040 
   1041 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
   1042                                           const ports::NodeName& broker_name,
   1043                                           ScopedPlatformHandle broker_channel) {
   1044   // This node should already have a parent in bootstrap mode.
   1045   ports::NodeName parent_name;
   1046   scoped_refptr<NodeChannel> parent;
   1047   {
   1048     base::AutoLock lock(parent_lock_);
   1049     parent_name = parent_name_;
   1050     parent = bootstrap_parent_channel_;
   1051     bootstrap_parent_channel_ = nullptr;
   1052   }
   1053   DCHECK(parent_name == from_node);
   1054   DCHECK(parent);
   1055 
   1056   std::queue<ports::NodeName> pending_broker_clients;
   1057   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
   1058       pending_relay_messages;
   1059   {
   1060     base::AutoLock lock(broker_lock_);
   1061     broker_name_ = broker_name;
   1062     std::swap(pending_broker_clients, pending_broker_clients_);
   1063     std::swap(pending_relay_messages, pending_relay_messages_);
   1064   }
   1065   DCHECK(broker_name != ports::kInvalidNodeName);
   1066 
   1067   // It's now possible to add both the broker and the parent as peers.
   1068   // Note that the broker and parent may be the same node.
   1069   scoped_refptr<NodeChannel> broker;
   1070   if (broker_name == parent_name) {
   1071     DCHECK(!broker_channel.is_valid());
   1072     broker = parent;
   1073   } else {
   1074     DCHECK(broker_channel.is_valid());
   1075     broker =
   1076         NodeChannel::Create(this, ConnectionParams(std::move(broker_channel)),
   1077                             io_task_runner_, ProcessErrorCallback());
   1078     AddPeer(broker_name, broker, true /* start_channel */);
   1079   }
   1080 
   1081   AddPeer(parent_name, parent, false /* start_channel */);
   1082 
   1083   {
   1084     // Complete any port merge requests we have waiting for the parent.
   1085     base::AutoLock lock(pending_port_merges_lock_);
   1086     for (const auto& request : pending_port_merges_)
   1087       parent->RequestPortMerge(request.second.name(), request.first);
   1088     pending_port_merges_.clear();
   1089   }
   1090 
   1091   // Feed the broker any pending children of our own.
   1092   while (!pending_broker_clients.empty()) {
   1093     const ports::NodeName& child_name = pending_broker_clients.front();
   1094     auto it = pending_children_.find(child_name);
   1095     DCHECK(it != pending_children_.end());
   1096     broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
   1097     pending_broker_clients.pop();
   1098   }
   1099 
   1100 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
   1101   // Have the broker relay any messages we have waiting.
   1102   for (auto& entry : pending_relay_messages) {
   1103     const ports::NodeName& destination = entry.first;
   1104     auto& message_queue = entry.second;
   1105     while (!message_queue.empty()) {
   1106       broker->RelayPortsMessage(destination, std::move(message_queue.front()));
   1107       message_queue.pop();
   1108     }
   1109   }
   1110 #endif
   1111 
   1112   DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
   1113 }
   1114 
   1115 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
   1116                                     Channel::MessagePtr channel_message) {
   1117   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1118 
   1119   void* data;
   1120   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
   1121   if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
   1122                          &num_header_bytes, &num_payload_bytes,
   1123                          &num_ports_bytes)) {
   1124     DropPeer(from_node, nullptr);
   1125     return;
   1126   }
   1127 
   1128   CHECK(channel_message);
   1129   std::unique_ptr<PortsMessage> ports_message(
   1130       new PortsMessage(num_header_bytes,
   1131                        num_payload_bytes,
   1132                        num_ports_bytes,
   1133                        std::move(channel_message)));
   1134   ports_message->set_source_node(from_node);
   1135   node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
   1136   AcceptIncomingMessages();
   1137 }
   1138 
   1139 void NodeController::OnRequestPortMerge(
   1140     const ports::NodeName& from_node,
   1141     const ports::PortName& connector_port_name,
   1142     const std::string& token) {
   1143   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1144 
   1145   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
   1146            << token << " and port " << connector_port_name << "@" << from_node;
   1147 
   1148   ports::PortRef local_port;
   1149   {
   1150     base::AutoLock lock(reserved_ports_lock_);
   1151     auto it = reserved_ports_.find(token);
   1152     if (it == reserved_ports_.end()) {
   1153       DVLOG(1) << "Ignoring request to connect to port for unknown token "
   1154                << token;
   1155       return;
   1156     }
   1157     local_port = it->second.port;
   1158   }
   1159 
   1160   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
   1161   if (rv != ports::OK)
   1162     DLOG(ERROR) << "MergePorts failed: " << rv;
   1163 
   1164   AcceptIncomingMessages();
   1165 }
   1166 
   1167 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
   1168                                            const ports::NodeName& name) {
   1169   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1170 
   1171   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
   1172   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
   1173     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
   1174                 << from_node;
   1175     DropPeer(from_node, nullptr);
   1176     return;
   1177   }
   1178 
   1179   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
   1180   if (!new_friend) {
   1181     // We don't know who they're talking about!
   1182     requestor->Introduce(name, ScopedPlatformHandle());
   1183   } else {
   1184     PlatformChannelPair new_channel;
   1185     requestor->Introduce(name, new_channel.PassServerHandle());
   1186     new_friend->Introduce(from_node, new_channel.PassClientHandle());
   1187   }
   1188 }
   1189 
   1190 void NodeController::OnIntroduce(const ports::NodeName& from_node,
   1191                                  const ports::NodeName& name,
   1192                                  ScopedPlatformHandle channel_handle) {
   1193   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1194 
   1195   if (!channel_handle.is_valid()) {
   1196     node_->LostConnectionToNode(name);
   1197 
   1198     DVLOG(1) << "Could not be introduced to peer " << name;
   1199     base::AutoLock lock(peers_lock_);
   1200     pending_peer_messages_.erase(name);
   1201     return;
   1202   }
   1203 
   1204   scoped_refptr<NodeChannel> channel =
   1205       NodeChannel::Create(this, ConnectionParams(std::move(channel_handle)),
   1206                           io_task_runner_, ProcessErrorCallback());
   1207 
   1208   DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
   1209   AddPeer(name, channel, true /* start_channel */);
   1210 }
   1211 
   1212 void NodeController::OnBroadcast(const ports::NodeName& from_node,
   1213                                  Channel::MessagePtr message) {
   1214   DCHECK(!message->has_handles());
   1215 
   1216   void* data;
   1217   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
   1218   if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
   1219                          &num_header_bytes, &num_payload_bytes,
   1220                          &num_ports_bytes)) {
   1221     DropPeer(from_node, nullptr);
   1222     return;
   1223   }
   1224 
   1225   // Broadcast messages must not contain ports.
   1226   if (num_ports_bytes > 0) {
   1227     DropPeer(from_node, nullptr);
   1228     return;
   1229   }
   1230 
   1231   base::AutoLock lock(peers_lock_);
   1232   for (auto& iter : peers_) {
   1233     // Copy and send the message to each known peer.
   1234     Channel::MessagePtr peer_message(
   1235         new Channel::Message(message->payload_size(), 0));
   1236     memcpy(peer_message->mutable_payload(), message->payload(),
   1237            message->payload_size());
   1238     iter.second->PortsMessage(std::move(peer_message));
   1239   }
   1240 }
   1241 
   1242 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
   1243 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
   1244                                          base::ProcessHandle from_process,
   1245                                          const ports::NodeName& destination,
   1246                                          Channel::MessagePtr message) {
   1247   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1248 
   1249   if (GetBrokerChannel()) {
   1250     // Only the broker should be asked to relay a message.
   1251     LOG(ERROR) << "Non-broker refusing to relay message.";
   1252     DropPeer(from_node, nullptr);
   1253     return;
   1254   }
   1255 
   1256   // The parent should always know which process this came from.
   1257   DCHECK(from_process != base::kNullProcessHandle);
   1258 
   1259 #if defined(OS_WIN)
   1260   // Rewrite the handles to this (the parent) process. If the message is
   1261   // destined for another child process, the handles will be rewritten to that
   1262   // process before going out (see NodeChannel::WriteChannelMessage).
   1263   //
   1264   // TODO: We could avoid double-duplication.
   1265   //
   1266   // Note that we explicitly mark the handles as being owned by the sending
   1267   // process before rewriting them, in order to accommodate RewriteHandles'
   1268   // internal sanity checks.
   1269   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
   1270   for (size_t i = 0; i < handles->size(); ++i)
   1271     (*handles)[i].owning_process = from_process;
   1272   if (!Channel::Message::RewriteHandles(from_process,
   1273                                         base::GetCurrentProcessHandle(),
   1274                                         handles.get())) {
   1275     DLOG(ERROR) << "Failed to relay one or more handles.";
   1276   }
   1277   message->SetHandles(std::move(handles));
   1278 #else
   1279   MachPortRelay* relay = GetMachPortRelay();
   1280   if (!relay) {
   1281     LOG(ERROR) << "Receiving Mach ports without a port relay from "
   1282                << from_node << ". Dropping message.";
   1283     return;
   1284   }
   1285   if (!relay->ExtractPortRights(message.get(), from_process)) {
   1286     // NodeChannel should ensure that MachPortRelay is ready for the remote
   1287     // process. At this point, if the port extraction failed, either something
   1288     // went wrong in the mach stuff, or the remote process died.
   1289     LOG(ERROR) << "Error on receiving Mach ports " << from_node
   1290                << ". Dropping message.";
   1291     return;
   1292   }
   1293 #endif  // defined(OS_WIN)
   1294 
   1295   if (destination == name_) {
   1296     // Great, we can deliver this message locally.
   1297     OnPortsMessage(from_node, std::move(message));
   1298     return;
   1299   }
   1300 
   1301   scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
   1302   if (peer)
   1303     peer->PortsMessageFromRelay(from_node, std::move(message));
   1304   else
   1305     DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
   1306 }
   1307 
   1308 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
   1309                                              const ports::NodeName& source_node,
   1310                                              Channel::MessagePtr message) {
   1311   if (GetPeerChannel(from_node) != GetBrokerChannel()) {
   1312     LOG(ERROR) << "Refusing relayed message from non-broker node.";
   1313     DropPeer(from_node, nullptr);
   1314     return;
   1315   }
   1316 
   1317   OnPortsMessage(source_node, std::move(message));
   1318 }
   1319 #endif
   1320 
   1321 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
   1322                                   const ports::NodeName& token,
   1323                                   const ports::NodeName& peer_name,
   1324                                   const ports::PortName& port_name) {
   1325   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1326 
   1327   auto it = peer_connections_.find(from_node);
   1328   if (it == peer_connections_.end()) {
   1329     DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
   1330     DropPeer(from_node, nullptr);
   1331     return;
   1332   }
   1333 
   1334   scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
   1335   ports::PortRef local_port = it->second.local_port;
   1336   std::string peer_token = std::move(it->second.peer_token);
   1337   peer_connections_.erase(it);
   1338   DCHECK(channel);
   1339 
   1340   // If the peer connection is a self connection (which is used in tests),
   1341   // drop the channel to it and skip straight to merging the ports.
   1342   if (name_ == peer_name) {
   1343     peers_by_token_.erase(peer_token);
   1344   } else {
   1345     peers_by_token_[peer_token] = peer_name;
   1346     peer_connections_.insert(
   1347         {peer_name, PeerConnection{nullptr, local_port, peer_token}});
   1348     DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
   1349 
   1350     AddPeer(peer_name, channel, false /* start_channel */);
   1351   }
   1352 
   1353   // We need to choose one side to initiate the port merge. It doesn't matter
   1354   // who does it as long as they don't both try. Simple solution: pick the one
   1355   // with the "smaller" port name.
   1356   if (local_port.name() < port_name) {
   1357     node()->MergePorts(local_port, peer_name, port_name);
   1358   }
   1359 }
   1360 
   1361 void NodeController::OnChannelError(const ports::NodeName& from_node,
   1362                                     NodeChannel* channel) {
   1363   if (io_task_runner_->RunsTasksOnCurrentThread()) {
   1364     DropPeer(from_node, channel);
   1365     // DropPeer may have caused local port closures, so be sure to process any
   1366     // pending local messages.
   1367     AcceptIncomingMessages();
   1368   } else {
   1369     io_task_runner_->PostTask(
   1370         FROM_HERE,
   1371         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
   1372                    from_node, channel));
   1373   }
   1374 }
   1375 
   1376 #if defined(OS_MACOSX) && !defined(OS_IOS)
   1377 MachPortRelay* NodeController::GetMachPortRelay() {
   1378   {
   1379     base::AutoLock lock(parent_lock_);
   1380     // Return null if we're not the root.
   1381     if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
   1382       return nullptr;
   1383   }
   1384 
   1385   base::AutoLock lock(mach_port_relay_lock_);
   1386   return mach_port_relay_.get();
   1387 }
   1388 #endif
   1389 
   1390 void NodeController::CancelPendingPortMerges() {
   1391   std::vector<ports::PortRef> ports_to_close;
   1392 
   1393   {
   1394     base::AutoLock lock(pending_port_merges_lock_);
   1395     reject_pending_merges_ = true;
   1396     for (const auto& port : pending_port_merges_)
   1397       ports_to_close.push_back(port.second);
   1398     pending_port_merges_.clear();
   1399   }
   1400 
   1401   for (const auto& port : ports_to_close)
   1402     node_->ClosePort(port);
   1403 }
   1404 
   1405 void NodeController::DestroyOnIOThreadShutdown() {
   1406   destroy_on_io_thread_shutdown_ = true;
   1407 }
   1408 
   1409 void NodeController::AttemptShutdownIfRequested() {
   1410   if (!shutdown_callback_flag_)
   1411     return;
   1412 
   1413   base::Closure callback;
   1414   {
   1415     base::AutoLock lock(shutdown_lock_);
   1416     if (shutdown_callback_.is_null())
   1417       return;
   1418     if (!node_->CanShutdownCleanly(
   1419           ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
   1420       DVLOG(2) << "Unable to cleanly shut down node " << name_;
   1421       return;
   1422     }
   1423 
   1424     callback = shutdown_callback_;
   1425     shutdown_callback_.Reset();
   1426     shutdown_callback_flag_.Set(false);
   1427   }
   1428 
   1429   DCHECK(!callback.is_null());
   1430 
   1431   callback.Run();
   1432 }
   1433 
   1434 NodeController::PeerConnection::PeerConnection() = default;
   1435 
   1436 NodeController::PeerConnection::PeerConnection(
   1437     const PeerConnection& other) = default;
   1438 
   1439 NodeController::PeerConnection::PeerConnection(
   1440     PeerConnection&& other) = default;
   1441 
   1442 NodeController::PeerConnection::PeerConnection(
   1443     scoped_refptr<NodeChannel> channel,
   1444     const ports::PortRef& local_port,
   1445     const std::string& peer_token)
   1446     : channel(std::move(channel)),
   1447       local_port(local_port),
   1448       peer_token(peer_token) {}
   1449 
   1450 NodeController::PeerConnection::~PeerConnection() = default;
   1451 
   1452 NodeController::PeerConnection& NodeController::PeerConnection::
   1453 operator=(const PeerConnection& other) = default;
   1454 
   1455 NodeController::PeerConnection& NodeController::PeerConnection::
   1456 operator=(PeerConnection&& other) = default;
   1457 
   1458 }  // namespace edk
   1459 }  // namespace mojo
   1460