Home | History | Annotate | Download | only in core
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/core/node_controller.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 #include <vector>
     10 
     11 #include "base/bind.h"
     12 #include "base/containers/queue.h"
     13 #include "base/location.h"
     14 #include "base/logging.h"
     15 #include "base/macros.h"
     16 #include "base/message_loop/message_loop_current.h"
     17 #include "base/metrics/histogram_macros.h"
     18 #include "base/process/process_handle.h"
     19 #include "base/rand_util.h"
     20 #include "base/time/time.h"
     21 #include "base/timer/elapsed_timer.h"
     22 #include "mojo/core/broker.h"
     23 #include "mojo/core/broker_host.h"
     24 #include "mojo/core/configuration.h"
     25 #include "mojo/core/core.h"
     26 #include "mojo/core/request_context.h"
     27 #include "mojo/core/user_message_impl.h"
     28 #include "mojo/public/cpp/platform/named_platform_channel.h"
     29 #include "mojo/public/cpp/platform/platform_channel.h"
     30 
     31 #if defined(OS_WIN)
     32 #include <windows.h>
     33 #endif
     34 
     35 #if defined(OS_MACOSX) && !defined(OS_IOS)
     36 #include "mojo/core/mach_port_relay.h"
     37 #endif
     38 
     39 #if !defined(OS_NACL)
     40 #include "crypto/random.h"
     41 #endif
     42 
     43 namespace mojo {
     44 namespace core {
     45 
     46 namespace {
     47 
     48 #if defined(OS_NACL)
     49 template <typename T>
     50 void GenerateRandomName(T* out) {
     51   base::RandBytes(out, sizeof(T));
     52 }
     53 #else
     54 template <typename T>
     55 void GenerateRandomName(T* out) {
     56   crypto::RandBytes(out, sizeof(T));
     57 }
     58 #endif
     59 
     60 ports::NodeName GetRandomNodeName() {
     61   ports::NodeName name;
     62   GenerateRandomName(&name);
     63   return name;
     64 }
     65 
     66 Channel::MessagePtr SerializeEventMessage(ports::ScopedEvent event) {
     67   if (event->type() == ports::Event::Type::kUserMessage) {
     68     // User message events must already be partially serialized.
     69     return UserMessageImpl::FinalizeEventMessage(
     70         ports::Event::Cast<ports::UserMessageEvent>(&event));
     71   }
     72 
     73   void* data;
     74   size_t size = event->GetSerializedSize();
     75   auto message = NodeChannel::CreateEventMessage(size, size, &data, 0);
     76   event->Serialize(data);
     77   return message;
     78 }
     79 
     80 ports::ScopedEvent DeserializeEventMessage(
     81     const ports::NodeName& from_node,
     82     Channel::MessagePtr channel_message) {
     83   void* data;
     84   size_t size;
     85   NodeChannel::GetEventMessageData(channel_message.get(), &data, &size);
     86   auto event = ports::Event::Deserialize(data, size);
     87   if (!event)
     88     return nullptr;
     89 
     90   if (event->type() != ports::Event::Type::kUserMessage)
     91     return event;
     92 
     93   // User messages require extra parsing.
     94   const size_t event_size = event->GetSerializedSize();
     95 
     96   // Note that if this weren't true, the event couldn't have been deserialized
     97   // in the first place.
     98   DCHECK_LE(event_size, size);
     99 
    100   auto message_event = ports::Event::Cast<ports::UserMessageEvent>(&event);
    101   auto message = UserMessageImpl::CreateFromChannelMessage(
    102       message_event.get(), std::move(channel_message),
    103       static_cast<uint8_t*>(data) + event_size, size - event_size);
    104   message->set_source_node(from_node);
    105 
    106   message_event->AttachMessage(std::move(message));
    107   return std::move(message_event);
    108 }
    109 
    110 // Used by NodeController to watch for shutdown. Since no IO can happen once
    111 // the IO thread is killed, the NodeController can cleanly drop all its peers
    112 // at that time.
    113 class ThreadDestructionObserver
    114     : public base::MessageLoopCurrent::DestructionObserver {
    115  public:
    116   static void Create(scoped_refptr<base::TaskRunner> task_runner,
    117                      const base::Closure& callback) {
    118     if (task_runner->RunsTasksInCurrentSequence()) {
    119       // Owns itself.
    120       new ThreadDestructionObserver(callback);
    121     } else {
    122       task_runner->PostTask(FROM_HERE,
    123                             base::Bind(&Create, task_runner, callback));
    124     }
    125   }
    126 
    127  private:
    128   explicit ThreadDestructionObserver(const base::Closure& callback)
    129       : callback_(callback) {
    130     base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
    131   }
    132 
    133   ~ThreadDestructionObserver() override {
    134     base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
    135   }
    136 
    137   // base::MessageLoopCurrent::DestructionObserver:
    138   void WillDestroyCurrentMessageLoop() override {
    139     callback_.Run();
    140     delete this;
    141   }
    142 
    143   const base::Closure callback_;
    144 
    145   DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
    146 };
    147 
    148 }  // namespace
    149 
    150 NodeController::~NodeController() {}
    151 
    152 NodeController::NodeController(Core* core)
    153     : core_(core),
    154       name_(GetRandomNodeName()),
    155       node_(new ports::Node(name_, this)) {
    156   DVLOG(1) << "Initializing node " << name_;
    157 }
    158 
    159 #if defined(OS_MACOSX) && !defined(OS_IOS)
    160 void NodeController::CreateMachPortRelay(base::PortProvider* port_provider) {
    161   base::AutoLock lock(mach_port_relay_lock_);
    162   DCHECK(!mach_port_relay_);
    163   mach_port_relay_.reset(new MachPortRelay(port_provider));
    164 }
    165 #endif
    166 
    167 void NodeController::SetIOTaskRunner(
    168     scoped_refptr<base::TaskRunner> task_runner) {
    169   io_task_runner_ = task_runner;
    170   ThreadDestructionObserver::Create(
    171       io_task_runner_,
    172       base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
    173 }
    174 
    175 void NodeController::SendBrokerClientInvitation(
    176     base::ProcessHandle target_process,
    177     ConnectionParams connection_params,
    178     const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
    179     const ProcessErrorCallback& process_error_callback) {
    180   // Generate the temporary remote node name here so that it can be associated
    181   // with the ports "attached" to this invitation.
    182   ports::NodeName temporary_node_name;
    183   GenerateRandomName(&temporary_node_name);
    184 
    185   {
    186     base::AutoLock lock(reserved_ports_lock_);
    187     PortMap& port_map = reserved_ports_[temporary_node_name];
    188     for (auto& entry : attached_ports) {
    189       auto result = port_map.emplace(entry.first, entry.second);
    190       DCHECK(result.second) << "Duplicate attachment: " << entry.first;
    191     }
    192   }
    193 
    194   ScopedProcessHandle scoped_target_process =
    195       ScopedProcessHandle::CloneFrom(target_process);
    196   io_task_runner_->PostTask(
    197       FROM_HERE,
    198       base::BindOnce(&NodeController::SendBrokerClientInvitationOnIOThread,
    199                      base::Unretained(this), std::move(scoped_target_process),
    200                      std::move(connection_params), temporary_node_name,
    201                      process_error_callback));
    202 }
    203 
    204 void NodeController::AcceptBrokerClientInvitation(
    205     ConnectionParams connection_params) {
    206   DCHECK(!GetConfiguration().is_broker_process);
    207 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
    208   // Use the bootstrap channel for the broker and receive the node's channel
    209   // synchronously as the first message from the broker.
    210   DCHECK(connection_params.endpoint().is_valid());
    211   base::ElapsedTimer timer;
    212   broker_ = std::make_unique<Broker>(
    213       connection_params.TakeEndpoint().TakePlatformHandle());
    214   PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint();
    215 
    216   if (!endpoint.is_valid()) {
    217     // Most likely the inviter's side of the channel has already been closed and
    218     // the broker was unable to negotiate a NodeChannel pipe. In this case we
    219     // can cancel our connection to our inviter.
    220     DVLOG(1) << "Cannot connect to invalid inviter channel.";
    221     CancelPendingPortMerges();
    222     return;
    223   }
    224   connection_params = ConnectionParams(std::move(endpoint));
    225 #endif
    226 
    227   io_task_runner_->PostTask(
    228       FROM_HERE,
    229       base::BindOnce(&NodeController::AcceptBrokerClientInvitationOnIOThread,
    230                      base::Unretained(this), std::move(connection_params)));
    231 }
    232 
    233 void NodeController::ConnectIsolated(ConnectionParams connection_params,
    234                                      const ports::PortRef& port,
    235                                      base::StringPiece connection_name) {
    236   io_task_runner_->PostTask(
    237       FROM_HERE,
    238       base::BindOnce(&NodeController::ConnectIsolatedOnIOThread,
    239                      base::Unretained(this), base::Passed(&connection_params),
    240                      port, connection_name.as_string()));
    241 }
    242 
    243 void NodeController::SetPortObserver(const ports::PortRef& port,
    244                                      scoped_refptr<PortObserver> observer) {
    245   node_->SetUserData(port, std::move(observer));
    246 }
    247 
    248 void NodeController::ClosePort(const ports::PortRef& port) {
    249   SetPortObserver(port, nullptr);
    250   int rv = node_->ClosePort(port);
    251   DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
    252 }
    253 
    254 int NodeController::SendUserMessage(
    255     const ports::PortRef& port,
    256     std::unique_ptr<ports::UserMessageEvent> message) {
    257   return node_->SendUserMessage(port, std::move(message));
    258 }
    259 
    260 void NodeController::MergePortIntoInviter(const std::string& name,
    261                                           const ports::PortRef& port) {
    262   scoped_refptr<NodeChannel> inviter;
    263   bool reject_merge = false;
    264   {
    265     // Hold |pending_port_merges_lock_| while getting |inviter|. Otherwise,
    266     // there is a race where the inviter can be set, and |pending_port_merges_|
    267     // be processed between retrieving |inviter| and adding the merge to
    268     // |pending_port_merges_|.
    269     base::AutoLock lock(pending_port_merges_lock_);
    270     inviter = GetInviterChannel();
    271     if (reject_pending_merges_) {
    272       reject_merge = true;
    273     } else if (!inviter) {
    274       pending_port_merges_.push_back(std::make_pair(name, port));
    275       return;
    276     }
    277   }
    278   if (reject_merge) {
    279     node_->ClosePort(port);
    280     DVLOG(2) << "Rejecting port merge for name " << name
    281              << " due to closed inviter channel.";
    282     return;
    283   }
    284 
    285   inviter->RequestPortMerge(port.name(), name);
    286 }
    287 
    288 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
    289                                     const ports::PortRef& port1) {
    290   return node_->MergeLocalPorts(port0, port1);
    291 }
    292 
    293 base::WritableSharedMemoryRegion NodeController::CreateSharedBuffer(
    294     size_t num_bytes) {
    295 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
    296   // Shared buffer creation failure is fatal, so always use the broker when we
    297   // have one; unless of course the embedder forces us not to.
    298   if (!GetConfiguration().force_direct_shared_memory_allocation && broker_)
    299     return broker_->GetWritableSharedMemoryRegion(num_bytes);
    300 #endif
    301   return base::WritableSharedMemoryRegion::Create(num_bytes);
    302 }
    303 
    304 void NodeController::RequestShutdown(const base::Closure& callback) {
    305   {
    306     base::AutoLock lock(shutdown_lock_);
    307     shutdown_callback_ = callback;
    308     shutdown_callback_flag_.Set(true);
    309   }
    310 
    311   AttemptShutdownIfRequested();
    312 }
    313 
    314 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
    315                                           const std::string& error) {
    316   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
    317   if (peer)
    318     peer->NotifyBadMessage(error);
    319 }
    320 
    321 void NodeController::SendBrokerClientInvitationOnIOThread(
    322     ScopedProcessHandle target_process,
    323     ConnectionParams connection_params,
    324     ports::NodeName temporary_node_name,
    325     const ProcessErrorCallback& process_error_callback) {
    326   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    327 
    328 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
    329   PlatformChannel node_channel;
    330   ConnectionParams node_connection_params(node_channel.TakeLocalEndpoint());
    331   // BrokerHost owns itself.
    332   BrokerHost* broker_host =
    333       new BrokerHost(target_process.get(), std::move(connection_params),
    334                      process_error_callback);
    335   bool channel_ok = broker_host->SendChannel(
    336       node_channel.TakeRemoteEndpoint().TakePlatformHandle());
    337 
    338 #if defined(OS_WIN)
    339   if (!channel_ok) {
    340     // On Windows the above operation may fail if the channel is crossing a
    341     // session boundary. In that case we fall back to a named pipe.
    342     NamedPlatformChannel::Options options;
    343     NamedPlatformChannel named_channel(options);
    344     node_connection_params =
    345         ConnectionParams(named_channel.TakeServerEndpoint());
    346     broker_host->SendNamedChannel(named_channel.GetServerName());
    347   }
    348 #else
    349   CHECK(channel_ok);
    350 #endif  // defined(OS_WIN)
    351 
    352   scoped_refptr<NodeChannel> channel =
    353       NodeChannel::Create(this, std::move(node_connection_params),
    354                           io_task_runner_, process_error_callback);
    355 
    356 #else   // !defined(OS_MACOSX) && !defined(OS_NACL)
    357   scoped_refptr<NodeChannel> channel =
    358       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
    359                           process_error_callback);
    360 #endif  // !defined(OS_MACOSX) && !defined(OS_NACL)
    361 
    362   // We set up the invitee channel with a temporary name so it can be identified
    363   // as a pending invitee if it writes any messages to the channel. We may start
    364   // receiving messages from it (though we shouldn't) as soon as Start() is
    365   // called below.
    366 
    367   pending_invitations_.insert(std::make_pair(temporary_node_name, channel));
    368 
    369   channel->SetRemoteNodeName(temporary_node_name);
    370   channel->SetRemoteProcessHandle(std::move(target_process));
    371   channel->Start();
    372 
    373   channel->AcceptInvitee(name_, temporary_node_name);
    374 }
    375 
    376 void NodeController::AcceptBrokerClientInvitationOnIOThread(
    377     ConnectionParams connection_params) {
    378   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    379 
    380   {
    381     base::AutoLock lock(inviter_lock_);
    382     DCHECK(inviter_name_ == ports::kInvalidNodeName);
    383 
    384     // At this point we don't know the inviter's name, so we can't yet insert it
    385     // into our |peers_| map. That will happen as soon as we receive an
    386     // AcceptInvitee message from them.
    387     bootstrap_inviter_channel_ =
    388         NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
    389                             ProcessErrorCallback());
    390     // Prevent the inviter pipe handle from being closed on shutdown. Pipe
    391     // closure may be used by the inviter to detect the invitee process has
    392     // exited.
    393     bootstrap_inviter_channel_->LeakHandleOnShutdown();
    394   }
    395   bootstrap_inviter_channel_->Start();
    396 }
    397 
    398 void NodeController::ConnectIsolatedOnIOThread(
    399     ConnectionParams connection_params,
    400     ports::PortRef port,
    401     const std::string& connection_name) {
    402   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    403 
    404   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
    405       this, std::move(connection_params), io_task_runner_, {});
    406 
    407   RequestContext request_context;
    408   ports::NodeName token;
    409   GenerateRandomName(&token);
    410   pending_isolated_connections_.emplace(
    411       token, IsolatedConnection{channel, port, connection_name});
    412   if (!connection_name.empty()) {
    413     // If a connection already exists with this name, drop it.
    414     auto it = named_isolated_connections_.find(connection_name);
    415     if (it != named_isolated_connections_.end()) {
    416       ports::NodeName connection_node = it->second;
    417       if (connection_node != name_) {
    418         DropPeer(connection_node, nullptr);
    419       } else {
    420         auto pending_it = pending_isolated_connections_.find(connection_node);
    421         if (pending_it != pending_isolated_connections_.end()) {
    422           node_->ClosePort(pending_it->second.local_port);
    423           pending_isolated_connections_.erase(pending_it);
    424         }
    425         named_isolated_connections_.erase(it);
    426       }
    427     }
    428     named_isolated_connections_.emplace(connection_name, token);
    429   }
    430 
    431   channel->SetRemoteNodeName(token);
    432   channel->Start();
    433 
    434   channel->AcceptPeer(name_, token, port.name());
    435 }
    436 
    437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
    438     const ports::NodeName& name) {
    439   base::AutoLock lock(peers_lock_);
    440   auto it = peers_.find(name);
    441   if (it == peers_.end())
    442     return nullptr;
    443   return it->second;
    444 }
    445 
    446 scoped_refptr<NodeChannel> NodeController::GetInviterChannel() {
    447   ports::NodeName inviter_name;
    448   {
    449     base::AutoLock lock(inviter_lock_);
    450     inviter_name = inviter_name_;
    451   }
    452   return GetPeerChannel(inviter_name);
    453 }
    454 
    455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
    456   if (GetConfiguration().is_broker_process)
    457     return nullptr;
    458 
    459   ports::NodeName broker_name;
    460   {
    461     base::AutoLock lock(broker_lock_);
    462     broker_name = broker_name_;
    463   }
    464   return GetPeerChannel(broker_name);
    465 }
    466 
    467 void NodeController::AddPeer(const ports::NodeName& name,
    468                              scoped_refptr<NodeChannel> channel,
    469                              bool start_channel) {
    470   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    471 
    472   DCHECK(name != ports::kInvalidNodeName);
    473   DCHECK(channel);
    474 
    475   channel->SetRemoteNodeName(name);
    476 
    477   OutgoingMessageQueue pending_messages;
    478   {
    479     base::AutoLock lock(peers_lock_);
    480     if (peers_.find(name) != peers_.end()) {
    481       // This can happen normally if two nodes race to be introduced to each
    482       // other. The losing pipe will be silently closed and introduction should
    483       // not be affected.
    484       DVLOG(1) << "Ignoring duplicate peer name " << name;
    485       return;
    486     }
    487 
    488     auto result = peers_.insert(std::make_pair(name, channel));
    489     DCHECK(result.second);
    490 
    491     DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
    492 
    493     auto it = pending_peer_messages_.find(name);
    494     if (it != pending_peer_messages_.end()) {
    495       std::swap(pending_messages, it->second);
    496       pending_peer_messages_.erase(it);
    497     }
    498   }
    499 
    500   if (start_channel)
    501     channel->Start();
    502 
    503   // Flush any queued message we need to deliver to this node.
    504   while (!pending_messages.empty()) {
    505     channel->SendChannelMessage(std::move(pending_messages.front()));
    506     pending_messages.pop();
    507   }
    508 }
    509 
    510 void NodeController::DropPeer(const ports::NodeName& name,
    511                               NodeChannel* channel) {
    512   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    513 
    514   {
    515     base::AutoLock lock(peers_lock_);
    516     auto it = peers_.find(name);
    517 
    518     if (it != peers_.end()) {
    519       ports::NodeName peer = it->first;
    520       peers_.erase(it);
    521       DVLOG(1) << "Dropped peer " << peer;
    522     }
    523 
    524     pending_peer_messages_.erase(name);
    525     pending_invitations_.erase(name);
    526   }
    527 
    528   std::vector<ports::PortRef> ports_to_close;
    529   {
    530     // Clean up any reserved ports.
    531     base::AutoLock lock(reserved_ports_lock_);
    532     auto it = reserved_ports_.find(name);
    533     if (it != reserved_ports_.end()) {
    534       for (auto& entry : it->second)
    535         ports_to_close.emplace_back(entry.second);
    536       reserved_ports_.erase(it);
    537     }
    538   }
    539 
    540   bool is_inviter;
    541   {
    542     base::AutoLock lock(inviter_lock_);
    543     is_inviter = (name == inviter_name_ ||
    544                   (channel && channel == bootstrap_inviter_channel_));
    545   }
    546 
    547   // If the error comes from the inviter channel, we also need to cancel any
    548   // port merge requests, so that errors can be propagated to the message
    549   // pipes.
    550   if (is_inviter)
    551     CancelPendingPortMerges();
    552 
    553   auto connection_it = pending_isolated_connections_.find(name);
    554   if (connection_it != pending_isolated_connections_.end()) {
    555     IsolatedConnection& connection = connection_it->second;
    556     ports_to_close.push_back(connection.local_port);
    557     if (!connection.name.empty())
    558       named_isolated_connections_.erase(connection.name);
    559     pending_isolated_connections_.erase(connection_it);
    560   }
    561 
    562   for (const auto& port : ports_to_close)
    563     node_->ClosePort(port);
    564 
    565   node_->LostConnectionToNode(name);
    566   AttemptShutdownIfRequested();
    567 }
    568 
    569 void NodeController::SendPeerEvent(const ports::NodeName& name,
    570                                    ports::ScopedEvent event) {
    571   Channel::MessagePtr event_message = SerializeEventMessage(std::move(event));
    572   if (!event_message)
    573     return;
    574   scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
    575 #if defined(OS_WIN)
    576   if (event_message->has_handles()) {
    577     // If we're sending a message with handles we aren't the destination
    578     // node's inviter or broker (i.e. we don't know its process handle), ask
    579     // the broker to relay for us.
    580     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    581     if (!peer || !peer->HasRemoteProcessHandle()) {
    582       if (!GetConfiguration().is_broker_process && broker) {
    583         broker->RelayEventMessage(name, std::move(event_message));
    584       } else {
    585         base::AutoLock lock(broker_lock_);
    586         pending_relay_messages_[name].emplace(std::move(event_message));
    587       }
    588       return;
    589     }
    590   }
    591 #elif defined(OS_MACOSX) && !defined(OS_IOS)
    592   if (event_message->has_mach_ports()) {
    593     // Messages containing Mach ports are always routed through the broker, even
    594     // if the broker process is the intended recipient.
    595     bool use_broker = false;
    596     if (!GetConfiguration().is_broker_process) {
    597       base::AutoLock lock(inviter_lock_);
    598       use_broker = (bootstrap_inviter_channel_ ||
    599                     inviter_name_ != ports::kInvalidNodeName);
    600     }
    601 
    602     if (use_broker) {
    603       scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    604       if (broker) {
    605         broker->RelayEventMessage(name, std::move(event_message));
    606       } else {
    607         base::AutoLock lock(broker_lock_);
    608         pending_relay_messages_[name].emplace(std::move(event_message));
    609       }
    610       return;
    611     }
    612   }
    613 #endif  // defined(OS_WIN)
    614 
    615   if (peer) {
    616     peer->SendChannelMessage(std::move(event_message));
    617     return;
    618   }
    619 
    620   // If we don't know who the peer is and we are the broker, we can only assume
    621   // the peer is invalid, i.e., it's either a junk name or has already been
    622   // disconnected.
    623   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    624   if (!broker) {
    625     DVLOG(1) << "Dropping message for unknown peer: " << name;
    626     return;
    627   }
    628 
    629   // If we aren't the broker, assume we just need to be introduced and queue
    630   // until that can be either confirmed or denied by the broker.
    631   bool needs_introduction = false;
    632   {
    633     base::AutoLock lock(peers_lock_);
    634     // We may have been introduced on another thread by the time we get here.
    635     // Double-check to be safe.
    636     auto it = peers_.find(name);
    637     if (it == peers_.end()) {
    638       auto& queue = pending_peer_messages_[name];
    639       needs_introduction = queue.empty();
    640       queue.emplace(std::move(event_message));
    641     } else {
    642       peer = it->second;
    643     }
    644   }
    645   if (needs_introduction)
    646     broker->RequestIntroduction(name);
    647   else if (peer)
    648     peer->SendChannelMessage(std::move(event_message));
    649 }
    650 
    651 void NodeController::DropAllPeers() {
    652   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    653 
    654   std::vector<scoped_refptr<NodeChannel>> all_peers;
    655   {
    656     base::AutoLock lock(inviter_lock_);
    657     if (bootstrap_inviter_channel_) {
    658       // |bootstrap_inviter_channel_| isn't null'd here becuase we rely on its
    659       // existence to determine whether or not this is the root node. Once
    660       // bootstrap_inviter_channel_->ShutDown() has been called,
    661       // |bootstrap_inviter_channel_| is essentially a dead object and it
    662       // doesn't matter if it's deleted now or when |this| is deleted. Note:
    663       // |bootstrap_inviter_channel_| is only modified on the IO thread.
    664       all_peers.push_back(bootstrap_inviter_channel_);
    665     }
    666   }
    667 
    668   {
    669     base::AutoLock lock(peers_lock_);
    670     for (const auto& peer : peers_)
    671       all_peers.push_back(peer.second);
    672     for (const auto& peer : pending_invitations_)
    673       all_peers.push_back(peer.second);
    674     peers_.clear();
    675     pending_invitations_.clear();
    676     pending_peer_messages_.clear();
    677     pending_isolated_connections_.clear();
    678     named_isolated_connections_.clear();
    679   }
    680 
    681   for (const auto& peer : all_peers)
    682     peer->ShutDown();
    683 
    684   if (destroy_on_io_thread_shutdown_)
    685     delete this;
    686 }
    687 
    688 void NodeController::ForwardEvent(const ports::NodeName& node,
    689                                   ports::ScopedEvent event) {
    690   DCHECK(event);
    691   if (node == name_)
    692     node_->AcceptEvent(std::move(event));
    693   else
    694     SendPeerEvent(node, std::move(event));
    695 
    696   AttemptShutdownIfRequested();
    697 }
    698 
    699 void NodeController::BroadcastEvent(ports::ScopedEvent event) {
    700   Channel::MessagePtr channel_message = SerializeEventMessage(std::move(event));
    701   DCHECK(channel_message && !channel_message->has_handles());
    702 
    703   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    704   if (broker)
    705     broker->Broadcast(std::move(channel_message));
    706   else
    707     OnBroadcast(name_, std::move(channel_message));
    708 }
    709 
    710 void NodeController::PortStatusChanged(const ports::PortRef& port) {
    711   scoped_refptr<ports::UserData> user_data;
    712   node_->GetUserData(port, &user_data);
    713 
    714   PortObserver* observer = static_cast<PortObserver*>(user_data.get());
    715   if (observer) {
    716     observer->OnPortStatusChanged();
    717   } else {
    718     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
    719              << "doesn't have an observer.";
    720   }
    721 }
    722 
    723 void NodeController::OnAcceptInvitee(const ports::NodeName& from_node,
    724                                      const ports::NodeName& inviter_name,
    725                                      const ports::NodeName& token) {
    726   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    727 
    728   scoped_refptr<NodeChannel> inviter;
    729   {
    730     base::AutoLock lock(inviter_lock_);
    731     if (bootstrap_inviter_channel_ &&
    732         inviter_name_ == ports::kInvalidNodeName) {
    733       inviter_name_ = inviter_name;
    734       inviter = bootstrap_inviter_channel_;
    735     }
    736   }
    737 
    738   if (!inviter) {
    739     DLOG(ERROR) << "Unexpected AcceptInvitee message from " << from_node;
    740     DropPeer(from_node, nullptr);
    741     return;
    742   }
    743 
    744   inviter->SetRemoteNodeName(inviter_name);
    745   inviter->AcceptInvitation(token, name_);
    746 
    747   // NOTE: The invitee does not actually add its inviter as a peer until
    748   // receiving an AcceptBrokerClient message from the broker. The inviter will
    749   // request that said message be sent upon receiving AcceptInvitation.
    750 
    751   DVLOG(1) << "Broker client " << name_ << " accepting invitation from "
    752            << inviter_name;
    753 }
    754 
    755 void NodeController::OnAcceptInvitation(const ports::NodeName& from_node,
    756                                         const ports::NodeName& token,
    757                                         const ports::NodeName& invitee_name) {
    758   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    759 
    760   auto it = pending_invitations_.find(from_node);
    761   if (it == pending_invitations_.end() || token != from_node) {
    762     DLOG(ERROR) << "Received unexpected AcceptInvitation message from "
    763                 << from_node;
    764     DropPeer(from_node, nullptr);
    765     return;
    766   }
    767 
    768   {
    769     base::AutoLock lock(reserved_ports_lock_);
    770     auto it = reserved_ports_.find(from_node);
    771     if (it != reserved_ports_.end()) {
    772       // Swap the temporary node name's reserved ports into an entry keyed by
    773       // the real node name.
    774       auto result =
    775           reserved_ports_.emplace(invitee_name, std::move(it->second));
    776       DCHECK(result.second);
    777       reserved_ports_.erase(it);
    778     }
    779   }
    780 
    781   scoped_refptr<NodeChannel> channel = it->second;
    782   pending_invitations_.erase(it);
    783 
    784   DCHECK(channel);
    785 
    786   DVLOG(1) << "Node " << name_ << " accepted invitee " << invitee_name;
    787 
    788   AddPeer(invitee_name, channel, false /* start_channel */);
    789 
    790   // TODO(rockot): We could simplify invitee initialization if we could
    791   // synchronously get a new async broker channel from the broker. For now we do
    792   // it asynchronously since it's only used to facilitate handle passing, not
    793   // handle creation.
    794   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    795   if (broker) {
    796     // Inform the broker of this new client.
    797     broker->AddBrokerClient(invitee_name, channel->CloneRemoteProcessHandle());
    798   } else {
    799     // If we have no broker, either we need to wait for one, or we *are* the
    800     // broker.
    801     scoped_refptr<NodeChannel> inviter = GetInviterChannel();
    802     if (!inviter) {
    803       base::AutoLock lock(inviter_lock_);
    804       inviter = bootstrap_inviter_channel_;
    805     }
    806 
    807     if (!inviter) {
    808       // Yes, we're the broker. We can initialize the client directly.
    809       channel->AcceptBrokerClient(name_, PlatformHandle());
    810     } else {
    811       // We aren't the broker, so wait for a broker connection.
    812       base::AutoLock lock(broker_lock_);
    813       pending_broker_clients_.push(invitee_name);
    814     }
    815   }
    816 }
    817 
    818 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
    819                                        const ports::NodeName& client_name,
    820                                        base::ProcessHandle process_handle) {
    821   ScopedProcessHandle scoped_process_handle(process_handle);
    822 
    823   scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
    824   if (!sender) {
    825     DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
    826     return;
    827   }
    828 
    829   if (GetPeerChannel(client_name)) {
    830     DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
    831     DropPeer(from_node, nullptr);
    832     return;
    833   }
    834 
    835   PlatformChannel broker_channel;
    836   ConnectionParams connection_params(broker_channel.TakeLocalEndpoint());
    837   scoped_refptr<NodeChannel> client =
    838       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
    839                           ProcessErrorCallback());
    840 
    841 #if defined(OS_WIN)
    842   // The broker must have a working handle to the client process in order to
    843   // properly copy other handles to and from the client.
    844   if (!scoped_process_handle.is_valid()) {
    845     DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
    846     return;
    847   }
    848 #endif
    849   client->SetRemoteProcessHandle(std::move(scoped_process_handle));
    850 
    851   AddPeer(client_name, client, true /* start_channel */);
    852 
    853   DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
    854            << " from peer " << from_node;
    855 
    856   sender->BrokerClientAdded(
    857       client_name, broker_channel.TakeRemoteEndpoint().TakePlatformHandle());
    858 }
    859 
    860 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
    861                                          const ports::NodeName& client_name,
    862                                          PlatformHandle broker_channel) {
    863   scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
    864   if (!client) {
    865     DLOG(ERROR) << "BrokerClientAdded for unknown client " << client_name;
    866     return;
    867   }
    868 
    869   // This should have come from our own broker.
    870   if (GetBrokerChannel() != GetPeerChannel(from_node)) {
    871     DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
    872     return;
    873   }
    874 
    875   DVLOG(1) << "Client " << client_name << " accepted by broker " << from_node;
    876 
    877   client->AcceptBrokerClient(from_node, std::move(broker_channel));
    878 }
    879 
    880 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
    881                                           const ports::NodeName& broker_name,
    882                                           PlatformHandle broker_channel) {
    883   DCHECK(!GetConfiguration().is_broker_process);
    884 
    885   // This node should already have an inviter in bootstrap mode.
    886   ports::NodeName inviter_name;
    887   scoped_refptr<NodeChannel> inviter;
    888   {
    889     base::AutoLock lock(inviter_lock_);
    890     inviter_name = inviter_name_;
    891     inviter = bootstrap_inviter_channel_;
    892     bootstrap_inviter_channel_ = nullptr;
    893   }
    894   DCHECK(inviter_name == from_node);
    895   DCHECK(inviter);
    896 
    897   base::queue<ports::NodeName> pending_broker_clients;
    898   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
    899       pending_relay_messages;
    900   {
    901     base::AutoLock lock(broker_lock_);
    902     broker_name_ = broker_name;
    903     std::swap(pending_broker_clients, pending_broker_clients_);
    904     std::swap(pending_relay_messages, pending_relay_messages_);
    905   }
    906   DCHECK(broker_name != ports::kInvalidNodeName);
    907 
    908   // It's now possible to add both the broker and the inviter as peers.
    909   // Note that the broker and inviter may be the same node.
    910   scoped_refptr<NodeChannel> broker;
    911   if (broker_name == inviter_name) {
    912     DCHECK(!broker_channel.is_valid());
    913     broker = inviter;
    914   } else {
    915     DCHECK(broker_channel.is_valid());
    916     broker = NodeChannel::Create(
    917         this,
    918         ConnectionParams(PlatformChannelEndpoint(std::move(broker_channel))),
    919         io_task_runner_, ProcessErrorCallback());
    920     AddPeer(broker_name, broker, true /* start_channel */);
    921   }
    922 
    923   AddPeer(inviter_name, inviter, false /* start_channel */);
    924 
    925   {
    926     // Complete any port merge requests we have waiting for the inviter.
    927     base::AutoLock lock(pending_port_merges_lock_);
    928     for (const auto& request : pending_port_merges_)
    929       inviter->RequestPortMerge(request.second.name(), request.first);
    930     pending_port_merges_.clear();
    931   }
    932 
    933   // Feed the broker any pending invitees of our own.
    934   while (!pending_broker_clients.empty()) {
    935     const ports::NodeName& invitee_name = pending_broker_clients.front();
    936     auto it = pending_invitations_.find(invitee_name);
    937     // If for any reason we don't have a pending invitation for the invitee,
    938     // there's nothing left to do: we've already swapped the relevant state into
    939     // the stack.
    940     if (it != pending_invitations_.end()) {
    941       broker->AddBrokerClient(invitee_name,
    942                               it->second->CloneRemoteProcessHandle());
    943     }
    944     pending_broker_clients.pop();
    945   }
    946 
    947 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    948   // Have the broker relay any messages we have waiting.
    949   for (auto& entry : pending_relay_messages) {
    950     const ports::NodeName& destination = entry.first;
    951     auto& message_queue = entry.second;
    952     while (!message_queue.empty()) {
    953       broker->RelayEventMessage(destination, std::move(message_queue.front()));
    954       message_queue.pop();
    955     }
    956   }
    957 #endif
    958 
    959   DVLOG(1) << "Client " << name_ << " accepted by broker " << broker_name;
    960 }
    961 
    962 void NodeController::OnEventMessage(const ports::NodeName& from_node,
    963                                     Channel::MessagePtr channel_message) {
    964   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    965 
    966   auto event = DeserializeEventMessage(from_node, std::move(channel_message));
    967   if (!event) {
    968     // We silently ignore unparseable events, as they may come from a process
    969     // running a newer version of Mojo.
    970     DVLOG(1) << "Ignoring invalid or unknown event from " << from_node;
    971     return;
    972   }
    973 
    974   node_->AcceptEvent(std::move(event));
    975 
    976   AttemptShutdownIfRequested();
    977 }
    978 
    979 void NodeController::OnRequestPortMerge(
    980     const ports::NodeName& from_node,
    981     const ports::PortName& connector_port_name,
    982     const std::string& name) {
    983   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    984 
    985   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name
    986            << " and port " << connector_port_name << "@" << from_node;
    987 
    988   ports::PortRef local_port;
    989   {
    990     base::AutoLock lock(reserved_ports_lock_);
    991     auto it = reserved_ports_.find(from_node);
    992     // TODO(https://crbug.com/822034): We should send a notification back to the
    993     // requestor so they can clean up their dangling port in this failure case.
    994     // This requires changes to the internal protocol, which can't be made yet.
    995     // Until this is done, pipes from |MojoExtractMessagePipeFromInvitation()|
    996     // will never break if the given name was invalid.
    997     if (it == reserved_ports_.end()) {
    998       DVLOG(1) << "Ignoring port merge request from node " << from_node << ". "
    999                << "No ports reserved for that node.";
   1000       return;
   1001     }
   1002 
   1003     PortMap& port_map = it->second;
   1004     auto port_it = port_map.find(name);
   1005     if (port_it == port_map.end()) {
   1006       DVLOG(1) << "Ignoring request to connect to port for unknown name "
   1007                << name << " from node " << from_node;
   1008       return;
   1009     }
   1010     local_port = port_it->second;
   1011     port_map.erase(port_it);
   1012     if (port_map.empty())
   1013       reserved_ports_.erase(it);
   1014   }
   1015 
   1016   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
   1017   if (rv != ports::OK)
   1018     DLOG(ERROR) << "MergePorts failed: " << rv;
   1019 }
   1020 
   1021 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
   1022                                            const ports::NodeName& name) {
   1023   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
   1024 
   1025   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
   1026   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
   1027     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
   1028                 << from_node;
   1029     DropPeer(from_node, nullptr);
   1030     return;
   1031   }
   1032 
   1033   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
   1034   if (!new_friend) {
   1035     // We don't know who they're talking about!
   1036     requestor->Introduce(name, PlatformHandle());
   1037   } else {
   1038     PlatformChannel new_channel;
   1039     requestor->Introduce(name,
   1040                          new_channel.TakeLocalEndpoint().TakePlatformHandle());
   1041     new_friend->Introduce(
   1042         from_node, new_channel.TakeRemoteEndpoint().TakePlatformHandle());
   1043   }
   1044 }
   1045 
   1046 void NodeController::OnIntroduce(const ports::NodeName& from_node,
   1047                                  const ports::NodeName& name,
   1048                                  PlatformHandle channel_handle) {
   1049   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
   1050 
   1051   if (!channel_handle.is_valid()) {
   1052     node_->LostConnectionToNode(name);
   1053 
   1054     DVLOG(1) << "Could not be introduced to peer " << name;
   1055     base::AutoLock lock(peers_lock_);
   1056     pending_peer_messages_.erase(name);
   1057     return;
   1058   }
   1059 
   1060   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
   1061       this,
   1062       ConnectionParams(PlatformChannelEndpoint(std::move(channel_handle))),
   1063       io_task_runner_, ProcessErrorCallback());
   1064 
   1065   DVLOG(1) << "Adding new peer " << name << " via broker introduction.";
   1066   AddPeer(name, channel, true /* start_channel */);
   1067 }
   1068 
   1069 void NodeController::OnBroadcast(const ports::NodeName& from_node,
   1070                                  Channel::MessagePtr message) {
   1071   DCHECK(!message->has_handles());
   1072 
   1073   auto event = DeserializeEventMessage(from_node, std::move(message));
   1074   if (!event) {
   1075     // We silently ignore unparseable events, as they may come from a process
   1076     // running a newer version of Mojo.
   1077     DVLOG(1) << "Ignoring request to broadcast invalid or unknown event from "
   1078              << from_node;
   1079     return;
   1080   }
   1081 
   1082   base::AutoLock lock(peers_lock_);
   1083   for (auto& iter : peers_) {
   1084     // Clone and send the event to each known peer. Events which cannot be
   1085     // cloned cannot be broadcast.
   1086     ports::ScopedEvent clone = event->Clone();
   1087     if (!clone) {
   1088       DVLOG(1) << "Ignoring request to broadcast invalid event from "
   1089                << from_node << " [type=" << static_cast<uint32_t>(event->type())
   1090                << "]";
   1091       return;
   1092     }
   1093 
   1094     iter.second->SendChannelMessage(SerializeEventMessage(std::move(clone)));
   1095   }
   1096 }
   1097 
   1098 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
   1099 void NodeController::OnRelayEventMessage(const ports::NodeName& from_node,
   1100                                          base::ProcessHandle from_process,
   1101                                          const ports::NodeName& destination,
   1102                                          Channel::MessagePtr message) {
   1103   // The broker should always know which process this came from.
   1104   DCHECK(from_process != base::kNullProcessHandle);
   1105   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
   1106 
   1107   if (GetBrokerChannel()) {
   1108     // Only the broker should be asked to relay a message.
   1109     LOG(ERROR) << "Non-broker refusing to relay message.";
   1110     DropPeer(from_node, nullptr);
   1111     return;
   1112   }
   1113 
   1114   if (destination == name_) {
   1115     // Great, we can deliver this message locally.
   1116     OnEventMessage(from_node, std::move(message));
   1117     return;
   1118   }
   1119 
   1120   scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
   1121   if (peer)
   1122     peer->EventMessageFromRelay(from_node, std::move(message));
   1123   else
   1124     DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
   1125 }
   1126 
   1127 void NodeController::OnEventMessageFromRelay(const ports::NodeName& from_node,
   1128                                              const ports::NodeName& source_node,
   1129                                              Channel::MessagePtr message) {
   1130   if (GetPeerChannel(from_node) != GetBrokerChannel()) {
   1131     LOG(ERROR) << "Refusing relayed message from non-broker node.";
   1132     DropPeer(from_node, nullptr);
   1133     return;
   1134   }
   1135 
   1136   OnEventMessage(source_node, std::move(message));
   1137 }
   1138 #endif
   1139 
   1140 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
   1141                                   const ports::NodeName& token,
   1142                                   const ports::NodeName& peer_name,
   1143                                   const ports::PortName& port_name) {
   1144   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
   1145 
   1146   auto it = pending_isolated_connections_.find(from_node);
   1147   if (it == pending_isolated_connections_.end()) {
   1148     DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
   1149     DropPeer(from_node, nullptr);
   1150     return;
   1151   }
   1152 
   1153   IsolatedConnection& connection = it->second;
   1154   scoped_refptr<NodeChannel> channel = std::move(connection.channel);
   1155   ports::PortRef local_port = connection.local_port;
   1156   if (!connection.name.empty())
   1157     named_isolated_connections_[connection.name] = peer_name;
   1158   pending_isolated_connections_.erase(it);
   1159   DCHECK(channel);
   1160 
   1161   if (name_ != peer_name) {
   1162     // It's possible (e.g. in tests) that we may "connect" to ourself, in which
   1163     // case we skip this |AddPeer()| call and go straight to merging ports.
   1164     // Note that we explicitly drop any prior connection to the same peer so
   1165     // that new isolated connections can replace old ones.
   1166     DropPeer(peer_name, nullptr);
   1167     AddPeer(peer_name, channel, false /* start_channel */);
   1168     DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
   1169   }
   1170 
   1171   // We need to choose one side to initiate the port merge. It doesn't matter
   1172   // who does it as long as they don't both try. Simple solution: pick the one
   1173   // with the "smaller" port name.
   1174   if (local_port.name() < port_name)
   1175     node()->MergePorts(local_port, peer_name, port_name);
   1176 }
   1177 
   1178 void NodeController::OnChannelError(const ports::NodeName& from_node,
   1179                                     NodeChannel* channel) {
   1180   if (io_task_runner_->RunsTasksInCurrentSequence()) {
   1181     RequestContext request_context(RequestContext::Source::SYSTEM);
   1182     DropPeer(from_node, channel);
   1183   } else {
   1184     io_task_runner_->PostTask(
   1185         FROM_HERE,
   1186         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
   1187                    from_node, base::RetainedRef(channel)));
   1188   }
   1189 }
   1190 
   1191 #if defined(OS_MACOSX) && !defined(OS_IOS)
   1192 MachPortRelay* NodeController::GetMachPortRelay() {
   1193   {
   1194     base::AutoLock lock(inviter_lock_);
   1195     // Return null if we're not the root.
   1196     if (bootstrap_inviter_channel_ || inviter_name_ != ports::kInvalidNodeName)
   1197       return nullptr;
   1198   }
   1199 
   1200   base::AutoLock lock(mach_port_relay_lock_);
   1201   return mach_port_relay_.get();
   1202 }
   1203 #endif
   1204 
   1205 void NodeController::CancelPendingPortMerges() {
   1206   std::vector<ports::PortRef> ports_to_close;
   1207 
   1208   {
   1209     base::AutoLock lock(pending_port_merges_lock_);
   1210     reject_pending_merges_ = true;
   1211     for (const auto& port : pending_port_merges_)
   1212       ports_to_close.push_back(port.second);
   1213     pending_port_merges_.clear();
   1214   }
   1215 
   1216   for (const auto& port : ports_to_close)
   1217     node_->ClosePort(port);
   1218 }
   1219 
   1220 void NodeController::DestroyOnIOThreadShutdown() {
   1221   destroy_on_io_thread_shutdown_ = true;
   1222 }
   1223 
   1224 void NodeController::AttemptShutdownIfRequested() {
   1225   if (!shutdown_callback_flag_)
   1226     return;
   1227 
   1228   base::Closure callback;
   1229   {
   1230     base::AutoLock lock(shutdown_lock_);
   1231     if (shutdown_callback_.is_null())
   1232       return;
   1233     if (!node_->CanShutdownCleanly(
   1234             ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
   1235       DVLOG(2) << "Unable to cleanly shut down node " << name_;
   1236       return;
   1237     }
   1238 
   1239     callback = shutdown_callback_;
   1240     shutdown_callback_.Reset();
   1241     shutdown_callback_flag_.Set(false);
   1242   }
   1243 
   1244   DCHECK(!callback.is_null());
   1245 
   1246   callback.Run();
   1247 }
   1248 
   1249 NodeController::IsolatedConnection::IsolatedConnection() = default;
   1250 
   1251 NodeController::IsolatedConnection::IsolatedConnection(
   1252     const IsolatedConnection& other) = default;
   1253 
   1254 NodeController::IsolatedConnection::IsolatedConnection(
   1255     IsolatedConnection&& other) = default;
   1256 
   1257 NodeController::IsolatedConnection::IsolatedConnection(
   1258     scoped_refptr<NodeChannel> channel,
   1259     const ports::PortRef& local_port,
   1260     base::StringPiece name)
   1261     : channel(std::move(channel)), local_port(local_port), name(name) {}
   1262 
   1263 NodeController::IsolatedConnection::~IsolatedConnection() = default;
   1264 
   1265 NodeController::IsolatedConnection& NodeController::IsolatedConnection::
   1266 operator=(const IsolatedConnection& other) = default;
   1267 
   1268 NodeController::IsolatedConnection& NodeController::IsolatedConnection::
   1269 operator=(IsolatedConnection&& other) = default;
   1270 
   1271 }  // namespace core
   1272 }  // namespace mojo
   1273