Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/node_controller.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 
     10 #include "base/bind.h"
     11 #include "base/location.h"
     12 #include "base/logging.h"
     13 #include "base/macros.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/metrics/histogram_macros.h"
     16 #include "base/process/process_handle.h"
     17 #include "base/rand_util.h"
     18 #include "base/time/time.h"
     19 #include "base/timer/elapsed_timer.h"
     20 #include "mojo/edk/embedder/embedder_internal.h"
     21 #include "mojo/edk/embedder/platform_channel_pair.h"
     22 #include "mojo/edk/system/broker.h"
     23 #include "mojo/edk/system/broker_host.h"
     24 #include "mojo/edk/system/core.h"
     25 #include "mojo/edk/system/ports_message.h"
     26 #include "mojo/edk/system/request_context.h"
     27 
     28 #if defined(OS_MACOSX) && !defined(OS_IOS)
     29 #include "mojo/edk/system/mach_port_relay.h"
     30 #endif
     31 
     32 #if !defined(OS_NACL)
     33 #include "crypto/random.h"
     34 #endif
     35 
     36 namespace mojo {
     37 namespace edk {
     38 
     39 namespace {
     40 
     41 #if defined(OS_NACL)
     42 template <typename T>
     43 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
     44 #else
     45 template <typename T>
     46 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
     47 #endif
     48 
     49 ports::NodeName GetRandomNodeName() {
     50   ports::NodeName name;
     51   GenerateRandomName(&name);
     52   return name;
     53 }
     54 
     55 void RecordPeerCount(size_t count) {
     56   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
     57 
     58   // 8k is the maximum number of file descriptors allowed in Chrome.
     59   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
     60                               static_cast<int32_t>(count),
     61                               0 /* min */,
     62                               8000 /* max */,
     63                               50 /* bucket count */);
     64 }
     65 
     66 void RecordPendingChildCount(size_t count) {
     67   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
     68 
     69   // 8k is the maximum number of file descriptors allowed in Chrome.
     70   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
     71                               static_cast<int32_t>(count),
     72                               0 /* min */,
     73                               8000 /* max */,
     74                               50 /* bucket count */);
     75 }
     76 
     77 bool ParsePortsMessage(Channel::Message* message,
     78                        void** data,
     79                        size_t* num_data_bytes,
     80                        size_t* num_header_bytes,
     81                        size_t* num_payload_bytes,
     82                        size_t* num_ports_bytes) {
     83   DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
     84          num_ports_bytes);
     85 
     86   NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
     87   if (!*num_data_bytes)
     88     return false;
     89 
     90   if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
     91                              num_payload_bytes, num_ports_bytes)) {
     92     return false;
     93   }
     94 
     95   return true;
     96 }
     97 
     98 // Used by NodeController to watch for shutdown. Since no IO can happen once
     99 // the IO thread is killed, the NodeController can cleanly drop all its peers
    100 // at that time.
    101 class ThreadDestructionObserver :
    102     public base::MessageLoop::DestructionObserver {
    103  public:
    104   static void Create(scoped_refptr<base::TaskRunner> task_runner,
    105                      const base::Closure& callback) {
    106     if (task_runner->RunsTasksOnCurrentThread()) {
    107       // Owns itself.
    108       new ThreadDestructionObserver(callback);
    109     } else {
    110       task_runner->PostTask(FROM_HERE,
    111                             base::Bind(&Create, task_runner, callback));
    112     }
    113   }
    114 
    115  private:
    116   explicit ThreadDestructionObserver(const base::Closure& callback)
    117       : callback_(callback) {
    118     base::MessageLoop::current()->AddDestructionObserver(this);
    119   }
    120 
    121   ~ThreadDestructionObserver() override {
    122     base::MessageLoop::current()->RemoveDestructionObserver(this);
    123   }
    124 
    125   // base::MessageLoop::DestructionObserver:
    126   void WillDestroyCurrentMessageLoop() override {
    127     callback_.Run();
    128     delete this;
    129   }
    130 
    131   const base::Closure callback_;
    132 
    133   DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
    134 };
    135 
    136 }  // namespace
    137 
    138 NodeController::~NodeController() {}
    139 
    140 NodeController::NodeController(Core* core)
    141     : core_(core),
    142       name_(GetRandomNodeName()),
    143       node_(new ports::Node(name_, this)) {
    144   DVLOG(1) << "Initializing node " << name_;
    145 }
    146 
    147 #if defined(OS_MACOSX) && !defined(OS_IOS)
    148 void NodeController::CreateMachPortRelay(
    149     base::PortProvider* port_provider) {
    150   base::AutoLock lock(mach_port_relay_lock_);
    151   DCHECK(!mach_port_relay_);
    152   mach_port_relay_.reset(new MachPortRelay(port_provider));
    153 }
    154 #endif
    155 
    156 void NodeController::SetIOTaskRunner(
    157     scoped_refptr<base::TaskRunner> task_runner) {
    158   io_task_runner_ = task_runner;
    159   ThreadDestructionObserver::Create(
    160       io_task_runner_,
    161       base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
    162 }
    163 
    164 void NodeController::ConnectToChild(
    165     base::ProcessHandle process_handle,
    166     ScopedPlatformHandle platform_handle,
    167     const std::string& child_token,
    168     const ProcessErrorCallback& process_error_callback) {
    169   // Generate the temporary remote node name here so that it can be associated
    170   // with the embedder's child_token. If an error occurs in the child process
    171   // after it is launched, but before any reserved ports are connected, this can
    172   // be used to clean up any dangling ports.
    173   ports::NodeName node_name;
    174   GenerateRandomName(&node_name);
    175 
    176   {
    177     base::AutoLock lock(reserved_ports_lock_);
    178     bool inserted = pending_child_tokens_.insert(
    179         std::make_pair(node_name, child_token)).second;
    180     DCHECK(inserted);
    181   }
    182 
    183 #if defined(OS_WIN)
    184   // On Windows, we need to duplicate the process handle because we have no
    185   // control over its lifetime and it may become invalid by the time the posted
    186   // task runs.
    187   HANDLE dup_handle = INVALID_HANDLE_VALUE;
    188   BOOL ok = ::DuplicateHandle(
    189       base::GetCurrentProcessHandle(), process_handle,
    190       base::GetCurrentProcessHandle(), &dup_handle,
    191       0, FALSE, DUPLICATE_SAME_ACCESS);
    192   DPCHECK(ok);
    193   process_handle = dup_handle;
    194 #endif
    195 
    196   io_task_runner_->PostTask(
    197       FROM_HERE,
    198       base::Bind(&NodeController::ConnectToChildOnIOThread,
    199                  base::Unretained(this),
    200                  process_handle,
    201                  base::Passed(&platform_handle),
    202                  node_name,
    203                  process_error_callback));
    204 }
    205 
    206 void NodeController::CloseChildPorts(const std::string& child_token) {
    207   std::vector<ports::PortRef> ports_to_close;
    208   {
    209     std::vector<std::string> port_tokens;
    210     base::AutoLock lock(reserved_ports_lock_);
    211     for (const auto& port : reserved_ports_) {
    212       if (port.second.child_token == child_token) {
    213         DVLOG(1) << "Closing reserved port " << port.second.port.name();
    214         ports_to_close.push_back(port.second.port);
    215         port_tokens.push_back(port.first);
    216       }
    217     }
    218 
    219     for (const auto& token : port_tokens)
    220       reserved_ports_.erase(token);
    221   }
    222 
    223   for (const auto& port : ports_to_close)
    224     node_->ClosePort(port);
    225 
    226   // Ensure local port closure messages are processed.
    227   AcceptIncomingMessages();
    228 }
    229 
    230 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
    231 // TODO(amistry): Consider the need for a broker on Windows.
    232 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
    233   // On posix, use the bootstrap channel for the broker and receive the node's
    234   // channel synchronously as the first message from the broker.
    235   base::ElapsedTimer timer;
    236   broker_.reset(new Broker(std::move(platform_handle)));
    237   platform_handle = broker_->GetParentPlatformHandle();
    238   UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
    239                       timer.Elapsed());
    240 
    241   if (!platform_handle.is_valid()) {
    242     // Most likely the browser side of the channel has already been closed and
    243     // the broker was unable to negotiate a NodeChannel pipe. In this case we
    244     // can cancel parent connection.
    245     DVLOG(1) << "Cannot connect to invalid parent channel.";
    246     return;
    247   }
    248 #endif
    249 
    250   io_task_runner_->PostTask(
    251       FROM_HERE,
    252       base::Bind(&NodeController::ConnectToParentOnIOThread,
    253                  base::Unretained(this),
    254                  base::Passed(&platform_handle)));
    255 }
    256 
    257 void NodeController::SetPortObserver(
    258     const ports::PortRef& port,
    259     const scoped_refptr<PortObserver>& observer) {
    260   node_->SetUserData(port, observer);
    261 }
    262 
    263 void NodeController::ClosePort(const ports::PortRef& port) {
    264   SetPortObserver(port, nullptr);
    265   int rv = node_->ClosePort(port);
    266   DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
    267 
    268   AcceptIncomingMessages();
    269 }
    270 
    271 int NodeController::SendMessage(const ports::PortRef& port,
    272                                 std::unique_ptr<PortsMessage> message) {
    273   ports::ScopedMessage ports_message(message.release());
    274   int rv = node_->SendMessage(port, std::move(ports_message));
    275 
    276   AcceptIncomingMessages();
    277   return rv;
    278 }
    279 
    280 void NodeController::ReservePort(const std::string& token,
    281                                  const ports::PortRef& port,
    282                                  const std::string& child_token) {
    283   DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
    284            << token;
    285 
    286   base::AutoLock lock(reserved_ports_lock_);
    287   auto result = reserved_ports_.insert(
    288       std::make_pair(token, ReservedPort{port, child_token}));
    289   DCHECK(result.second);
    290 }
    291 
    292 void NodeController::MergePortIntoParent(const std::string& token,
    293                                          const ports::PortRef& port) {
    294   bool was_merged = false;
    295   {
    296     // This request may be coming from within the process that reserved the
    297     // "parent" side (e.g. for Chrome single-process mode), so if this token is
    298     // reserved locally, merge locally instead.
    299     base::AutoLock lock(reserved_ports_lock_);
    300     auto it = reserved_ports_.find(token);
    301     if (it != reserved_ports_.end()) {
    302       node_->MergePorts(port, name_, it->second.port.name());
    303       reserved_ports_.erase(it);
    304       was_merged = true;
    305     }
    306   }
    307   if (was_merged) {
    308     AcceptIncomingMessages();
    309     return;
    310   }
    311 
    312   scoped_refptr<NodeChannel> parent;
    313   bool reject_merge = false;
    314   {
    315     // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
    316     // there is a race where the parent can be set, and |pending_port_merges_|
    317     // be processed between retrieving |parent| and adding the merge to
    318     // |pending_port_merges_|.
    319     base::AutoLock lock(pending_port_merges_lock_);
    320     parent = GetParentChannel();
    321     if (reject_pending_merges_) {
    322       reject_merge = true;
    323     } else if (!parent) {
    324       pending_port_merges_.push_back(std::make_pair(token, port));
    325       return;
    326     }
    327   }
    328   if (reject_merge) {
    329     node_->ClosePort(port);
    330     DVLOG(2) << "Rejecting port merge for token " << token
    331              << " due to closed parent channel.";
    332     AcceptIncomingMessages();
    333     return;
    334   }
    335 
    336   parent->RequestPortMerge(port.name(), token);
    337 }
    338 
    339 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
    340                                     const ports::PortRef& port1) {
    341   int rv = node_->MergeLocalPorts(port0, port1);
    342   AcceptIncomingMessages();
    343   return rv;
    344 }
    345 
    346 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
    347     size_t num_bytes) {
    348 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
    349   // Shared buffer creation failure is fatal, so always use the broker when we
    350   // have one. This does mean that a non-root process that has children will use
    351   // the broker for shared buffer creation even though that process is
    352   // privileged.
    353   if (broker_) {
    354     return broker_->GetSharedBuffer(num_bytes);
    355   }
    356 #endif
    357   return PlatformSharedBuffer::Create(num_bytes);
    358 }
    359 
    360 void NodeController::RequestShutdown(const base::Closure& callback) {
    361   {
    362     base::AutoLock lock(shutdown_lock_);
    363     shutdown_callback_ = callback;
    364     shutdown_callback_flag_.Set(true);
    365   }
    366 
    367   AttemptShutdownIfRequested();
    368 }
    369 
    370 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
    371                                           const std::string& error) {
    372   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
    373   if (peer)
    374     peer->NotifyBadMessage(error);
    375 }
    376 
    377 void NodeController::ConnectToChildOnIOThread(
    378     base::ProcessHandle process_handle,
    379     ScopedPlatformHandle platform_handle,
    380     ports::NodeName token,
    381     const ProcessErrorCallback& process_error_callback) {
    382   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    383 
    384 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL)
    385   PlatformChannelPair node_channel;
    386   // BrokerHost owns itself.
    387   BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
    388   broker_host->SendChannel(node_channel.PassClientHandle());
    389   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
    390       this, node_channel.PassServerHandle(), io_task_runner_,
    391       process_error_callback);
    392 #else
    393   scoped_refptr<NodeChannel> channel =
    394       NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
    395                           process_error_callback);
    396 #endif
    397 
    398   // We set up the child channel with a temporary name so it can be identified
    399   // as a pending child if it writes any messages to the channel. We may start
    400   // receiving messages from it (though we shouldn't) as soon as Start() is
    401   // called below.
    402 
    403   pending_children_.insert(std::make_pair(token, channel));
    404   RecordPendingChildCount(pending_children_.size());
    405 
    406   channel->SetRemoteNodeName(token);
    407   channel->SetRemoteProcessHandle(process_handle);
    408   channel->Start();
    409 
    410   channel->AcceptChild(name_, token);
    411 }
    412 
    413 void NodeController::ConnectToParentOnIOThread(
    414     ScopedPlatformHandle platform_handle) {
    415   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    416 
    417   {
    418     base::AutoLock lock(parent_lock_);
    419     DCHECK(parent_name_ == ports::kInvalidNodeName);
    420 
    421     // At this point we don't know the parent's name, so we can't yet insert it
    422     // into our |peers_| map. That will happen as soon as we receive an
    423     // AcceptChild message from them.
    424     bootstrap_parent_channel_ =
    425         NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
    426                             ProcessErrorCallback());
    427     // Prevent the parent pipe handle from being closed on shutdown. Pipe
    428     // closure is used by the parent to detect the child process has exited.
    429     // Relying on message pipes to be closed is not enough because the parent
    430     // may see the message pipe closure before the child is dead, causing the
    431     // child process to be unexpectedly SIGKILL'd.
    432     bootstrap_parent_channel_->LeakHandleOnShutdown();
    433   }
    434   bootstrap_parent_channel_->Start();
    435 }
    436 
    437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
    438     const ports::NodeName& name) {
    439   base::AutoLock lock(peers_lock_);
    440   auto it = peers_.find(name);
    441   if (it == peers_.end())
    442     return nullptr;
    443   return it->second;
    444 }
    445 
    446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
    447   ports::NodeName parent_name;
    448   {
    449     base::AutoLock lock(parent_lock_);
    450     parent_name = parent_name_;
    451   }
    452   return GetPeerChannel(parent_name);
    453 }
    454 
    455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
    456   ports::NodeName broker_name;
    457   {
    458     base::AutoLock lock(broker_lock_);
    459     broker_name = broker_name_;
    460   }
    461   return GetPeerChannel(broker_name);
    462 }
    463 
    464 void NodeController::AddPeer(const ports::NodeName& name,
    465                              scoped_refptr<NodeChannel> channel,
    466                              bool start_channel) {
    467   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    468 
    469   DCHECK(name != ports::kInvalidNodeName);
    470   DCHECK(channel);
    471 
    472   channel->SetRemoteNodeName(name);
    473 
    474   OutgoingMessageQueue pending_messages;
    475   {
    476     base::AutoLock lock(peers_lock_);
    477     if (peers_.find(name) != peers_.end()) {
    478       // This can happen normally if two nodes race to be introduced to each
    479       // other. The losing pipe will be silently closed and introduction should
    480       // not be affected.
    481       DVLOG(1) << "Ignoring duplicate peer name " << name;
    482       return;
    483     }
    484 
    485     auto result = peers_.insert(std::make_pair(name, channel));
    486     DCHECK(result.second);
    487 
    488     DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
    489 
    490     RecordPeerCount(peers_.size());
    491 
    492     auto it = pending_peer_messages_.find(name);
    493     if (it != pending_peer_messages_.end()) {
    494       std::swap(pending_messages, it->second);
    495       pending_peer_messages_.erase(it);
    496     }
    497   }
    498 
    499   if (start_channel)
    500     channel->Start();
    501 
    502   // Flush any queued message we need to deliver to this node.
    503   while (!pending_messages.empty()) {
    504     channel->PortsMessage(std::move(pending_messages.front()));
    505     pending_messages.pop();
    506   }
    507 }
    508 
    509 void NodeController::DropPeer(const ports::NodeName& name,
    510                               NodeChannel* channel) {
    511   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    512 
    513   {
    514     base::AutoLock lock(peers_lock_);
    515     auto it = peers_.find(name);
    516 
    517     if (it != peers_.end()) {
    518       ports::NodeName peer = it->first;
    519       peers_.erase(it);
    520       DVLOG(1) << "Dropped peer " << peer;
    521     }
    522 
    523     pending_peer_messages_.erase(name);
    524     pending_children_.erase(name);
    525 
    526     RecordPeerCount(peers_.size());
    527     RecordPendingChildCount(pending_children_.size());
    528   }
    529 
    530   std::vector<ports::PortRef> ports_to_close;
    531   {
    532     // Clean up any reserved ports.
    533     base::AutoLock lock(reserved_ports_lock_);
    534     auto it = pending_child_tokens_.find(name);
    535     if (it != pending_child_tokens_.end()) {
    536       const std::string& child_token = it->second;
    537 
    538       std::vector<std::string> port_tokens;
    539       for (const auto& port : reserved_ports_) {
    540         if (port.second.child_token == child_token) {
    541           DVLOG(1) << "Closing reserved port: " << port.second.port.name();
    542           ports_to_close.push_back(port.second.port);
    543           port_tokens.push_back(port.first);
    544         }
    545       }
    546 
    547       // We have to erase reserved ports in a two-step manner because the usual
    548       // manner of using the returned iterator from map::erase isn't technically
    549       // valid in C++11 (although it is in C++14).
    550       for (const auto& token : port_tokens)
    551         reserved_ports_.erase(token);
    552 
    553       pending_child_tokens_.erase(it);
    554     }
    555   }
    556 
    557   bool is_parent;
    558   {
    559     base::AutoLock lock(parent_lock_);
    560     is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
    561   }
    562   // If the error comes from the parent channel, we also need to cancel any
    563   // port merge requests, so that errors can be propagated to the message
    564   // pipes.
    565   if (is_parent) {
    566     base::AutoLock lock(pending_port_merges_lock_);
    567     reject_pending_merges_ = true;
    568 
    569     for (const auto& port : pending_port_merges_)
    570       ports_to_close.push_back(port.second);
    571     pending_port_merges_.clear();
    572   }
    573 
    574   for (const auto& port : ports_to_close)
    575     node_->ClosePort(port);
    576 
    577   node_->LostConnectionToNode(name);
    578 
    579   AcceptIncomingMessages();
    580 }
    581 
    582 void NodeController::SendPeerMessage(const ports::NodeName& name,
    583                                      ports::ScopedMessage message) {
    584   Channel::MessagePtr channel_message =
    585       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
    586 
    587   scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
    588 #if defined(OS_WIN)
    589   if (channel_message->has_handles()) {
    590     // If we're sending a message with handles we aren't the destination
    591     // node's parent or broker (i.e. we don't know its process handle), ask
    592     // the broker to relay for us.
    593     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    594     if (!peer || !peer->HasRemoteProcessHandle()) {
    595       if (broker) {
    596         broker->RelayPortsMessage(name, std::move(channel_message));
    597       } else {
    598         base::AutoLock lock(broker_lock_);
    599         pending_relay_messages_[name].emplace(std::move(channel_message));
    600       }
    601       return;
    602     }
    603   }
    604 #elif defined(OS_MACOSX) && !defined(OS_IOS)
    605   if (channel_message->has_mach_ports()) {
    606     // Messages containing Mach ports are always routed through the broker, even
    607     // if the broker process is the intended recipient.
    608     bool use_broker = false;
    609     {
    610       base::AutoLock lock(parent_lock_);
    611       use_broker = (bootstrap_parent_channel_ ||
    612                     parent_name_ != ports::kInvalidNodeName);
    613     }
    614     if (use_broker) {
    615       scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    616       if (broker) {
    617         broker->RelayPortsMessage(name, std::move(channel_message));
    618       } else {
    619         base::AutoLock lock(broker_lock_);
    620         pending_relay_messages_[name].emplace(std::move(channel_message));
    621       }
    622       return;
    623     }
    624   }
    625 #endif  // defined(OS_WIN)
    626 
    627   if (peer) {
    628     peer->PortsMessage(std::move(channel_message));
    629     return;
    630   }
    631 
    632   // If we don't know who the peer is, queue the message for delivery. If this
    633   // is the first message queued for the peer, we also ask the broker to
    634   // introduce us to them.
    635 
    636   bool needs_introduction = false;
    637   {
    638     base::AutoLock lock(peers_lock_);
    639     auto& queue = pending_peer_messages_[name];
    640     needs_introduction = queue.empty();
    641     queue.emplace(std::move(channel_message));
    642   }
    643 
    644   if (needs_introduction) {
    645     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    646     if (!broker) {
    647       DVLOG(1) << "Dropping message for unknown peer: " << name;
    648       return;
    649     }
    650     broker->RequestIntroduction(name);
    651   }
    652 }
    653 
    654 void NodeController::AcceptIncomingMessages() {
    655   {
    656     base::AutoLock lock(messages_lock_);
    657     if (!incoming_messages_.empty()) {
    658       // libstdc++'s deque creates an internal buffer on construction, even when
    659       // the size is 0. So avoid creating it until it is necessary.
    660       std::queue<ports::ScopedMessage> messages;
    661       std::swap(messages, incoming_messages_);
    662       base::AutoUnlock unlock(messages_lock_);
    663 
    664       while (!messages.empty()) {
    665         node_->AcceptMessage(std::move(messages.front()));
    666         messages.pop();
    667       }
    668     }
    669   }
    670 
    671   AttemptShutdownIfRequested();
    672 }
    673 
    674 void NodeController::ProcessIncomingMessages() {
    675   RequestContext request_context(RequestContext::Source::SYSTEM);
    676 
    677   {
    678     base::AutoLock lock(messages_lock_);
    679     // Allow a new incoming messages processing task to be posted. This can't be
    680     // done after AcceptIncomingMessages() otherwise a message might be missed.
    681     // Doing it here may result in at most two tasks existing at the same time;
    682     // this running one, and one pending in the task runner.
    683     incoming_messages_task_posted_ = false;
    684   }
    685 
    686   AcceptIncomingMessages();
    687 }
    688 
    689 void NodeController::DropAllPeers() {
    690   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    691 
    692   std::vector<scoped_refptr<NodeChannel>> all_peers;
    693   {
    694     base::AutoLock lock(parent_lock_);
    695     if (bootstrap_parent_channel_) {
    696       // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
    697       // existence to determine whether or not this is the root node. Once
    698       // bootstrap_parent_channel_->ShutDown() has been called,
    699       // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
    700       // matter if it's deleted now or when |this| is deleted.
    701       // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
    702       all_peers.push_back(bootstrap_parent_channel_);
    703     }
    704   }
    705 
    706   {
    707     base::AutoLock lock(peers_lock_);
    708     for (const auto& peer : peers_)
    709       all_peers.push_back(peer.second);
    710     for (const auto& peer : pending_children_)
    711       all_peers.push_back(peer.second);
    712     peers_.clear();
    713     pending_children_.clear();
    714     pending_peer_messages_.clear();
    715   }
    716 
    717   for (const auto& peer : all_peers)
    718     peer->ShutDown();
    719 
    720   if (destroy_on_io_thread_shutdown_)
    721     delete this;
    722 }
    723 
    724 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
    725   GenerateRandomName(port_name);
    726 }
    727 
    728 void NodeController::AllocMessage(size_t num_header_bytes,
    729                                   ports::ScopedMessage* message) {
    730   message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
    731 }
    732 
    733 void NodeController::ForwardMessage(const ports::NodeName& node,
    734                                     ports::ScopedMessage message) {
    735   DCHECK(message);
    736   bool schedule_pump_task = false;
    737   if (node == name_) {
    738     // NOTE: We need to avoid re-entering the Node instance within
    739     // ForwardMessage. Because ForwardMessage is only ever called
    740     // (synchronously) in response to Node's ClosePort, SendMessage, or
    741     // AcceptMessage, we flush the queue after calling any of those methods.
    742     base::AutoLock lock(messages_lock_);
    743     // |io_task_runner_| may be null in tests or processes that don't require
    744     // multi-process Mojo.
    745     schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
    746         !incoming_messages_task_posted_;
    747     incoming_messages_task_posted_ |= schedule_pump_task;
    748     incoming_messages_.emplace(std::move(message));
    749   } else {
    750     SendPeerMessage(node, std::move(message));
    751   }
    752 
    753   if (schedule_pump_task) {
    754     // Normally, the queue is processed after the action that added the local
    755     // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
    756     // possible for a local message to be added as a result of a remote message,
    757     // and OnChannelMessage() doesn't process this queue (although
    758     // OnPortsMessage() does). There may also be other code paths, now or added
    759     // in the future, which cause local messages to be added but don't process
    760     // this message queue.
    761     //
    762     // Instead of adding a call to AcceptIncomingMessages() on every possible
    763     // code path, post a task to the IO thread to process the queue. If the
    764     // current call stack processes the queue, this may end up doing nothing.
    765     io_task_runner_->PostTask(
    766         FROM_HERE,
    767         base::Bind(&NodeController::ProcessIncomingMessages,
    768                    base::Unretained(this)));
    769   }
    770 }
    771 
    772 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
    773   CHECK_EQ(message->num_ports(), 0u);
    774   Channel::MessagePtr channel_message =
    775       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
    776   CHECK(!channel_message->has_handles());
    777 
    778   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    779   if (broker)
    780     broker->Broadcast(std::move(channel_message));
    781   else
    782     OnBroadcast(name_, std::move(channel_message));
    783 }
    784 
    785 void NodeController::PortStatusChanged(const ports::PortRef& port) {
    786   scoped_refptr<ports::UserData> user_data;
    787   node_->GetUserData(port, &user_data);
    788 
    789   PortObserver* observer = static_cast<PortObserver*>(user_data.get());
    790   if (observer) {
    791     observer->OnPortStatusChanged();
    792   } else {
    793     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
    794              << "doesn't have an observer.";
    795   }
    796 }
    797 
    798 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
    799                                    const ports::NodeName& parent_name,
    800                                    const ports::NodeName& token) {
    801   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    802 
    803   scoped_refptr<NodeChannel> parent;
    804   {
    805     base::AutoLock lock(parent_lock_);
    806     if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
    807       parent_name_ = parent_name;
    808       parent = bootstrap_parent_channel_;
    809     }
    810   }
    811 
    812   if (!parent) {
    813     DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
    814     DropPeer(from_node, nullptr);
    815     return;
    816   }
    817 
    818   parent->SetRemoteNodeName(parent_name);
    819   parent->AcceptParent(token, name_);
    820 
    821   // NOTE: The child does not actually add its parent as a peer until
    822   // receiving an AcceptBrokerClient message from the broker. The parent
    823   // will request that said message be sent upon receiving AcceptParent.
    824 
    825   DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
    826 }
    827 
    828 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
    829                                     const ports::NodeName& token,
    830                                     const ports::NodeName& child_name) {
    831   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    832 
    833   auto it = pending_children_.find(from_node);
    834   if (it == pending_children_.end() || token != from_node) {
    835     DLOG(ERROR) << "Received unexpected AcceptParent message from "
    836                 << from_node;
    837     DropPeer(from_node, nullptr);
    838     return;
    839   }
    840 
    841   scoped_refptr<NodeChannel> channel = it->second;
    842   pending_children_.erase(it);
    843 
    844   DCHECK(channel);
    845 
    846   DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
    847 
    848   AddPeer(child_name, channel, false /* start_channel */);
    849 
    850   // TODO(rockot/amistry): We could simplify child initialization if we could
    851   // synchronously get a new async broker channel from the broker. For now we do
    852   // it asynchronously since it's only used to facilitate handle passing, not
    853   // handle creation.
    854   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    855   if (broker) {
    856     // Inform the broker of this new child.
    857     broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
    858   } else {
    859     // If we have no broker, either we need to wait for one, or we *are* the
    860     // broker.
    861     scoped_refptr<NodeChannel> parent = GetParentChannel();
    862     if (!parent) {
    863       base::AutoLock lock(parent_lock_);
    864       parent = bootstrap_parent_channel_;
    865     }
    866 
    867     if (!parent) {
    868       // Yes, we're the broker. We can initialize the child directly.
    869       channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
    870     } else {
    871       // We aren't the broker, so wait for a broker connection.
    872       base::AutoLock lock(broker_lock_);
    873       pending_broker_clients_.push(child_name);
    874     }
    875   }
    876 }
    877 
    878 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
    879                                        const ports::NodeName& client_name,
    880                                        base::ProcessHandle process_handle) {
    881 #if defined(OS_WIN)
    882   // Scoped handle to avoid leaks on error.
    883   ScopedPlatformHandle scoped_process_handle =
    884       ScopedPlatformHandle(PlatformHandle(process_handle));
    885 #endif
    886   scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
    887   if (!sender) {
    888     DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
    889     return;
    890   }
    891 
    892   if (GetPeerChannel(client_name)) {
    893     DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
    894     DropPeer(from_node, nullptr);
    895     return;
    896   }
    897 
    898   PlatformChannelPair broker_channel;
    899   scoped_refptr<NodeChannel> client = NodeChannel::Create(
    900       this, broker_channel.PassServerHandle(), io_task_runner_,
    901       ProcessErrorCallback());
    902 
    903 #if defined(OS_WIN)
    904   // The broker must have a working handle to the client process in order to
    905   // properly copy other handles to and from the client.
    906   if (!scoped_process_handle.is_valid()) {
    907     DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
    908     return;
    909   }
    910   client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
    911 #else
    912   client->SetRemoteProcessHandle(process_handle);
    913 #endif
    914 
    915   AddPeer(client_name, client, true /* start_channel */);
    916 
    917   DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
    918            << " from peer " << from_node;
    919 
    920   sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
    921 }
    922 
    923 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
    924                                          const ports::NodeName& client_name,
    925                                          ScopedPlatformHandle broker_channel) {
    926   scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
    927   if (!client) {
    928     DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
    929     return;
    930   }
    931 
    932   // This should have come from our own broker.
    933   if (GetBrokerChannel() != GetPeerChannel(from_node)) {
    934     DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
    935     return;
    936   }
    937 
    938   DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
    939 
    940   client->AcceptBrokerClient(from_node, std::move(broker_channel));
    941 }
    942 
    943 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
    944                                           const ports::NodeName& broker_name,
    945                                           ScopedPlatformHandle broker_channel) {
    946   // This node should already have a parent in bootstrap mode.
    947   ports::NodeName parent_name;
    948   scoped_refptr<NodeChannel> parent;
    949   {
    950     base::AutoLock lock(parent_lock_);
    951     parent_name = parent_name_;
    952     parent = bootstrap_parent_channel_;
    953     bootstrap_parent_channel_ = nullptr;
    954   }
    955   DCHECK(parent_name == from_node);
    956   DCHECK(parent);
    957 
    958   std::queue<ports::NodeName> pending_broker_clients;
    959   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
    960       pending_relay_messages;
    961   {
    962     base::AutoLock lock(broker_lock_);
    963     broker_name_ = broker_name;
    964     std::swap(pending_broker_clients, pending_broker_clients_);
    965     std::swap(pending_relay_messages, pending_relay_messages_);
    966   }
    967   DCHECK(broker_name != ports::kInvalidNodeName);
    968 
    969   // It's now possible to add both the broker and the parent as peers.
    970   // Note that the broker and parent may be the same node.
    971   scoped_refptr<NodeChannel> broker;
    972   if (broker_name == parent_name) {
    973     DCHECK(!broker_channel.is_valid());
    974     broker = parent;
    975   } else {
    976     DCHECK(broker_channel.is_valid());
    977     broker = NodeChannel::Create(this, std::move(broker_channel),
    978                                  io_task_runner_, ProcessErrorCallback());
    979     AddPeer(broker_name, broker, true /* start_channel */);
    980   }
    981 
    982   AddPeer(parent_name, parent, false /* start_channel */);
    983 
    984   {
    985     // Complete any port merge requests we have waiting for the parent.
    986     base::AutoLock lock(pending_port_merges_lock_);
    987     for (const auto& request : pending_port_merges_)
    988       parent->RequestPortMerge(request.second.name(), request.first);
    989     pending_port_merges_.clear();
    990   }
    991 
    992   // Feed the broker any pending children of our own.
    993   while (!pending_broker_clients.empty()) {
    994     const ports::NodeName& child_name = pending_broker_clients.front();
    995     auto it = pending_children_.find(child_name);
    996     DCHECK(it != pending_children_.end());
    997     broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
    998     pending_broker_clients.pop();
    999   }
   1000 
   1001 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
   1002   // Have the broker relay any messages we have waiting.
   1003   for (auto& entry : pending_relay_messages) {
   1004     const ports::NodeName& destination = entry.first;
   1005     auto& message_queue = entry.second;
   1006     while (!message_queue.empty()) {
   1007       broker->RelayPortsMessage(destination, std::move(message_queue.front()));
   1008       message_queue.pop();
   1009     }
   1010   }
   1011 #endif
   1012 
   1013   DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
   1014 }
   1015 
   1016 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
   1017                                     Channel::MessagePtr channel_message) {
   1018   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1019 
   1020   void* data;
   1021   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
   1022   if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
   1023                          &num_header_bytes, &num_payload_bytes,
   1024                          &num_ports_bytes)) {
   1025     DropPeer(from_node, nullptr);
   1026     return;
   1027   }
   1028 
   1029   CHECK(channel_message);
   1030   std::unique_ptr<PortsMessage> ports_message(
   1031       new PortsMessage(num_header_bytes,
   1032                        num_payload_bytes,
   1033                        num_ports_bytes,
   1034                        std::move(channel_message)));
   1035   ports_message->set_source_node(from_node);
   1036   node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
   1037   AcceptIncomingMessages();
   1038 }
   1039 
   1040 void NodeController::OnRequestPortMerge(
   1041     const ports::NodeName& from_node,
   1042     const ports::PortName& connector_port_name,
   1043     const std::string& token) {
   1044   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1045 
   1046   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
   1047            << token << " and port " << connector_port_name << "@" << from_node;
   1048 
   1049   ports::PortRef local_port;
   1050   {
   1051     base::AutoLock lock(reserved_ports_lock_);
   1052     auto it = reserved_ports_.find(token);
   1053     if (it == reserved_ports_.end()) {
   1054       DVLOG(1) << "Ignoring request to connect to port for unknown token "
   1055                << token;
   1056       return;
   1057     }
   1058     local_port = it->second.port;
   1059   }
   1060 
   1061   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
   1062   if (rv != ports::OK)
   1063     DLOG(ERROR) << "MergePorts failed: " << rv;
   1064 
   1065   AcceptIncomingMessages();
   1066 }
   1067 
   1068 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
   1069                                            const ports::NodeName& name) {
   1070   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1071 
   1072   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
   1073   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
   1074     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
   1075                 << from_node;
   1076     DropPeer(from_node, nullptr);
   1077     return;
   1078   }
   1079 
   1080   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
   1081   if (!new_friend) {
   1082     // We don't know who they're talking about!
   1083     requestor->Introduce(name, ScopedPlatformHandle());
   1084   } else {
   1085     PlatformChannelPair new_channel;
   1086     requestor->Introduce(name, new_channel.PassServerHandle());
   1087     new_friend->Introduce(from_node, new_channel.PassClientHandle());
   1088   }
   1089 }
   1090 
   1091 void NodeController::OnIntroduce(const ports::NodeName& from_node,
   1092                                  const ports::NodeName& name,
   1093                                  ScopedPlatformHandle channel_handle) {
   1094   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1095 
   1096   if (!channel_handle.is_valid()) {
   1097     node_->LostConnectionToNode(name);
   1098 
   1099     DLOG(ERROR) << "Could not be introduced to peer " << name;
   1100     base::AutoLock lock(peers_lock_);
   1101     pending_peer_messages_.erase(name);
   1102     return;
   1103   }
   1104 
   1105   scoped_refptr<NodeChannel> channel =
   1106       NodeChannel::Create(this, std::move(channel_handle), io_task_runner_,
   1107                           ProcessErrorCallback());
   1108 
   1109   DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
   1110   AddPeer(name, channel, true /* start_channel */);
   1111 }
   1112 
   1113 void NodeController::OnBroadcast(const ports::NodeName& from_node,
   1114                                  Channel::MessagePtr message) {
   1115   DCHECK(!message->has_handles());
   1116 
   1117   void* data;
   1118   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
   1119   if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
   1120                          &num_header_bytes, &num_payload_bytes,
   1121                          &num_ports_bytes)) {
   1122     DropPeer(from_node, nullptr);
   1123     return;
   1124   }
   1125 
   1126   // Broadcast messages must not contain ports.
   1127   if (num_ports_bytes > 0) {
   1128     DropPeer(from_node, nullptr);
   1129     return;
   1130   }
   1131 
   1132   base::AutoLock lock(peers_lock_);
   1133   for (auto& iter : peers_) {
   1134     // Copy and send the message to each known peer.
   1135     Channel::MessagePtr peer_message(
   1136         new Channel::Message(message->payload_size(), 0));
   1137     memcpy(peer_message->mutable_payload(), message->payload(),
   1138            message->payload_size());
   1139     iter.second->PortsMessage(std::move(peer_message));
   1140   }
   1141 }
   1142 
   1143 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
   1144 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
   1145                                          base::ProcessHandle from_process,
   1146                                          const ports::NodeName& destination,
   1147                                          Channel::MessagePtr message) {
   1148   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
   1149 
   1150   if (GetBrokerChannel()) {
   1151     // Only the broker should be asked to relay a message.
   1152     LOG(ERROR) << "Non-broker refusing to relay message.";
   1153     DropPeer(from_node, nullptr);
   1154     return;
   1155   }
   1156 
   1157   // The parent should always know which process this came from.
   1158   DCHECK(from_process != base::kNullProcessHandle);
   1159 
   1160 #if defined(OS_WIN)
   1161   // Rewrite the handles to this (the parent) process. If the message is
   1162   // destined for another child process, the handles will be rewritten to that
   1163   // process before going out (see NodeChannel::WriteChannelMessage).
   1164   //
   1165   // TODO: We could avoid double-duplication.
   1166   //
   1167   // Note that we explicitly mark the handles as being owned by the sending
   1168   // process before rewriting them, in order to accommodate RewriteHandles'
   1169   // internal sanity checks.
   1170   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
   1171   for (size_t i = 0; i < handles->size(); ++i)
   1172     (*handles)[i].owning_process = from_process;
   1173   if (!Channel::Message::RewriteHandles(from_process,
   1174                                         base::GetCurrentProcessHandle(),
   1175                                         handles.get())) {
   1176     DLOG(ERROR) << "Failed to relay one or more handles.";
   1177   }
   1178   message->SetHandles(std::move(handles));
   1179 #else
   1180   MachPortRelay* relay = GetMachPortRelay();
   1181   if (!relay) {
   1182     LOG(ERROR) << "Receiving Mach ports without a port relay from "
   1183                << from_node << ". Dropping message.";
   1184     return;
   1185   }
   1186   if (!relay->ExtractPortRights(message.get(), from_process)) {
   1187     // NodeChannel should ensure that MachPortRelay is ready for the remote
   1188     // process. At this point, if the port extraction failed, either something
   1189     // went wrong in the mach stuff, or the remote process died.
   1190     LOG(ERROR) << "Error on receiving Mach ports " << from_node
   1191                << ". Dropping message.";
   1192     return;
   1193   }
   1194 #endif  // defined(OS_WIN)
   1195 
   1196   if (destination == name_) {
   1197     // Great, we can deliver this message locally.
   1198     OnPortsMessage(from_node, std::move(message));
   1199     return;
   1200   }
   1201 
   1202   scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
   1203   if (peer)
   1204     peer->PortsMessageFromRelay(from_node, std::move(message));
   1205   else
   1206     DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
   1207 }
   1208 
   1209 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
   1210                                              const ports::NodeName& source_node,
   1211                                              Channel::MessagePtr message) {
   1212   if (GetPeerChannel(from_node) != GetBrokerChannel()) {
   1213     LOG(ERROR) << "Refusing relayed message from non-broker node.";
   1214     DropPeer(from_node, nullptr);
   1215     return;
   1216   }
   1217 
   1218   OnPortsMessage(source_node, std::move(message));
   1219 }
   1220 #endif
   1221 
   1222 void NodeController::OnChannelError(const ports::NodeName& from_node,
   1223                                     NodeChannel* channel) {
   1224   if (io_task_runner_->RunsTasksOnCurrentThread()) {
   1225     DropPeer(from_node, channel);
   1226     // DropPeer may have caused local port closures, so be sure to process any
   1227     // pending local messages.
   1228     AcceptIncomingMessages();
   1229   } else {
   1230     io_task_runner_->PostTask(
   1231         FROM_HERE,
   1232         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
   1233                    from_node, channel));
   1234   }
   1235 }
   1236 
   1237 #if defined(OS_MACOSX) && !defined(OS_IOS)
   1238 MachPortRelay* NodeController::GetMachPortRelay() {
   1239   {
   1240     base::AutoLock lock(parent_lock_);
   1241     // Return null if we're not the root.
   1242     if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
   1243       return nullptr;
   1244   }
   1245 
   1246   base::AutoLock lock(mach_port_relay_lock_);
   1247   return mach_port_relay_.get();
   1248 }
   1249 #endif
   1250 
   1251 void NodeController::DestroyOnIOThreadShutdown() {
   1252   destroy_on_io_thread_shutdown_ = true;
   1253 }
   1254 
   1255 void NodeController::AttemptShutdownIfRequested() {
   1256   if (!shutdown_callback_flag_)
   1257     return;
   1258 
   1259   base::Closure callback;
   1260   {
   1261     base::AutoLock lock(shutdown_lock_);
   1262     if (shutdown_callback_.is_null())
   1263       return;
   1264     if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
   1265       DVLOG(2) << "Unable to cleanly shut down node " << name_;
   1266       return;
   1267     }
   1268 
   1269     callback = shutdown_callback_;
   1270     shutdown_callback_.Reset();
   1271     shutdown_callback_flag_.Set(false);
   1272   }
   1273 
   1274   DCHECK(!callback.is_null());
   1275 
   1276   callback.Run();
   1277 }
   1278 
   1279 }  // namespace edk
   1280 }  // namespace mojo
   1281