Home | History | Annotate | Download | only in ports
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/ports/node.h"
      6 
      7 #include <string.h>
      8 
      9 #include <utility>
     10 
     11 #include "base/atomicops.h"
     12 #include "base/logging.h"
     13 #include "base/memory/ref_counted.h"
     14 #include "base/synchronization/lock.h"
     15 #include "mojo/edk/system/ports/node_delegate.h"
     16 
     17 namespace mojo {
     18 namespace edk {
     19 namespace ports {
     20 
     21 namespace {
     22 
     23 int DebugError(const char* message, int error_code) {
     24   CHECK(false) << "Oops: " << message;
     25   return error_code;
     26 }
     27 
     28 #define OOPS(x) DebugError(#x, x)
     29 
     30 bool CanAcceptMoreMessages(const Port* port) {
     31   // Have we already doled out the last message (i.e., do we expect to NOT
     32   // receive further messages)?
     33   uint64_t next_sequence_num = port->message_queue.next_sequence_num();
     34   if (port->state == Port::kClosed)
     35     return false;
     36   if (port->peer_closed || port->remove_proxy_on_last_message) {
     37     if (port->last_sequence_num_to_receive == next_sequence_num - 1)
     38       return false;
     39   }
     40   return true;
     41 }
     42 
     43 }  // namespace
     44 
     45 class Node::LockedPort {
     46  public:
     47   explicit LockedPort(Port* port) : port_(port) {
     48     port_->lock.AssertAcquired();
     49   }
     50 
     51   Port* get() const { return port_; }
     52   Port* operator->() const { return port_; }
     53 
     54  private:
     55   Port* const port_;
     56 };
     57 
     58 Node::Node(const NodeName& name, NodeDelegate* delegate)
     59     : name_(name),
     60       delegate_(delegate) {
     61 }
     62 
     63 Node::~Node() {
     64   if (!ports_.empty())
     65     DLOG(WARNING) << "Unclean shutdown for node " << name_;
     66 }
     67 
     68 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
     69   base::AutoLock ports_lock(ports_lock_);
     70 
     71   if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
     72 #if DCHECK_IS_ON()
     73     for (auto entry : ports_) {
     74       DVLOG(2) << "Port " << entry.first << " referencing node "
     75                << entry.second->peer_node_name << " is blocking shutdown of "
     76                << "node " << name_ << " (state=" << entry.second->state << ")";
     77     }
     78 #endif
     79     return ports_.empty();
     80   }
     81 
     82   DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
     83 
     84   // NOTE: This is not efficient, though it probably doesn't need to be since
     85   // relatively few ports should be open during shutdown and shutdown doesn't
     86   // need to be blazingly fast.
     87   bool can_shutdown = true;
     88   for (auto entry : ports_) {
     89     base::AutoLock lock(entry.second->lock);
     90     if (entry.second->peer_node_name != name_ &&
     91         entry.second->state != Port::kReceiving) {
     92       can_shutdown = false;
     93 #if DCHECK_IS_ON()
     94       DVLOG(2) << "Port " << entry.first << " referencing node "
     95                << entry.second->peer_node_name << " is blocking shutdown of "
     96                << "node " << name_ << " (state=" << entry.second->state << ")";
     97 #else
     98       // Exit early when not debugging.
     99       break;
    100 #endif
    101     }
    102   }
    103 
    104   return can_shutdown;
    105 }
    106 
    107 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
    108   scoped_refptr<Port> port = GetPort(port_name);
    109   if (!port)
    110     return ERROR_PORT_UNKNOWN;
    111 
    112   *port_ref = PortRef(port_name, std::move(port));
    113   return OK;
    114 }
    115 
    116 int Node::CreateUninitializedPort(PortRef* port_ref) {
    117   PortName port_name;
    118   delegate_->GenerateRandomPortName(&port_name);
    119 
    120   scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
    121   int rv = AddPortWithName(port_name, port);
    122   if (rv != OK)
    123     return rv;
    124 
    125   *port_ref = PortRef(port_name, std::move(port));
    126   return OK;
    127 }
    128 
    129 int Node::InitializePort(const PortRef& port_ref,
    130                          const NodeName& peer_node_name,
    131                          const PortName& peer_port_name) {
    132   Port* port = port_ref.port();
    133 
    134   {
    135     base::AutoLock lock(port->lock);
    136     if (port->state != Port::kUninitialized)
    137       return ERROR_PORT_STATE_UNEXPECTED;
    138 
    139     port->state = Port::kReceiving;
    140     port->peer_node_name = peer_node_name;
    141     port->peer_port_name = peer_port_name;
    142   }
    143 
    144   delegate_->PortStatusChanged(port_ref);
    145 
    146   return OK;
    147 }
    148 
    149 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
    150   int rv;
    151 
    152   rv = CreateUninitializedPort(port0_ref);
    153   if (rv != OK)
    154     return rv;
    155 
    156   rv = CreateUninitializedPort(port1_ref);
    157   if (rv != OK)
    158     return rv;
    159 
    160   rv = InitializePort(*port0_ref, name_, port1_ref->name());
    161   if (rv != OK)
    162     return rv;
    163 
    164   rv = InitializePort(*port1_ref, name_, port0_ref->name());
    165   if (rv != OK)
    166     return rv;
    167 
    168   return OK;
    169 }
    170 
    171 int Node::SetUserData(const PortRef& port_ref,
    172                       scoped_refptr<UserData> user_data) {
    173   Port* port = port_ref.port();
    174 
    175   base::AutoLock lock(port->lock);
    176   if (port->state == Port::kClosed)
    177     return ERROR_PORT_STATE_UNEXPECTED;
    178 
    179   port->user_data = std::move(user_data);
    180 
    181   return OK;
    182 }
    183 
    184 int Node::GetUserData(const PortRef& port_ref,
    185                       scoped_refptr<UserData>* user_data) {
    186   Port* port = port_ref.port();
    187 
    188   base::AutoLock lock(port->lock);
    189   if (port->state == Port::kClosed)
    190     return ERROR_PORT_STATE_UNEXPECTED;
    191 
    192   *user_data = port->user_data;
    193 
    194   return OK;
    195 }
    196 
    197 int Node::ClosePort(const PortRef& port_ref) {
    198   std::deque<PortName> referenced_port_names;
    199 
    200   ObserveClosureEventData data;
    201 
    202   NodeName peer_node_name;
    203   PortName peer_port_name;
    204   Port* port = port_ref.port();
    205   {
    206     // We may need to erase the port, which requires ports_lock_ to be held,
    207     // but ports_lock_ must be acquired before any individual port locks.
    208     base::AutoLock ports_lock(ports_lock_);
    209 
    210     base::AutoLock lock(port->lock);
    211     if (port->state == Port::kUninitialized) {
    212       // If the port was not yet initialized, there's nothing interesting to do.
    213       ErasePort_Locked(port_ref.name());
    214       return OK;
    215     }
    216 
    217     if (port->state != Port::kReceiving)
    218       return ERROR_PORT_STATE_UNEXPECTED;
    219 
    220     port->state = Port::kClosed;
    221 
    222     // We pass along the sequence number of the last message sent from this
    223     // port to allow the peer to have the opportunity to consume all inbound
    224     // messages before notifying the embedder that this port is closed.
    225     data.last_sequence_num = port->next_sequence_num_to_send - 1;
    226 
    227     peer_node_name = port->peer_node_name;
    228     peer_port_name = port->peer_port_name;
    229 
    230     // If the port being closed still has unread messages, then we need to take
    231     // care to close those ports so as to avoid leaking memory.
    232     port->message_queue.GetReferencedPorts(&referenced_port_names);
    233 
    234     ErasePort_Locked(port_ref.name());
    235   }
    236 
    237   DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
    238            << " to " << peer_port_name << "@" << peer_node_name;
    239 
    240   delegate_->ForwardMessage(
    241       peer_node_name,
    242       NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
    243 
    244   for (const auto& name : referenced_port_names) {
    245     PortRef ref;
    246     if (GetPort(name, &ref) == OK)
    247       ClosePort(ref);
    248   }
    249   return OK;
    250 }
    251 
    252 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
    253   Port* port = port_ref.port();
    254 
    255   base::AutoLock lock(port->lock);
    256 
    257   if (port->state != Port::kReceiving)
    258     return ERROR_PORT_STATE_UNEXPECTED;
    259 
    260   port_status->has_messages = port->message_queue.HasNextMessage();
    261   port_status->receiving_messages = CanAcceptMoreMessages(port);
    262   port_status->peer_closed = port->peer_closed;
    263   return OK;
    264 }
    265 
    266 int Node::GetMessage(const PortRef& port_ref,
    267                      ScopedMessage* message,
    268                      MessageFilter* filter) {
    269   *message = nullptr;
    270 
    271   DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
    272 
    273   Port* port = port_ref.port();
    274   {
    275     base::AutoLock lock(port->lock);
    276 
    277     // This could also be treated like the port being unknown since the
    278     // embedder should no longer be referring to a port that has been sent.
    279     if (port->state != Port::kReceiving)
    280       return ERROR_PORT_STATE_UNEXPECTED;
    281 
    282     // Let the embedder get messages until there are no more before reporting
    283     // that the peer closed its end.
    284     if (!CanAcceptMoreMessages(port))
    285       return ERROR_PORT_PEER_CLOSED;
    286 
    287     port->message_queue.GetNextMessage(message, filter);
    288   }
    289 
    290   // Allow referenced ports to trigger PortStatusChanged calls.
    291   if (*message) {
    292     for (size_t i = 0; i < (*message)->num_ports(); ++i) {
    293       const PortName& new_port_name = (*message)->ports()[i];
    294       scoped_refptr<Port> new_port = GetPort(new_port_name);
    295 
    296       DCHECK(new_port) << "Port " << new_port_name << "@" << name_
    297                        << " does not exist!";
    298 
    299       base::AutoLock lock(new_port->lock);
    300 
    301       DCHECK(new_port->state == Port::kReceiving);
    302       new_port->message_queue.set_signalable(true);
    303     }
    304   }
    305 
    306   return OK;
    307 }
    308 
    309 int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
    310   int rv = SendMessageInternal(port_ref, &message);
    311   if (rv != OK) {
    312     // If send failed, close all carried ports. Note that we're careful not to
    313     // close the sending port itself if it happened to be one of the encoded
    314     // ports (an invalid but possible condition.)
    315     for (size_t i = 0; i < message->num_ports(); ++i) {
    316       if (message->ports()[i] == port_ref.name())
    317         continue;
    318 
    319       PortRef port;
    320       if (GetPort(message->ports()[i], &port) == OK)
    321         ClosePort(port);
    322     }
    323   }
    324   return rv;
    325 }
    326 
    327 int Node::AcceptMessage(ScopedMessage message) {
    328   const EventHeader* header = GetEventHeader(*message);
    329   switch (header->type) {
    330     case EventType::kUser:
    331       return OnUserMessage(std::move(message));
    332     case EventType::kPortAccepted:
    333       return OnPortAccepted(header->port_name);
    334     case EventType::kObserveProxy:
    335       return OnObserveProxy(
    336           header->port_name,
    337           *GetEventData<ObserveProxyEventData>(*message));
    338     case EventType::kObserveProxyAck:
    339       return OnObserveProxyAck(
    340           header->port_name,
    341           GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
    342     case EventType::kObserveClosure:
    343       return OnObserveClosure(
    344           header->port_name,
    345           GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
    346     case EventType::kMergePort:
    347       return OnMergePort(header->port_name,
    348                          *GetEventData<MergePortEventData>(*message));
    349   }
    350   return OOPS(ERROR_NOT_IMPLEMENTED);
    351 }
    352 
    353 int Node::MergePorts(const PortRef& port_ref,
    354                      const NodeName& destination_node_name,
    355                      const PortName& destination_port_name) {
    356   Port* port = port_ref.port();
    357   MergePortEventData data;
    358   {
    359     base::AutoLock lock(port->lock);
    360 
    361     DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
    362              << " to " << destination_port_name << "@" << destination_node_name;
    363 
    364     // Send the port-to-merge over to the destination node so it can be merged
    365     // into the port cycle atomically there.
    366     data.new_port_name = port_ref.name();
    367     WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
    368                  &data.new_port_descriptor);
    369   }
    370   delegate_->ForwardMessage(
    371       destination_node_name,
    372       NewInternalMessage(destination_port_name,
    373                          EventType::kMergePort, data));
    374   return OK;
    375 }
    376 
    377 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
    378   Port* port0 = port0_ref.port();
    379   Port* port1 = port1_ref.port();
    380   int rv;
    381   {
    382     // |ports_lock_| must be held when acquiring overlapping port locks.
    383     base::AutoLock ports_lock(ports_lock_);
    384     base::AutoLock port0_lock(port0->lock);
    385     base::AutoLock port1_lock(port1->lock);
    386 
    387     DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
    388              << " and " << port1_ref.name() << "@" << name_;
    389 
    390     if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
    391       rv = ERROR_PORT_STATE_UNEXPECTED;
    392     else
    393       rv = MergePorts_Locked(port0_ref, port1_ref);
    394   }
    395 
    396   if (rv != OK) {
    397     ClosePort(port0_ref);
    398     ClosePort(port1_ref);
    399   }
    400 
    401   return rv;
    402 }
    403 
    404 int Node::LostConnectionToNode(const NodeName& node_name) {
    405   // We can no longer send events to the given node. We also can't expect any
    406   // PortAccepted events.
    407 
    408   DVLOG(1) << "Observing lost connection from node " << name_
    409            << " to node " << node_name;
    410 
    411   DestroyAllPortsWithPeer(node_name, kInvalidPortName);
    412   return OK;
    413 }
    414 
    415 int Node::OnUserMessage(ScopedMessage message) {
    416   PortName port_name = GetEventHeader(*message)->port_name;
    417   const auto* event = GetEventData<UserEventData>(*message);
    418 
    419 #if DCHECK_IS_ON()
    420   std::ostringstream ports_buf;
    421   for (size_t i = 0; i < message->num_ports(); ++i) {
    422     if (i > 0)
    423       ports_buf << ",";
    424     ports_buf << message->ports()[i];
    425   }
    426 
    427   DVLOG(4) << "AcceptMessage " << event->sequence_num
    428              << " [ports=" << ports_buf.str() << "] at "
    429              << port_name << "@" << name_;
    430 #endif
    431 
    432   scoped_refptr<Port> port = GetPort(port_name);
    433 
    434   // Even if this port does not exist, cannot receive anymore messages or is
    435   // buffering or proxying messages, we still need these ports to be bound to
    436   // this node. When the message is forwarded, these ports will get transferred
    437   // following the usual method. If the message cannot be accepted, then the
    438   // newly bound ports will simply be closed.
    439 
    440   for (size_t i = 0; i < message->num_ports(); ++i) {
    441     int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    442     if (rv != OK)
    443       return rv;
    444   }
    445 
    446   bool has_next_message = false;
    447   bool message_accepted = false;
    448 
    449   if (port) {
    450     // We may want to forward messages once the port lock is held, so we must
    451     // acquire |ports_lock_| first.
    452     base::AutoLock ports_lock(ports_lock_);
    453     base::AutoLock lock(port->lock);
    454 
    455     // Reject spurious messages if we've already received the last expected
    456     // message.
    457     if (CanAcceptMoreMessages(port.get())) {
    458       message_accepted = true;
    459       port->message_queue.AcceptMessage(std::move(message), &has_next_message);
    460 
    461       if (port->state == Port::kBuffering) {
    462         has_next_message = false;
    463       } else if (port->state == Port::kProxying) {
    464         has_next_message = false;
    465 
    466         // Forward messages. We forward messages in sequential order here so
    467         // that we maintain the message queue's notion of next sequence number.
    468         // That's useful for the proxy removal process as we can tell when this
    469         // port has seen all of the messages it is expected to see.
    470         int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
    471         if (rv != OK)
    472           return rv;
    473 
    474         MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
    475       }
    476     }
    477   }
    478 
    479   if (!message_accepted) {
    480     DVLOG(2) << "Message not accepted!\n";
    481     // Close all newly accepted ports as they are effectively orphaned.
    482     for (size_t i = 0; i < message->num_ports(); ++i) {
    483       PortRef port_ref;
    484       if (GetPort(message->ports()[i], &port_ref) == OK) {
    485         ClosePort(port_ref);
    486       } else {
    487         DLOG(WARNING) << "Cannot close non-existent port!\n";
    488       }
    489     }
    490   } else if (has_next_message) {
    491     PortRef port_ref(port_name, port);
    492     delegate_->PortStatusChanged(port_ref);
    493   }
    494 
    495   return OK;
    496 }
    497 
    498 int Node::OnPortAccepted(const PortName& port_name) {
    499   scoped_refptr<Port> port = GetPort(port_name);
    500   if (!port)
    501     return ERROR_PORT_UNKNOWN;
    502 
    503   DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
    504            << " pointing to "
    505            << port->peer_port_name << "@" << port->peer_node_name;
    506 
    507   return BeginProxying(PortRef(port_name, std::move(port)));
    508 }
    509 
    510 int Node::OnObserveProxy(const PortName& port_name,
    511                          const ObserveProxyEventData& event) {
    512   if (port_name == kInvalidPortName) {
    513     // An ObserveProxy with an invalid target port name is a broadcast used to
    514     // inform ports when their peer (which was itself a proxy) has become
    515     // defunct due to unexpected node disconnection.
    516     //
    517     // Receiving ports affected by this treat it as equivalent to peer closure.
    518     // Proxies affected by this can be removed and will in turn broadcast their
    519     // own death with a similar message.
    520     CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    521     CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    522     DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    523     return OK;
    524   }
    525 
    526   // The port may have already been closed locally, in which case the
    527   // ObserveClosure message will contain the last_sequence_num field.
    528   // We can then silently ignore this message.
    529   scoped_refptr<Port> port = GetPort(port_name);
    530   if (!port) {
    531     DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
    532     return OK;
    533   }
    534 
    535   DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
    536            << event.proxy_port_name << "@"
    537            << event.proxy_node_name << " pointing to "
    538            << event.proxy_to_port_name << "@"
    539            << event.proxy_to_node_name;
    540 
    541   {
    542     base::AutoLock lock(port->lock);
    543 
    544     if (port->peer_node_name == event.proxy_node_name &&
    545         port->peer_port_name == event.proxy_port_name) {
    546       if (port->state == Port::kReceiving) {
    547         port->peer_node_name = event.proxy_to_node_name;
    548         port->peer_port_name = event.proxy_to_port_name;
    549 
    550         ObserveProxyAckEventData ack;
    551         ack.last_sequence_num = port->next_sequence_num_to_send - 1;
    552 
    553         delegate_->ForwardMessage(
    554             event.proxy_node_name,
    555             NewInternalMessage(event.proxy_port_name,
    556                                EventType::kObserveProxyAck,
    557                                ack));
    558       } else {
    559         // As a proxy ourselves, we don't know how to honor the ObserveProxy
    560         // event or to populate the last_sequence_num field of ObserveProxyAck.
    561         // Afterall, another port could be sending messages to our peer now
    562         // that we've sent out our own ObserveProxy event.  Instead, we will
    563         // send an ObserveProxyAck indicating that the ObserveProxy event
    564         // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
    565         // However, this has to be done after we are removed as a proxy.
    566         // Otherwise, we might just find ourselves back here again, which
    567         // would be akin to a busy loop.
    568 
    569         DVLOG(2) << "Delaying ObserveProxyAck to "
    570                  << event.proxy_port_name << "@" << event.proxy_node_name;
    571 
    572         ObserveProxyAckEventData ack;
    573         ack.last_sequence_num = kInvalidSequenceNum;
    574 
    575         port->send_on_proxy_removal.reset(
    576             new std::pair<NodeName, ScopedMessage>(
    577                 event.proxy_node_name,
    578                 NewInternalMessage(event.proxy_port_name,
    579                                    EventType::kObserveProxyAck,
    580                                    ack)));
    581       }
    582     } else {
    583       // Forward this event along to our peer. Eventually, it should find the
    584       // port referring to the proxy.
    585       delegate_->ForwardMessage(
    586           port->peer_node_name,
    587           NewInternalMessage(port->peer_port_name,
    588                              EventType::kObserveProxy,
    589                              event));
    590     }
    591   }
    592   return OK;
    593 }
    594 
    595 int Node::OnObserveProxyAck(const PortName& port_name,
    596                             uint64_t last_sequence_num) {
    597   DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
    598            << " (last_sequence_num=" << last_sequence_num << ")";
    599 
    600   scoped_refptr<Port> port = GetPort(port_name);
    601   if (!port)
    602     return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
    603                                 // this is not an "Oops".
    604 
    605   {
    606     base::AutoLock lock(port->lock);
    607 
    608     if (port->state != Port::kProxying)
    609       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    610 
    611     if (last_sequence_num == kInvalidSequenceNum) {
    612       // Send again.
    613       InitiateProxyRemoval(LockedPort(port.get()), port_name);
    614       return OK;
    615     }
    616 
    617     // We can now remove this port once we have received and forwarded the last
    618     // message addressed to this port.
    619     port->remove_proxy_on_last_message = true;
    620     port->last_sequence_num_to_receive = last_sequence_num;
    621   }
    622   TryRemoveProxy(PortRef(port_name, std::move(port)));
    623   return OK;
    624 }
    625 
    626 int Node::OnObserveClosure(const PortName& port_name,
    627                            uint64_t last_sequence_num) {
    628   // OK if the port doesn't exist, as it may have been closed already.
    629   scoped_refptr<Port> port = GetPort(port_name);
    630   if (!port)
    631     return OK;
    632 
    633   // This message tells the port that it should no longer expect more messages
    634   // beyond last_sequence_num. This message is forwarded along until we reach
    635   // the receiving end, and this message serves as an equivalent to
    636   // ObserveProxyAck.
    637 
    638   bool notify_delegate = false;
    639   ObserveClosureEventData forwarded_data;
    640   NodeName peer_node_name;
    641   PortName peer_port_name;
    642   bool try_remove_proxy = false;
    643   {
    644     base::AutoLock lock(port->lock);
    645 
    646     port->peer_closed = true;
    647     port->last_sequence_num_to_receive = last_sequence_num;
    648 
    649     DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
    650              << " (state=" << port->state << ") pointing to "
    651              << port->peer_port_name << "@" << port->peer_node_name
    652              << " (last_sequence_num=" << last_sequence_num << ")";
    653 
    654     // We always forward ObserveClosure, even beyond the receiving port which
    655     // cares about it. This ensures that any dead-end proxies beyond that port
    656     // are notified to remove themselves.
    657 
    658     if (port->state == Port::kReceiving) {
    659       notify_delegate = true;
    660 
    661       // When forwarding along the other half of the port cycle, this will only
    662       // reach dead-end proxies. Tell them we've sent our last message so they
    663       // can go away.
    664       //
    665       // TODO: Repurposing ObserveClosure for this has the desired result but
    666       // may be semantically confusing since the forwarding port is not actually
    667       // closed. Consider replacing this with a new event type.
    668       forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    669     } else {
    670       // We haven't yet reached the receiving peer of the closed port, so
    671       // forward the message along as-is.
    672       forwarded_data.last_sequence_num = last_sequence_num;
    673 
    674       // See about removing the port if it is a proxy as our peer won't be able
    675       // to participate in proxy removal.
    676       port->remove_proxy_on_last_message = true;
    677       if (port->state == Port::kProxying)
    678         try_remove_proxy = true;
    679     }
    680 
    681     DVLOG(2) << "Forwarding ObserveClosure from "
    682              << port_name << "@" << name_ << " to peer "
    683              << port->peer_port_name << "@" << port->peer_node_name
    684              << " (last_sequence_num=" << forwarded_data.last_sequence_num
    685              << ")";
    686 
    687     peer_node_name = port->peer_node_name;
    688     peer_port_name = port->peer_port_name;
    689   }
    690   if (try_remove_proxy)
    691     TryRemoveProxy(PortRef(port_name, port));
    692 
    693   delegate_->ForwardMessage(
    694       peer_node_name,
    695       NewInternalMessage(peer_port_name, EventType::kObserveClosure,
    696                          forwarded_data));
    697 
    698   if (notify_delegate) {
    699     PortRef port_ref(port_name, std::move(port));
    700     delegate_->PortStatusChanged(port_ref);
    701   }
    702   return OK;
    703 }
    704 
    705 int Node::OnMergePort(const PortName& port_name,
    706                       const MergePortEventData& event) {
    707   scoped_refptr<Port> port = GetPort(port_name);
    708 
    709   DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
    710            << (port ? port->state : -1) << ") merging with proxy "
    711            << event.new_port_name
    712            << "@" << name_ << " pointing to "
    713            << event.new_port_descriptor.peer_port_name << "@"
    714            << event.new_port_descriptor.peer_node_name << " referred by "
    715            << event.new_port_descriptor.referring_port_name << "@"
    716            << event.new_port_descriptor.referring_node_name;
    717 
    718   bool close_target_port = false;
    719   bool close_new_port = false;
    720 
    721   // Accept the new port. This is now the receiving end of the other port cycle
    722   // to be merged with ours.
    723   int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
    724   if (rv != OK) {
    725     close_target_port = true;
    726   } else if (port) {
    727     // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    728     // needs to hold |ports_lock_|. We also acquire multiple port locks within.
    729     base::AutoLock ports_lock(ports_lock_);
    730     base::AutoLock lock(port->lock);
    731 
    732     if (port->state != Port::kReceiving) {
    733       close_new_port = true;
    734     } else {
    735       scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
    736       base::AutoLock new_port_lock(new_port->lock);
    737       DCHECK(new_port->state == Port::kReceiving);
    738 
    739       // Both ports are locked. Now all we have to do is swap their peer
    740       // information and set them up as proxies.
    741 
    742       PortRef port0_ref(port_name, port);
    743       PortRef port1_ref(event.new_port_name, new_port);
    744       int rv = MergePorts_Locked(port0_ref, port1_ref);
    745       if (rv == OK)
    746         return rv;
    747 
    748       close_new_port = true;
    749       close_target_port = true;
    750     }
    751   } else {
    752     close_new_port = true;
    753   }
    754 
    755   if (close_target_port) {
    756     PortRef target_port;
    757     rv = GetPort(port_name, &target_port);
    758     DCHECK(rv == OK);
    759 
    760     ClosePort(target_port);
    761   }
    762 
    763   if (close_new_port) {
    764     PortRef new_port;
    765     rv = GetPort(event.new_port_name, &new_port);
    766     DCHECK(rv == OK);
    767 
    768     ClosePort(new_port);
    769   }
    770 
    771   return ERROR_PORT_STATE_UNEXPECTED;
    772 }
    773 
    774 int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
    775   base::AutoLock lock(ports_lock_);
    776 
    777   if (!ports_.insert(std::make_pair(port_name, std::move(port))).second)
    778     return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
    779 
    780   DVLOG(2) << "Created port " << port_name << "@" << name_;
    781   return OK;
    782 }
    783 
    784 void Node::ErasePort(const PortName& port_name) {
    785   base::AutoLock lock(ports_lock_);
    786   ErasePort_Locked(port_name);
    787 }
    788 
    789 void Node::ErasePort_Locked(const PortName& port_name) {
    790   ports_lock_.AssertAcquired();
    791   ports_.erase(port_name);
    792   DVLOG(2) << "Deleted port " << port_name << "@" << name_;
    793 }
    794 
    795 scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
    796   base::AutoLock lock(ports_lock_);
    797   return GetPort_Locked(port_name);
    798 }
    799 
    800 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
    801   ports_lock_.AssertAcquired();
    802   auto iter = ports_.find(port_name);
    803   if (iter == ports_.end())
    804     return nullptr;
    805 
    806 #if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
    807   // Workaround for https://crbug.com/665869.
    808   base::subtle::MemoryBarrier();
    809 #endif
    810 
    811   return iter->second;
    812 }
    813 
    814 int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
    815   ScopedMessage& m = *message;
    816   for (size_t i = 0; i < m->num_ports(); ++i) {
    817     if (m->ports()[i] == port_ref.name())
    818       return ERROR_PORT_CANNOT_SEND_SELF;
    819   }
    820 
    821   Port* port = port_ref.port();
    822   NodeName peer_node_name;
    823   {
    824     // We must acquire |ports_lock_| before grabbing any port locks, because
    825     // WillSendMessage_Locked may need to lock multiple ports out of order.
    826     base::AutoLock ports_lock(ports_lock_);
    827     base::AutoLock lock(port->lock);
    828 
    829     if (port->state != Port::kReceiving)
    830       return ERROR_PORT_STATE_UNEXPECTED;
    831 
    832     if (port->peer_closed)
    833       return ERROR_PORT_PEER_CLOSED;
    834 
    835     int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
    836     if (rv != OK)
    837       return rv;
    838 
    839     // Beyond this point there's no sense in returning anything but OK. Even if
    840     // message forwarding or acceptance fails, there's nothing the embedder can
    841     // do to recover. Assume that failure beyond this point must be treated as a
    842     // transport failure.
    843 
    844     peer_node_name = port->peer_node_name;
    845   }
    846 
    847   if (peer_node_name != name_) {
    848     delegate_->ForwardMessage(peer_node_name, std::move(m));
    849     return OK;
    850   }
    851 
    852   int rv = AcceptMessage(std::move(m));
    853   if (rv != OK) {
    854     // See comment above for why we don't return an error in this case.
    855     DVLOG(2) << "AcceptMessage failed: " << rv;
    856   }
    857 
    858   return OK;
    859 }
    860 
    861 int Node::MergePorts_Locked(const PortRef& port0_ref,
    862                             const PortRef& port1_ref) {
    863   Port* port0 = port0_ref.port();
    864   Port* port1 = port1_ref.port();
    865 
    866   ports_lock_.AssertAcquired();
    867   port0->lock.AssertAcquired();
    868   port1->lock.AssertAcquired();
    869 
    870   CHECK(port0->state == Port::kReceiving);
    871   CHECK(port1->state == Port::kReceiving);
    872 
    873   // Ports cannot be merged with their own receiving peer!
    874   if (port0->peer_node_name == name_ &&
    875       port0->peer_port_name == port1_ref.name())
    876     return ERROR_PORT_STATE_UNEXPECTED;
    877 
    878   if (port1->peer_node_name == name_ &&
    879       port1->peer_port_name == port0_ref.name())
    880     return ERROR_PORT_STATE_UNEXPECTED;
    881 
    882   // Only merge if both ports have never sent a message.
    883   if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
    884       port1->next_sequence_num_to_send == kInitialSequenceNum) {
    885     // Swap the ports' peer information and switch them both into buffering
    886     // (eventually proxying) mode.
    887 
    888     std::swap(port0->peer_node_name, port1->peer_node_name);
    889     std::swap(port0->peer_port_name, port1->peer_port_name);
    890 
    891     port0->state = Port::kBuffering;
    892     if (port0->peer_closed)
    893       port0->remove_proxy_on_last_message = true;
    894 
    895     port1->state = Port::kBuffering;
    896     if (port1->peer_closed)
    897       port1->remove_proxy_on_last_message = true;
    898 
    899     int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    900     int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());
    901 
    902     if (rv1 == OK && rv2 == OK) {
    903       // If either merged port had a closed peer, its new peer needs to be
    904       // informed of this.
    905       if (port1->peer_closed) {
    906         ObserveClosureEventData data;
    907         data.last_sequence_num = port0->last_sequence_num_to_receive;
    908         delegate_->ForwardMessage(
    909             port0->peer_node_name,
    910             NewInternalMessage(port0->peer_port_name,
    911                                EventType::kObserveClosure, data));
    912       }
    913 
    914       if (port0->peer_closed) {
    915         ObserveClosureEventData data;
    916         data.last_sequence_num = port1->last_sequence_num_to_receive;
    917         delegate_->ForwardMessage(
    918             port1->peer_node_name,
    919             NewInternalMessage(port1->peer_port_name,
    920                                EventType::kObserveClosure, data));
    921       }
    922 
    923       return OK;
    924     }
    925 
    926     // If either proxy failed to initialize (e.g. had undeliverable messages
    927     // or ended up in a bad state somehow), we keep the system in a consistent
    928     // state by undoing the peer swap.
    929     std::swap(port0->peer_node_name, port1->peer_node_name);
    930     std::swap(port0->peer_port_name, port1->peer_port_name);
    931     port0->remove_proxy_on_last_message = false;
    932     port1->remove_proxy_on_last_message = false;
    933     port0->state = Port::kReceiving;
    934     port1->state = Port::kReceiving;
    935   }
    936 
    937   return ERROR_PORT_STATE_UNEXPECTED;
    938 }
    939 
    940 void Node::WillSendPort(const LockedPort& port,
    941                         const NodeName& to_node_name,
    942                         PortName* port_name,
    943                         PortDescriptor* port_descriptor) {
    944   port->lock.AssertAcquired();
    945 
    946   PortName local_port_name = *port_name;
    947 
    948   PortName new_port_name;
    949   delegate_->GenerateRandomPortName(&new_port_name);
    950 
    951   // Make sure we don't send messages to the new peer until after we know it
    952   // exists. In the meantime, just buffer messages locally.
    953   DCHECK(port->state == Port::kReceiving);
    954   port->state = Port::kBuffering;
    955 
    956   // If we already know our peer is closed, we already know this proxy can
    957   // be removed once it receives and forwards its last expected message.
    958   if (port->peer_closed)
    959     port->remove_proxy_on_last_message = true;
    960 
    961   *port_name = new_port_name;
    962 
    963   port_descriptor->peer_node_name = port->peer_node_name;
    964   port_descriptor->peer_port_name = port->peer_port_name;
    965   port_descriptor->referring_node_name = name_;
    966   port_descriptor->referring_port_name = local_port_name;
    967   port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
    968   port_descriptor->next_sequence_num_to_receive =
    969       port->message_queue.next_sequence_num();
    970   port_descriptor->last_sequence_num_to_receive =
    971       port->last_sequence_num_to_receive;
    972   port_descriptor->peer_closed = port->peer_closed;
    973   memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
    974 
    975   // Configure the local port to point to the new port.
    976   port->peer_node_name = to_node_name;
    977   port->peer_port_name = new_port_name;
    978 }
    979 
    980 int Node::AcceptPort(const PortName& port_name,
    981                      const PortDescriptor& port_descriptor) {
    982   scoped_refptr<Port> port = make_scoped_refptr(
    983       new Port(port_descriptor.next_sequence_num_to_send,
    984                port_descriptor.next_sequence_num_to_receive));
    985   port->state = Port::kReceiving;
    986   port->peer_node_name = port_descriptor.peer_node_name;
    987   port->peer_port_name = port_descriptor.peer_port_name;
    988   port->last_sequence_num_to_receive =
    989       port_descriptor.last_sequence_num_to_receive;
    990   port->peer_closed = port_descriptor.peer_closed;
    991 
    992   DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
    993            << port->peer_closed << "; last_sequence_num_to_receive="
    994            << port->last_sequence_num_to_receive << "]";
    995 
    996   // A newly accepted port is not signalable until the message referencing the
    997   // new port finds its way to the consumer (see GetMessage).
    998   port->message_queue.set_signalable(false);
    999 
   1000   int rv = AddPortWithName(port_name, std::move(port));
   1001   if (rv != OK)
   1002     return rv;
   1003 
   1004   // Allow referring port to forward messages.
   1005   delegate_->ForwardMessage(
   1006       port_descriptor.referring_node_name,
   1007       NewInternalMessage(port_descriptor.referring_port_name,
   1008                          EventType::kPortAccepted));
   1009   return OK;
   1010 }
   1011 
   1012 int Node::WillSendMessage_Locked(const LockedPort& port,
   1013                                  const PortName& port_name,
   1014                                  Message* message) {
   1015   ports_lock_.AssertAcquired();
   1016   port->lock.AssertAcquired();
   1017 
   1018   DCHECK(message);
   1019 
   1020   // Messages may already have a sequence number if they're being forwarded
   1021   // by a proxy. Otherwise, use the next outgoing sequence number.
   1022   uint64_t* sequence_num =
   1023       &GetMutableEventData<UserEventData>(message)->sequence_num;
   1024   if (*sequence_num == 0)
   1025     *sequence_num = port->next_sequence_num_to_send++;
   1026 
   1027 #if DCHECK_IS_ON()
   1028   std::ostringstream ports_buf;
   1029   for (size_t i = 0; i < message->num_ports(); ++i) {
   1030     if (i > 0)
   1031       ports_buf << ",";
   1032     ports_buf << message->ports()[i];
   1033   }
   1034 #endif
   1035 
   1036   if (message->num_ports() > 0) {
   1037     // Note: Another thread could be trying to send the same ports, so we need
   1038     // to ensure that they are ours to send before we mutate their state.
   1039 
   1040     std::vector<scoped_refptr<Port>> ports;
   1041     ports.resize(message->num_ports());
   1042 
   1043     {
   1044       for (size_t i = 0; i < message->num_ports(); ++i) {
   1045         ports[i] = GetPort_Locked(message->ports()[i]);
   1046         DCHECK(ports[i]);
   1047 
   1048         ports[i]->lock.Acquire();
   1049         int error = OK;
   1050         if (ports[i]->state != Port::kReceiving)
   1051           error = ERROR_PORT_STATE_UNEXPECTED;
   1052         else if (message->ports()[i] == port->peer_port_name)
   1053           error = ERROR_PORT_CANNOT_SEND_PEER;
   1054 
   1055         if (error != OK) {
   1056           // Oops, we cannot send this port.
   1057           for (size_t j = 0; j <= i; ++j)
   1058             ports[i]->lock.Release();
   1059           // Backpedal on the sequence number.
   1060           port->next_sequence_num_to_send--;
   1061           return error;
   1062         }
   1063       }
   1064     }
   1065 
   1066     PortDescriptor* port_descriptors =
   1067         GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
   1068 
   1069     for (size_t i = 0; i < message->num_ports(); ++i) {
   1070       WillSendPort(LockedPort(ports[i].get()),
   1071                    port->peer_node_name,
   1072                    message->mutable_ports() + i,
   1073                    port_descriptors + i);
   1074     }
   1075 
   1076     for (size_t i = 0; i < message->num_ports(); ++i)
   1077       ports[i]->lock.Release();
   1078   }
   1079 
   1080 #if DCHECK_IS_ON()
   1081   DVLOG(4) << "Sending message "
   1082            << GetEventData<UserEventData>(*message)->sequence_num
   1083            << " [ports=" << ports_buf.str() << "]"
   1084            << " from " << port_name << "@" << name_
   1085            << " to " << port->peer_port_name << "@" << port->peer_node_name;
   1086 #endif
   1087 
   1088   GetMutableEventHeader(message)->port_name = port->peer_port_name;
   1089   return OK;
   1090 }
   1091 
   1092 int Node::BeginProxying_Locked(const LockedPort& port,
   1093                                const PortName& port_name) {
   1094   ports_lock_.AssertAcquired();
   1095   port->lock.AssertAcquired();
   1096 
   1097   if (port->state != Port::kBuffering)
   1098     return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1099 
   1100   port->state = Port::kProxying;
   1101 
   1102   int rv = ForwardMessages_Locked(LockedPort(port), port_name);
   1103   if (rv != OK)
   1104     return rv;
   1105 
   1106   // We may have observed closure while buffering. In that case, we can advance
   1107   // to removing the proxy without sending out an ObserveProxy message. We
   1108   // already know the last expected message, etc.
   1109 
   1110   if (port->remove_proxy_on_last_message) {
   1111     MaybeRemoveProxy_Locked(LockedPort(port), port_name);
   1112 
   1113     // Make sure we propagate closure to our current peer.
   1114     ObserveClosureEventData data;
   1115     data.last_sequence_num = port->last_sequence_num_to_receive;
   1116     delegate_->ForwardMessage(
   1117         port->peer_node_name,
   1118         NewInternalMessage(port->peer_port_name,
   1119                            EventType::kObserveClosure, data));
   1120   } else {
   1121     InitiateProxyRemoval(LockedPort(port), port_name);
   1122   }
   1123 
   1124   return OK;
   1125 }
   1126 
   1127 int Node::BeginProxying(PortRef port_ref) {
   1128   Port* port = port_ref.port();
   1129   {
   1130     base::AutoLock ports_lock(ports_lock_);
   1131     base::AutoLock lock(port->lock);
   1132 
   1133     if (port->state != Port::kBuffering)
   1134       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1135 
   1136     port->state = Port::kProxying;
   1137 
   1138     int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
   1139     if (rv != OK)
   1140       return rv;
   1141   }
   1142 
   1143   bool should_remove;
   1144   NodeName peer_node_name;
   1145   ScopedMessage closure_message;
   1146   {
   1147     base::AutoLock lock(port->lock);
   1148     if (port->state != Port::kProxying)
   1149       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1150 
   1151     should_remove = port->remove_proxy_on_last_message;
   1152     if (should_remove) {
   1153       // Make sure we propagate closure to our current peer.
   1154       ObserveClosureEventData data;
   1155       data.last_sequence_num = port->last_sequence_num_to_receive;
   1156       peer_node_name = port->peer_node_name;
   1157       closure_message = NewInternalMessage(port->peer_port_name,
   1158                                            EventType::kObserveClosure, data);
   1159     } else {
   1160       InitiateProxyRemoval(LockedPort(port), port_ref.name());
   1161     }
   1162   }
   1163 
   1164   if (should_remove) {
   1165     TryRemoveProxy(port_ref);
   1166     delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
   1167   }
   1168 
   1169   return OK;
   1170 }
   1171 
   1172 int Node::ForwardMessages_Locked(const LockedPort& port,
   1173                                  const PortName &port_name) {
   1174   ports_lock_.AssertAcquired();
   1175   port->lock.AssertAcquired();
   1176 
   1177   for (;;) {
   1178     ScopedMessage message;
   1179     port->message_queue.GetNextMessage(&message, nullptr);
   1180     if (!message)
   1181       break;
   1182 
   1183     int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
   1184     if (rv != OK)
   1185       return rv;
   1186 
   1187     delegate_->ForwardMessage(port->peer_node_name, std::move(message));
   1188   }
   1189   return OK;
   1190 }
   1191 
   1192 void Node::InitiateProxyRemoval(const LockedPort& port,
   1193                                 const PortName& port_name) {
   1194   port->lock.AssertAcquired();
   1195 
   1196   // To remove this node, we start by notifying the connected graph that we are
   1197   // a proxy. This allows whatever port is referencing this node to skip it.
   1198   // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
   1199   // the peer was closed in the meantime).
   1200 
   1201   ObserveProxyEventData data;
   1202   data.proxy_node_name = name_;
   1203   data.proxy_port_name = port_name;
   1204   data.proxy_to_node_name = port->peer_node_name;
   1205   data.proxy_to_port_name = port->peer_port_name;
   1206 
   1207   delegate_->ForwardMessage(
   1208       port->peer_node_name,
   1209       NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
   1210 }
   1211 
   1212 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
   1213                                    const PortName& port_name) {
   1214   // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
   1215   ports_lock_.AssertAcquired();
   1216   port->lock.AssertAcquired();
   1217 
   1218   DCHECK(port->state == Port::kProxying);
   1219 
   1220   // Make sure we have seen ObserveProxyAck before removing the port.
   1221   if (!port->remove_proxy_on_last_message)
   1222     return;
   1223 
   1224   if (!CanAcceptMoreMessages(port.get())) {
   1225     // This proxy port is done. We can now remove it!
   1226     ErasePort_Locked(port_name);
   1227 
   1228     if (port->send_on_proxy_removal) {
   1229       NodeName to_node = port->send_on_proxy_removal->first;
   1230       ScopedMessage& message = port->send_on_proxy_removal->second;
   1231 
   1232       delegate_->ForwardMessage(to_node, std::move(message));
   1233       port->send_on_proxy_removal.reset();
   1234     }
   1235   } else {
   1236     DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
   1237              << " now; waiting for more messages";
   1238   }
   1239 }
   1240 
   1241 void Node::TryRemoveProxy(PortRef port_ref) {
   1242   Port* port = port_ref.port();
   1243   bool should_erase = false;
   1244   ScopedMessage msg;
   1245   NodeName to_node;
   1246   {
   1247     base::AutoLock lock(port->lock);
   1248 
   1249     // Port already removed. Nothing to do.
   1250     if (port->state == Port::kClosed)
   1251       return;
   1252 
   1253     DCHECK(port->state == Port::kProxying);
   1254 
   1255     // Make sure we have seen ObserveProxyAck before removing the port.
   1256     if (!port->remove_proxy_on_last_message)
   1257       return;
   1258 
   1259     if (!CanAcceptMoreMessages(port)) {
   1260       // This proxy port is done. We can now remove it!
   1261       should_erase = true;
   1262 
   1263       if (port->send_on_proxy_removal) {
   1264         to_node = port->send_on_proxy_removal->first;
   1265         msg = std::move(port->send_on_proxy_removal->second);
   1266         port->send_on_proxy_removal.reset();
   1267       }
   1268     } else {
   1269       DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
   1270                << " now; waiting for more messages";
   1271     }
   1272   }
   1273 
   1274   if (should_erase)
   1275     ErasePort(port_ref.name());
   1276 
   1277   if (msg)
   1278     delegate_->ForwardMessage(to_node, std::move(msg));
   1279 }
   1280 
   1281 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
   1282                                    const PortName& port_name) {
   1283   // Wipes out all ports whose peer node matches |node_name| and whose peer port
   1284   // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
   1285   // node is matched.
   1286 
   1287   std::vector<PortRef> ports_to_notify;
   1288   std::vector<PortName> dead_proxies_to_broadcast;
   1289   std::deque<PortName> referenced_port_names;
   1290 
   1291   {
   1292     base::AutoLock ports_lock(ports_lock_);
   1293 
   1294     for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
   1295       Port* port = iter->second.get();
   1296       {
   1297         base::AutoLock port_lock(port->lock);
   1298 
   1299         if (port->peer_node_name == node_name &&
   1300               (port_name == kInvalidPortName ||
   1301                     port->peer_port_name == port_name)) {
   1302           if (!port->peer_closed) {
   1303             // Treat this as immediate peer closure. It's an exceptional
   1304             // condition akin to a broken pipe, so we don't care about losing
   1305             // messages.
   1306 
   1307             port->peer_closed = true;
   1308             port->last_sequence_num_to_receive =
   1309                 port->message_queue.next_sequence_num() - 1;
   1310 
   1311             if (port->state == Port::kReceiving)
   1312               ports_to_notify.push_back(PortRef(iter->first, port));
   1313           }
   1314 
   1315           // We don't expect to forward any further messages, and we don't
   1316           // expect to receive a Port{Accepted,Rejected} event. Because we're
   1317           // a proxy with no active peer, we cannot use the normal proxy removal
   1318           // procedure of forward-propagating an ObserveProxy. Instead we
   1319           // broadcast our own death so it can be back-propagated. This is
   1320           // inefficient but rare.
   1321           if (port->state != Port::kReceiving) {
   1322             dead_proxies_to_broadcast.push_back(iter->first);
   1323             iter->second->message_queue.GetReferencedPorts(
   1324                 &referenced_port_names);
   1325           }
   1326         }
   1327       }
   1328     }
   1329 
   1330     for (const auto& proxy_name : dead_proxies_to_broadcast) {
   1331       ports_.erase(proxy_name);
   1332       DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
   1333     }
   1334   }
   1335 
   1336   // Wake up any receiving ports who have just observed simulated peer closure.
   1337   for (const auto& port : ports_to_notify)
   1338     delegate_->PortStatusChanged(port);
   1339 
   1340   for (const auto& proxy_name : dead_proxies_to_broadcast) {
   1341     // Broadcast an event signifying that this proxy is no longer functioning.
   1342     ObserveProxyEventData event;
   1343     event.proxy_node_name = name_;
   1344     event.proxy_port_name = proxy_name;
   1345     event.proxy_to_node_name = kInvalidNodeName;
   1346     event.proxy_to_port_name = kInvalidPortName;
   1347     delegate_->BroadcastMessage(NewInternalMessage(
   1348         kInvalidPortName, EventType::kObserveProxy, event));
   1349 
   1350     // Also process death locally since the port that points this closed one
   1351     // could be on the current node.
   1352     // Note: Although this is recursive, only a single port is involved which
   1353     // limits the expected branching to 1.
   1354     DestroyAllPortsWithPeer(name_, proxy_name);
   1355   }
   1356 
   1357   // Close any ports referenced by the closed proxies.
   1358   for (const auto& name : referenced_port_names) {
   1359     PortRef ref;
   1360     if (GetPort(name, &ref) == OK)
   1361       ClosePort(ref);
   1362   }
   1363 }
   1364 
   1365 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
   1366                                               const EventType& type,
   1367                                               const void* data,
   1368                                               size_t num_data_bytes) {
   1369   ScopedMessage message;
   1370   delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
   1371 
   1372   EventHeader* header = GetMutableEventHeader(message.get());
   1373   header->port_name = port_name;
   1374   header->type = type;
   1375   header->padding = 0;
   1376 
   1377   if (num_data_bytes)
   1378     memcpy(header + 1, data, num_data_bytes);
   1379 
   1380   return message;
   1381 }
   1382 
   1383 }  // namespace ports
   1384 }  // namespace edk
   1385 }  // namespace mojo
   1386