Home | History | Annotate | Download | only in ports
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/ports/node.h"
      6 
      7 #include <string.h>
      8 
      9 #include <utility>
     10 
     11 #include "base/logging.h"
     12 #include "base/memory/ref_counted.h"
     13 #include "base/synchronization/lock.h"
     14 #include "mojo/edk/system/ports/node_delegate.h"
     15 
     16 namespace mojo {
     17 namespace edk {
     18 namespace ports {
     19 
     20 namespace {
     21 
     22 int DebugError(const char* message, int error_code) {
     23   CHECK(false) << "Oops: " << message;
     24   return error_code;
     25 }
     26 
     27 #define OOPS(x) DebugError(#x, x)
     28 
     29 bool CanAcceptMoreMessages(const Port* port) {
     30   // Have we already doled out the last message (i.e., do we expect to NOT
     31   // receive further messages)?
     32   uint64_t next_sequence_num = port->message_queue.next_sequence_num();
     33   if (port->state == Port::kClosed)
     34     return false;
     35   if (port->peer_closed || port->remove_proxy_on_last_message) {
     36     if (port->last_sequence_num_to_receive == next_sequence_num - 1)
     37       return false;
     38   }
     39   return true;
     40 }
     41 
     42 }  // namespace
     43 
     44 class Node::LockedPort {
     45  public:
     46   explicit LockedPort(Port* port) : port_(port) {
     47     port_->lock.AssertAcquired();
     48   }
     49 
     50   Port* get() const { return port_; }
     51   Port* operator->() const { return port_; }
     52 
     53  private:
     54   Port* const port_;
     55 };
     56 
     57 Node::Node(const NodeName& name, NodeDelegate* delegate)
     58     : name_(name),
     59       delegate_(delegate) {
     60 }
     61 
     62 Node::~Node() {
     63   if (!ports_.empty())
     64     DLOG(WARNING) << "Unclean shutdown for node " << name_;
     65 }
     66 
     67 bool Node::CanShutdownCleanly(bool allow_local_ports) {
     68   base::AutoLock ports_lock(ports_lock_);
     69 
     70   if (!allow_local_ports) {
     71 #if DCHECK_IS_ON()
     72     for (auto entry : ports_) {
     73       DVLOG(2) << "Port " << entry.first << " referencing node "
     74                << entry.second->peer_node_name << " is blocking shutdown of "
     75                << "node " << name_ << " (state=" << entry.second->state << ")";
     76     }
     77 #endif
     78     return ports_.empty();
     79   }
     80 
     81   // NOTE: This is not efficient, though it probably doesn't need to be since
     82   // relatively few ports should be open during shutdown and shutdown doesn't
     83   // need to be blazingly fast.
     84   bool can_shutdown = true;
     85   for (auto entry : ports_) {
     86     base::AutoLock lock(entry.second->lock);
     87     if (entry.second->peer_node_name != name_ &&
     88         entry.second->state != Port::kReceiving) {
     89       can_shutdown = false;
     90 #if DCHECK_IS_ON()
     91       DVLOG(2) << "Port " << entry.first << " referencing node "
     92                << entry.second->peer_node_name << " is blocking shutdown of "
     93                << "node " << name_ << " (state=" << entry.second->state << ")";
     94 #else
     95       // Exit early when not debugging.
     96       break;
     97 #endif
     98     }
     99   }
    100 
    101   return can_shutdown;
    102 }
    103 
    104 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
    105   scoped_refptr<Port> port = GetPort(port_name);
    106   if (!port)
    107     return ERROR_PORT_UNKNOWN;
    108 
    109   *port_ref = PortRef(port_name, std::move(port));
    110   return OK;
    111 }
    112 
    113 int Node::CreateUninitializedPort(PortRef* port_ref) {
    114   PortName port_name;
    115   delegate_->GenerateRandomPortName(&port_name);
    116 
    117   scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
    118                                                          kInitialSequenceNum));
    119   int rv = AddPortWithName(port_name, port);
    120   if (rv != OK)
    121     return rv;
    122 
    123   *port_ref = PortRef(port_name, std::move(port));
    124   return OK;
    125 }
    126 
    127 int Node::InitializePort(const PortRef& port_ref,
    128                          const NodeName& peer_node_name,
    129                          const PortName& peer_port_name) {
    130   Port* port = port_ref.port();
    131 
    132   {
    133     base::AutoLock lock(port->lock);
    134     if (port->state != Port::kUninitialized)
    135       return ERROR_PORT_STATE_UNEXPECTED;
    136 
    137     port->state = Port::kReceiving;
    138     port->peer_node_name = peer_node_name;
    139     port->peer_port_name = peer_port_name;
    140   }
    141 
    142   delegate_->PortStatusChanged(port_ref);
    143 
    144   return OK;
    145 }
    146 
    147 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
    148   int rv;
    149 
    150   rv = CreateUninitializedPort(port0_ref);
    151   if (rv != OK)
    152     return rv;
    153 
    154   rv = CreateUninitializedPort(port1_ref);
    155   if (rv != OK)
    156     return rv;
    157 
    158   rv = InitializePort(*port0_ref, name_, port1_ref->name());
    159   if (rv != OK)
    160     return rv;
    161 
    162   rv = InitializePort(*port1_ref, name_, port0_ref->name());
    163   if (rv != OK)
    164     return rv;
    165 
    166   return OK;
    167 }
    168 
    169 int Node::SetUserData(const PortRef& port_ref,
    170                       const scoped_refptr<UserData>& user_data) {
    171   Port* port = port_ref.port();
    172 
    173   base::AutoLock lock(port->lock);
    174   if (port->state == Port::kClosed)
    175     return ERROR_PORT_STATE_UNEXPECTED;
    176 
    177   port->user_data = std::move(user_data);
    178 
    179   return OK;
    180 }
    181 
    182 int Node::GetUserData(const PortRef& port_ref,
    183                       scoped_refptr<UserData>* user_data) {
    184   Port* port = port_ref.port();
    185 
    186   base::AutoLock lock(port->lock);
    187   if (port->state == Port::kClosed)
    188     return ERROR_PORT_STATE_UNEXPECTED;
    189 
    190   *user_data = port->user_data;
    191 
    192   return OK;
    193 }
    194 
    195 int Node::ClosePort(const PortRef& port_ref) {
    196   std::deque<PortName> referenced_port_names;
    197 
    198   ObserveClosureEventData data;
    199 
    200   NodeName peer_node_name;
    201   PortName peer_port_name;
    202   Port* port = port_ref.port();
    203   {
    204     // We may need to erase the port, which requires ports_lock_ to be held,
    205     // but ports_lock_ must be acquired before any individual port locks.
    206     base::AutoLock ports_lock(ports_lock_);
    207 
    208     base::AutoLock lock(port->lock);
    209     if (port->state == Port::kUninitialized) {
    210       // If the port was not yet initialized, there's nothing interesting to do.
    211       ErasePort_Locked(port_ref.name());
    212       return OK;
    213     }
    214 
    215     if (port->state != Port::kReceiving)
    216       return ERROR_PORT_STATE_UNEXPECTED;
    217 
    218     port->state = Port::kClosed;
    219 
    220     // We pass along the sequence number of the last message sent from this
    221     // port to allow the peer to have the opportunity to consume all inbound
    222     // messages before notifying the embedder that this port is closed.
    223     data.last_sequence_num = port->next_sequence_num_to_send - 1;
    224 
    225     peer_node_name = port->peer_node_name;
    226     peer_port_name = port->peer_port_name;
    227 
    228     // If the port being closed still has unread messages, then we need to take
    229     // care to close those ports so as to avoid leaking memory.
    230     port->message_queue.GetReferencedPorts(&referenced_port_names);
    231 
    232     ErasePort_Locked(port_ref.name());
    233   }
    234 
    235   DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
    236            << " to " << peer_port_name << "@" << peer_node_name;
    237 
    238   delegate_->ForwardMessage(
    239       peer_node_name,
    240       NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
    241 
    242   for (const auto& name : referenced_port_names) {
    243     PortRef ref;
    244     if (GetPort(name, &ref) == OK)
    245       ClosePort(ref);
    246   }
    247   return OK;
    248 }
    249 
    250 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
    251   Port* port = port_ref.port();
    252 
    253   base::AutoLock lock(port->lock);
    254 
    255   if (port->state != Port::kReceiving)
    256     return ERROR_PORT_STATE_UNEXPECTED;
    257 
    258   port_status->has_messages = port->message_queue.HasNextMessage();
    259   port_status->receiving_messages = CanAcceptMoreMessages(port);
    260   port_status->peer_closed = port->peer_closed;
    261   return OK;
    262 }
    263 
    264 int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
    265   return GetMessageIf(port_ref, nullptr, message);
    266 }
    267 
    268 int Node::GetMessageIf(const PortRef& port_ref,
    269                        std::function<bool(const Message&)> selector,
    270                        ScopedMessage* message) {
    271   *message = nullptr;
    272 
    273   DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;
    274 
    275   Port* port = port_ref.port();
    276   {
    277     base::AutoLock lock(port->lock);
    278 
    279     // This could also be treated like the port being unknown since the
    280     // embedder should no longer be referring to a port that has been sent.
    281     if (port->state != Port::kReceiving)
    282       return ERROR_PORT_STATE_UNEXPECTED;
    283 
    284     // Let the embedder get messages until there are no more before reporting
    285     // that the peer closed its end.
    286     if (!CanAcceptMoreMessages(port))
    287       return ERROR_PORT_PEER_CLOSED;
    288 
    289     port->message_queue.GetNextMessageIf(std::move(selector), message);
    290   }
    291 
    292   // Allow referenced ports to trigger PortStatusChanged calls.
    293   if (*message) {
    294     for (size_t i = 0; i < (*message)->num_ports(); ++i) {
    295       const PortName& new_port_name = (*message)->ports()[i];
    296       scoped_refptr<Port> new_port = GetPort(new_port_name);
    297 
    298       DCHECK(new_port) << "Port " << new_port_name << "@" << name_
    299                        << " does not exist!";
    300 
    301       base::AutoLock lock(new_port->lock);
    302 
    303       DCHECK(new_port->state == Port::kReceiving);
    304       new_port->message_queue.set_signalable(true);
    305     }
    306   }
    307 
    308   return OK;
    309 }
    310 
    311 int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
    312   int rv = SendMessageInternal(port_ref, &message);
    313   if (rv != OK) {
    314     // If send failed, close all carried ports. Note that we're careful not to
    315     // close the sending port itself if it happened to be one of the encoded
    316     // ports (an invalid but possible condition.)
    317     for (size_t i = 0; i < message->num_ports(); ++i) {
    318       if (message->ports()[i] == port_ref.name())
    319         continue;
    320 
    321       PortRef port;
    322       if (GetPort(message->ports()[i], &port) == OK)
    323         ClosePort(port);
    324     }
    325   }
    326   return rv;
    327 }
    328 
    329 int Node::AcceptMessage(ScopedMessage message) {
    330   const EventHeader* header = GetEventHeader(*message);
    331   switch (header->type) {
    332     case EventType::kUser:
    333       return OnUserMessage(std::move(message));
    334     case EventType::kPortAccepted:
    335       return OnPortAccepted(header->port_name);
    336     case EventType::kObserveProxy:
    337       return OnObserveProxy(
    338           header->port_name,
    339           *GetEventData<ObserveProxyEventData>(*message));
    340     case EventType::kObserveProxyAck:
    341       return OnObserveProxyAck(
    342           header->port_name,
    343           GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
    344     case EventType::kObserveClosure:
    345       return OnObserveClosure(
    346           header->port_name,
    347           GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
    348     case EventType::kMergePort:
    349       return OnMergePort(header->port_name,
    350                          *GetEventData<MergePortEventData>(*message));
    351   }
    352   return OOPS(ERROR_NOT_IMPLEMENTED);
    353 }
    354 
    355 int Node::MergePorts(const PortRef& port_ref,
    356                      const NodeName& destination_node_name,
    357                      const PortName& destination_port_name) {
    358   Port* port = port_ref.port();
    359   MergePortEventData data;
    360   {
    361     base::AutoLock lock(port->lock);
    362 
    363     DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
    364              << " to " << destination_port_name << "@" << destination_node_name;
    365 
    366     // Send the port-to-merge over to the destination node so it can be merged
    367     // into the port cycle atomically there.
    368     data.new_port_name = port_ref.name();
    369     WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
    370                  &data.new_port_descriptor);
    371   }
    372   delegate_->ForwardMessage(
    373       destination_node_name,
    374       NewInternalMessage(destination_port_name,
    375                          EventType::kMergePort, data));
    376   return OK;
    377 }
    378 
    379 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
    380   Port* port0 = port0_ref.port();
    381   Port* port1 = port1_ref.port();
    382   int rv;
    383   {
    384     // |ports_lock_| must be held when acquiring overlapping port locks.
    385     base::AutoLock ports_lock(ports_lock_);
    386     base::AutoLock port0_lock(port0->lock);
    387     base::AutoLock port1_lock(port1->lock);
    388 
    389     DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
    390              << " and " << port1_ref.name() << "@" << name_;
    391 
    392     if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
    393       rv = ERROR_PORT_STATE_UNEXPECTED;
    394     else
    395       rv = MergePorts_Locked(port0_ref, port1_ref);
    396   }
    397 
    398   if (rv != OK) {
    399     ClosePort(port0_ref);
    400     ClosePort(port1_ref);
    401   }
    402 
    403   return rv;
    404 }
    405 
    406 int Node::LostConnectionToNode(const NodeName& node_name) {
    407   // We can no longer send events to the given node. We also can't expect any
    408   // PortAccepted events.
    409 
    410   DVLOG(1) << "Observing lost connection from node " << name_
    411            << " to node " << node_name;
    412 
    413   DestroyAllPortsWithPeer(node_name, kInvalidPortName);
    414   return OK;
    415 }
    416 
    417 int Node::OnUserMessage(ScopedMessage message) {
    418   PortName port_name = GetEventHeader(*message)->port_name;
    419   const auto* event = GetEventData<UserEventData>(*message);
    420 
    421 #if DCHECK_IS_ON()
    422   std::ostringstream ports_buf;
    423   for (size_t i = 0; i < message->num_ports(); ++i) {
    424     if (i > 0)
    425       ports_buf << ",";
    426     ports_buf << message->ports()[i];
    427   }
    428 
    429   DVLOG(2) << "AcceptMessage " << event->sequence_num
    430              << " [ports=" << ports_buf.str() << "] at "
    431              << port_name << "@" << name_;
    432 #endif
    433 
    434   scoped_refptr<Port> port = GetPort(port_name);
    435 
    436   // Even if this port does not exist, cannot receive anymore messages or is
    437   // buffering or proxying messages, we still need these ports to be bound to
    438   // this node. When the message is forwarded, these ports will get transferred
    439   // following the usual method. If the message cannot be accepted, then the
    440   // newly bound ports will simply be closed.
    441 
    442   for (size_t i = 0; i < message->num_ports(); ++i) {
    443     int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    444     if (rv != OK)
    445       return rv;
    446   }
    447 
    448   bool has_next_message = false;
    449   bool message_accepted = false;
    450 
    451   if (port) {
    452     // We may want to forward messages once the port lock is held, so we must
    453     // acquire |ports_lock_| first.
    454     base::AutoLock ports_lock(ports_lock_);
    455     base::AutoLock lock(port->lock);
    456 
    457     // Reject spurious messages if we've already received the last expected
    458     // message.
    459     if (CanAcceptMoreMessages(port.get())) {
    460       message_accepted = true;
    461       port->message_queue.AcceptMessage(std::move(message), &has_next_message);
    462 
    463       if (port->state == Port::kBuffering) {
    464         has_next_message = false;
    465       } else if (port->state == Port::kProxying) {
    466         has_next_message = false;
    467 
    468         // Forward messages. We forward messages in sequential order here so
    469         // that we maintain the message queue's notion of next sequence number.
    470         // That's useful for the proxy removal process as we can tell when this
    471         // port has seen all of the messages it is expected to see.
    472         int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
    473         if (rv != OK)
    474           return rv;
    475 
    476         MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
    477       }
    478     }
    479   }
    480 
    481   if (!message_accepted) {
    482     DVLOG(2) << "Message not accepted!\n";
    483     // Close all newly accepted ports as they are effectively orphaned.
    484     for (size_t i = 0; i < message->num_ports(); ++i) {
    485       PortRef port_ref;
    486       if (GetPort(message->ports()[i], &port_ref) == OK) {
    487         ClosePort(port_ref);
    488       } else {
    489         DLOG(WARNING) << "Cannot close non-existent port!\n";
    490       }
    491     }
    492   } else if (has_next_message) {
    493     PortRef port_ref(port_name, port);
    494     delegate_->PortStatusChanged(port_ref);
    495   }
    496 
    497   return OK;
    498 }
    499 
    500 int Node::OnPortAccepted(const PortName& port_name) {
    501   scoped_refptr<Port> port = GetPort(port_name);
    502   if (!port)
    503     return ERROR_PORT_UNKNOWN;
    504 
    505   DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
    506            << " pointing to "
    507            << port->peer_port_name << "@" << port->peer_node_name;
    508 
    509   return BeginProxying(PortRef(port_name, port));
    510 }
    511 
    512 int Node::OnObserveProxy(const PortName& port_name,
    513                          const ObserveProxyEventData& event) {
    514   if (port_name == kInvalidPortName) {
    515     // An ObserveProxy with an invalid target port name is a broadcast used to
    516     // inform ports when their peer (which was itself a proxy) has become
    517     // defunct due to unexpected node disconnection.
    518     //
    519     // Receiving ports affected by this treat it as equivalent to peer closure.
    520     // Proxies affected by this can be removed and will in turn broadcast their
    521     // own death with a similar message.
    522     CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    523     CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    524     DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    525     return OK;
    526   }
    527 
    528   // The port may have already been closed locally, in which case the
    529   // ObserveClosure message will contain the last_sequence_num field.
    530   // We can then silently ignore this message.
    531   scoped_refptr<Port> port = GetPort(port_name);
    532   if (!port) {
    533     DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
    534 
    535     if (port_name != event.proxy_port_name &&
    536         port_name != event.proxy_to_port_name) {
    537       // The receiving port may have been removed while this message was in
    538       // transit.  In this case, we restart the ObserveProxy circulation from
    539       // the referenced proxy port to avoid leaking the proxy.
    540       delegate_->ForwardMessage(
    541           event.proxy_node_name,
    542           NewInternalMessage(
    543               event.proxy_port_name, EventType::kObserveProxy, event));
    544     }
    545     return OK;
    546   }
    547 
    548   DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
    549            << event.proxy_port_name << "@"
    550            << event.proxy_node_name << " pointing to "
    551            << event.proxy_to_port_name << "@"
    552            << event.proxy_to_node_name;
    553 
    554   {
    555     base::AutoLock lock(port->lock);
    556 
    557     if (port->peer_node_name == event.proxy_node_name &&
    558         port->peer_port_name == event.proxy_port_name) {
    559       if (port->state == Port::kReceiving) {
    560         port->peer_node_name = event.proxy_to_node_name;
    561         port->peer_port_name = event.proxy_to_port_name;
    562 
    563         ObserveProxyAckEventData ack;
    564         ack.last_sequence_num = port->next_sequence_num_to_send - 1;
    565 
    566         delegate_->ForwardMessage(
    567             event.proxy_node_name,
    568             NewInternalMessage(event.proxy_port_name,
    569                                EventType::kObserveProxyAck,
    570                                ack));
    571       } else {
    572         // As a proxy ourselves, we don't know how to honor the ObserveProxy
    573         // event or to populate the last_sequence_num field of ObserveProxyAck.
    574         // Afterall, another port could be sending messages to our peer now
    575         // that we've sent out our own ObserveProxy event.  Instead, we will
    576         // send an ObserveProxyAck indicating that the ObserveProxy event
    577         // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
    578         // However, this has to be done after we are removed as a proxy.
    579         // Otherwise, we might just find ourselves back here again, which
    580         // would be akin to a busy loop.
    581 
    582         DVLOG(2) << "Delaying ObserveProxyAck to "
    583                  << event.proxy_port_name << "@" << event.proxy_node_name;
    584 
    585         ObserveProxyAckEventData ack;
    586         ack.last_sequence_num = kInvalidSequenceNum;
    587 
    588         port->send_on_proxy_removal.reset(
    589             new std::pair<NodeName, ScopedMessage>(
    590                 event.proxy_node_name,
    591                 NewInternalMessage(event.proxy_port_name,
    592                                    EventType::kObserveProxyAck,
    593                                    ack)));
    594       }
    595     } else {
    596       // Forward this event along to our peer. Eventually, it should find the
    597       // port referring to the proxy.
    598       delegate_->ForwardMessage(
    599           port->peer_node_name,
    600           NewInternalMessage(port->peer_port_name,
    601                              EventType::kObserveProxy,
    602                              event));
    603     }
    604   }
    605   return OK;
    606 }
    607 
    608 int Node::OnObserveProxyAck(const PortName& port_name,
    609                             uint64_t last_sequence_num) {
    610   DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
    611            << " (last_sequence_num=" << last_sequence_num << ")";
    612 
    613   scoped_refptr<Port> port = GetPort(port_name);
    614   if (!port)
    615     return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
    616                                 // this is not an "Oops".
    617 
    618   {
    619     base::AutoLock lock(port->lock);
    620 
    621     if (port->state != Port::kProxying)
    622       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    623 
    624     if (last_sequence_num == kInvalidSequenceNum) {
    625       // Send again.
    626       InitiateProxyRemoval(LockedPort(port.get()), port_name);
    627       return OK;
    628     }
    629 
    630     // We can now remove this port once we have received and forwarded the last
    631     // message addressed to this port.
    632     port->remove_proxy_on_last_message = true;
    633     port->last_sequence_num_to_receive = last_sequence_num;
    634   }
    635   TryRemoveProxy(PortRef(port_name, port));
    636   return OK;
    637 }
    638 
    639 int Node::OnObserveClosure(const PortName& port_name,
    640                            uint64_t last_sequence_num) {
    641   // OK if the port doesn't exist, as it may have been closed already.
    642   scoped_refptr<Port> port = GetPort(port_name);
    643   if (!port)
    644     return OK;
    645 
    646   // This message tells the port that it should no longer expect more messages
    647   // beyond last_sequence_num. This message is forwarded along until we reach
    648   // the receiving end, and this message serves as an equivalent to
    649   // ObserveProxyAck.
    650 
    651   bool notify_delegate = false;
    652   ObserveClosureEventData forwarded_data;
    653   NodeName peer_node_name;
    654   PortName peer_port_name;
    655   bool try_remove_proxy = false;
    656   {
    657     base::AutoLock lock(port->lock);
    658 
    659     port->peer_closed = true;
    660     port->last_sequence_num_to_receive = last_sequence_num;
    661 
    662     DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
    663              << " (state=" << port->state << ") pointing to "
    664              << port->peer_port_name << "@" << port->peer_node_name
    665              << " (last_sequence_num=" << last_sequence_num << ")";
    666 
    667     // We always forward ObserveClosure, even beyond the receiving port which
    668     // cares about it. This ensures that any dead-end proxies beyond that port
    669     // are notified to remove themselves.
    670 
    671     if (port->state == Port::kReceiving) {
    672       notify_delegate = true;
    673 
    674       // When forwarding along the other half of the port cycle, this will only
    675       // reach dead-end proxies. Tell them we've sent our last message so they
    676       // can go away.
    677       //
    678       // TODO: Repurposing ObserveClosure for this has the desired result but
    679       // may be semantically confusing since the forwarding port is not actually
    680       // closed. Consider replacing this with a new event type.
    681       forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    682     } else {
    683       // We haven't yet reached the receiving peer of the closed port, so
    684       // forward the message along as-is.
    685       forwarded_data.last_sequence_num = last_sequence_num;
    686 
    687       // See about removing the port if it is a proxy as our peer won't be able
    688       // to participate in proxy removal.
    689       port->remove_proxy_on_last_message = true;
    690       if (port->state == Port::kProxying)
    691         try_remove_proxy = true;
    692     }
    693 
    694     DVLOG(2) << "Forwarding ObserveClosure from "
    695              << port_name << "@" << name_ << " to peer "
    696              << port->peer_port_name << "@" << port->peer_node_name
    697              << " (last_sequence_num=" << forwarded_data.last_sequence_num
    698              << ")";
    699 
    700     peer_node_name = port->peer_node_name;
    701     peer_port_name = port->peer_port_name;
    702   }
    703   if (try_remove_proxy)
    704     TryRemoveProxy(PortRef(port_name, port));
    705 
    706   delegate_->ForwardMessage(
    707       peer_node_name,
    708       NewInternalMessage(peer_port_name, EventType::kObserveClosure,
    709                          forwarded_data));
    710 
    711   if (notify_delegate) {
    712     PortRef port_ref(port_name, port);
    713     delegate_->PortStatusChanged(port_ref);
    714   }
    715   return OK;
    716 }
    717 
    718 int Node::OnMergePort(const PortName& port_name,
    719                       const MergePortEventData& event) {
    720   scoped_refptr<Port> port = GetPort(port_name);
    721 
    722   DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
    723            << (port ? port->state : -1) << ") merging with proxy "
    724            << event.new_port_name
    725            << "@" << name_ << " pointing to "
    726            << event.new_port_descriptor.peer_port_name << "@"
    727            << event.new_port_descriptor.peer_node_name << " referred by "
    728            << event.new_port_descriptor.referring_port_name << "@"
    729            << event.new_port_descriptor.referring_node_name;
    730 
    731   bool close_target_port = false;
    732   bool close_new_port = false;
    733 
    734   // Accept the new port. This is now the receiving end of the other port cycle
    735   // to be merged with ours.
    736   int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
    737   if (rv != OK) {
    738     close_target_port = true;
    739   } else if (port) {
    740     // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    741     // needs to hold |ports_lock_|. We also acquire multiple port locks within.
    742     base::AutoLock ports_lock(ports_lock_);
    743     base::AutoLock lock(port->lock);
    744 
    745     if (port->state != Port::kReceiving) {
    746       close_new_port = true;
    747     } else {
    748       scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
    749       base::AutoLock new_port_lock(new_port->lock);
    750       DCHECK(new_port->state == Port::kReceiving);
    751 
    752       // Both ports are locked. Now all we have to do is swap their peer
    753       // information and set them up as proxies.
    754 
    755       PortRef port0_ref(port_name, port);
    756       PortRef port1_ref(event.new_port_name, new_port);
    757       int rv = MergePorts_Locked(port0_ref, port1_ref);
    758       if (rv == OK)
    759         return rv;
    760 
    761       close_new_port = true;
    762       close_target_port = true;
    763     }
    764   } else {
    765     close_new_port = true;
    766   }
    767 
    768   if (close_target_port) {
    769     PortRef target_port;
    770     rv = GetPort(port_name, &target_port);
    771     DCHECK(rv == OK);
    772 
    773     ClosePort(target_port);
    774   }
    775 
    776   if (close_new_port) {
    777     PortRef new_port;
    778     rv = GetPort(event.new_port_name, &new_port);
    779     DCHECK(rv == OK);
    780 
    781     ClosePort(new_port);
    782   }
    783 
    784   return ERROR_PORT_STATE_UNEXPECTED;
    785 }
    786 
    787 int Node::AddPortWithName(const PortName& port_name,
    788                           const scoped_refptr<Port>& port) {
    789   base::AutoLock lock(ports_lock_);
    790 
    791   if (!ports_.insert(std::make_pair(port_name, port)).second)
    792     return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
    793 
    794   DVLOG(2) << "Created port " << port_name << "@" << name_;
    795   return OK;
    796 }
    797 
    798 void Node::ErasePort(const PortName& port_name) {
    799   base::AutoLock lock(ports_lock_);
    800   ErasePort_Locked(port_name);
    801 }
    802 
    803 void Node::ErasePort_Locked(const PortName& port_name) {
    804   ports_lock_.AssertAcquired();
    805   ports_.erase(port_name);
    806   DVLOG(2) << "Deleted port " << port_name << "@" << name_;
    807 }
    808 
    809 scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
    810   base::AutoLock lock(ports_lock_);
    811   return GetPort_Locked(port_name);
    812 }
    813 
    814 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
    815   ports_lock_.AssertAcquired();
    816   auto iter = ports_.find(port_name);
    817   if (iter == ports_.end())
    818     return nullptr;
    819 
    820   return iter->second;
    821 }
    822 
    823 int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
    824   ScopedMessage& m = *message;
    825   for (size_t i = 0; i < m->num_ports(); ++i) {
    826     if (m->ports()[i] == port_ref.name())
    827       return ERROR_PORT_CANNOT_SEND_SELF;
    828   }
    829 
    830   Port* port = port_ref.port();
    831   NodeName peer_node_name;
    832   {
    833     // We must acquire |ports_lock_| before grabbing any port locks, because
    834     // WillSendMessage_Locked may need to lock multiple ports out of order.
    835     base::AutoLock ports_lock(ports_lock_);
    836     base::AutoLock lock(port->lock);
    837 
    838     if (port->state != Port::kReceiving)
    839       return ERROR_PORT_STATE_UNEXPECTED;
    840 
    841     if (port->peer_closed)
    842       return ERROR_PORT_PEER_CLOSED;
    843 
    844     int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
    845     if (rv != OK)
    846       return rv;
    847 
    848     // Beyond this point there's no sense in returning anything but OK. Even if
    849     // message forwarding or acceptance fails, there's nothing the embedder can
    850     // do to recover. Assume that failure beyond this point must be treated as a
    851     // transport failure.
    852 
    853     peer_node_name = port->peer_node_name;
    854   }
    855 
    856   if (peer_node_name != name_) {
    857     delegate_->ForwardMessage(peer_node_name, std::move(m));
    858     return OK;
    859   }
    860 
    861   int rv = AcceptMessage(std::move(m));
    862   if (rv != OK) {
    863     // See comment above for why we don't return an error in this case.
    864     DVLOG(2) << "AcceptMessage failed: " << rv;
    865   }
    866 
    867   return OK;
    868 }
    869 
    870 int Node::MergePorts_Locked(const PortRef& port0_ref,
    871                             const PortRef& port1_ref) {
    872   Port* port0 = port0_ref.port();
    873   Port* port1 = port1_ref.port();
    874 
    875   ports_lock_.AssertAcquired();
    876   port0->lock.AssertAcquired();
    877   port1->lock.AssertAcquired();
    878 
    879   CHECK(port0->state == Port::kReceiving);
    880   CHECK(port1->state == Port::kReceiving);
    881 
    882   // Ports cannot be merged with their own receiving peer!
    883   if (port0->peer_node_name == name_ &&
    884       port0->peer_port_name == port1_ref.name())
    885     return ERROR_PORT_STATE_UNEXPECTED;
    886 
    887   if (port1->peer_node_name == name_ &&
    888       port1->peer_port_name == port0_ref.name())
    889     return ERROR_PORT_STATE_UNEXPECTED;
    890 
    891   // Only merge if both ports have never sent a message.
    892   if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
    893       port1->next_sequence_num_to_send == kInitialSequenceNum) {
    894     // Swap the ports' peer information and switch them both into buffering
    895     // (eventually proxying) mode.
    896 
    897     std::swap(port0->peer_node_name, port1->peer_node_name);
    898     std::swap(port0->peer_port_name, port1->peer_port_name);
    899 
    900     port0->state = Port::kBuffering;
    901     if (port0->peer_closed)
    902       port0->remove_proxy_on_last_message = true;
    903 
    904     port1->state = Port::kBuffering;
    905     if (port1->peer_closed)
    906       port1->remove_proxy_on_last_message = true;
    907 
    908     int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    909     int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());
    910 
    911     if (rv1 == OK && rv2 == OK) {
    912       // If either merged port had a closed peer, its new peer needs to be
    913       // informed of this.
    914       if (port1->peer_closed) {
    915         ObserveClosureEventData data;
    916         data.last_sequence_num = port0->last_sequence_num_to_receive;
    917         delegate_->ForwardMessage(
    918             port0->peer_node_name,
    919             NewInternalMessage(port0->peer_port_name,
    920                                EventType::kObserveClosure, data));
    921       }
    922 
    923       if (port0->peer_closed) {
    924         ObserveClosureEventData data;
    925         data.last_sequence_num = port1->last_sequence_num_to_receive;
    926         delegate_->ForwardMessage(
    927             port1->peer_node_name,
    928             NewInternalMessage(port1->peer_port_name,
    929                                EventType::kObserveClosure, data));
    930       }
    931 
    932       return OK;
    933     }
    934 
    935     // If either proxy failed to initialize (e.g. had undeliverable messages
    936     // or ended up in a bad state somehow), we keep the system in a consistent
    937     // state by undoing the peer swap.
    938     std::swap(port0->peer_node_name, port1->peer_node_name);
    939     std::swap(port0->peer_port_name, port1->peer_port_name);
    940     port0->remove_proxy_on_last_message = false;
    941     port1->remove_proxy_on_last_message = false;
    942     port0->state = Port::kReceiving;
    943     port1->state = Port::kReceiving;
    944   }
    945 
    946   return ERROR_PORT_STATE_UNEXPECTED;
    947 }
    948 
    949 void Node::WillSendPort(const LockedPort& port,
    950                         const NodeName& to_node_name,
    951                         PortName* port_name,
    952                         PortDescriptor* port_descriptor) {
    953   port->lock.AssertAcquired();
    954 
    955   PortName local_port_name = *port_name;
    956 
    957   PortName new_port_name;
    958   delegate_->GenerateRandomPortName(&new_port_name);
    959 
    960   // Make sure we don't send messages to the new peer until after we know it
    961   // exists. In the meantime, just buffer messages locally.
    962   DCHECK(port->state == Port::kReceiving);
    963   port->state = Port::kBuffering;
    964 
    965   // If we already know our peer is closed, we already know this proxy can
    966   // be removed once it receives and forwards its last expected message.
    967   if (port->peer_closed)
    968     port->remove_proxy_on_last_message = true;
    969 
    970   *port_name = new_port_name;
    971 
    972   port_descriptor->peer_node_name = port->peer_node_name;
    973   port_descriptor->peer_port_name = port->peer_port_name;
    974   port_descriptor->referring_node_name = name_;
    975   port_descriptor->referring_port_name = local_port_name;
    976   port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
    977   port_descriptor->next_sequence_num_to_receive =
    978       port->message_queue.next_sequence_num();
    979   port_descriptor->last_sequence_num_to_receive =
    980       port->last_sequence_num_to_receive;
    981   port_descriptor->peer_closed = port->peer_closed;
    982   memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
    983 
    984   // Configure the local port to point to the new port.
    985   port->peer_node_name = to_node_name;
    986   port->peer_port_name = new_port_name;
    987 }
    988 
    989 int Node::AcceptPort(const PortName& port_name,
    990                      const PortDescriptor& port_descriptor) {
    991   scoped_refptr<Port> port = make_scoped_refptr(
    992       new Port(port_descriptor.next_sequence_num_to_send,
    993                port_descriptor.next_sequence_num_to_receive));
    994   port->state = Port::kReceiving;
    995   port->peer_node_name = port_descriptor.peer_node_name;
    996   port->peer_port_name = port_descriptor.peer_port_name;
    997   port->last_sequence_num_to_receive =
    998       port_descriptor.last_sequence_num_to_receive;
    999   port->peer_closed = port_descriptor.peer_closed;
   1000 
   1001   DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
   1002            << port->peer_closed << "; last_sequence_num_to_receive="
   1003            << port->last_sequence_num_to_receive << "]";
   1004 
   1005   // A newly accepted port is not signalable until the message referencing the
   1006   // new port finds its way to the consumer (see GetMessageIf).
   1007   port->message_queue.set_signalable(false);
   1008 
   1009   int rv = AddPortWithName(port_name, port);
   1010   if (rv != OK)
   1011     return rv;
   1012 
   1013   // Allow referring port to forward messages.
   1014   delegate_->ForwardMessage(
   1015       port_descriptor.referring_node_name,
   1016       NewInternalMessage(port_descriptor.referring_port_name,
   1017                          EventType::kPortAccepted));
   1018   return OK;
   1019 }
   1020 
   1021 int Node::WillSendMessage_Locked(const LockedPort& port,
   1022                                  const PortName& port_name,
   1023                                  Message* message) {
   1024   ports_lock_.AssertAcquired();
   1025   port->lock.AssertAcquired();
   1026 
   1027   DCHECK(message);
   1028 
   1029   // Messages may already have a sequence number if they're being forwarded
   1030   // by a proxy. Otherwise, use the next outgoing sequence number.
   1031   uint64_t* sequence_num =
   1032       &GetMutableEventData<UserEventData>(message)->sequence_num;
   1033   if (*sequence_num == 0)
   1034     *sequence_num = port->next_sequence_num_to_send++;
   1035 
   1036 #if DCHECK_IS_ON()
   1037   std::ostringstream ports_buf;
   1038   for (size_t i = 0; i < message->num_ports(); ++i) {
   1039     if (i > 0)
   1040       ports_buf << ",";
   1041     ports_buf << message->ports()[i];
   1042   }
   1043 #endif
   1044 
   1045   if (message->num_ports() > 0) {
   1046     // Note: Another thread could be trying to send the same ports, so we need
   1047     // to ensure that they are ours to send before we mutate their state.
   1048 
   1049     std::vector<scoped_refptr<Port>> ports;
   1050     ports.resize(message->num_ports());
   1051 
   1052     {
   1053       for (size_t i = 0; i < message->num_ports(); ++i) {
   1054         ports[i] = GetPort_Locked(message->ports()[i]);
   1055         DCHECK(ports[i]);
   1056 
   1057         ports[i]->lock.Acquire();
   1058         int error = OK;
   1059         if (ports[i]->state != Port::kReceiving)
   1060           error = ERROR_PORT_STATE_UNEXPECTED;
   1061         else if (message->ports()[i] == port->peer_port_name)
   1062           error = ERROR_PORT_CANNOT_SEND_PEER;
   1063 
   1064         if (error != OK) {
   1065           // Oops, we cannot send this port.
   1066           for (size_t j = 0; j <= i; ++j)
   1067             ports[i]->lock.Release();
   1068           // Backpedal on the sequence number.
   1069           port->next_sequence_num_to_send--;
   1070           return error;
   1071         }
   1072       }
   1073     }
   1074 
   1075     PortDescriptor* port_descriptors =
   1076         GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
   1077 
   1078     for (size_t i = 0; i < message->num_ports(); ++i) {
   1079       WillSendPort(LockedPort(ports[i].get()),
   1080                    port->peer_node_name,
   1081                    message->mutable_ports() + i,
   1082                    port_descriptors + i);
   1083     }
   1084 
   1085     for (size_t i = 0; i < message->num_ports(); ++i)
   1086       ports[i]->lock.Release();
   1087   }
   1088 
   1089 #if DCHECK_IS_ON()
   1090   DVLOG(2) << "Sending message "
   1091            << GetEventData<UserEventData>(*message)->sequence_num
   1092            << " [ports=" << ports_buf.str() << "]"
   1093            << " from " << port_name << "@" << name_
   1094            << " to " << port->peer_port_name << "@" << port->peer_node_name;
   1095 #endif
   1096 
   1097   GetMutableEventHeader(message)->port_name = port->peer_port_name;
   1098   return OK;
   1099 }
   1100 
   1101 int Node::BeginProxying_Locked(const LockedPort& port,
   1102                                const PortName& port_name) {
   1103   ports_lock_.AssertAcquired();
   1104   port->lock.AssertAcquired();
   1105 
   1106   if (port->state != Port::kBuffering)
   1107     return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1108 
   1109   port->state = Port::kProxying;
   1110 
   1111   int rv = ForwardMessages_Locked(LockedPort(port), port_name);
   1112   if (rv != OK)
   1113     return rv;
   1114 
   1115   // We may have observed closure while buffering. In that case, we can advance
   1116   // to removing the proxy without sending out an ObserveProxy message. We
   1117   // already know the last expected message, etc.
   1118 
   1119   if (port->remove_proxy_on_last_message) {
   1120     MaybeRemoveProxy_Locked(LockedPort(port), port_name);
   1121 
   1122     // Make sure we propagate closure to our current peer.
   1123     ObserveClosureEventData data;
   1124     data.last_sequence_num = port->last_sequence_num_to_receive;
   1125     delegate_->ForwardMessage(
   1126         port->peer_node_name,
   1127         NewInternalMessage(port->peer_port_name,
   1128                            EventType::kObserveClosure, data));
   1129   } else {
   1130     InitiateProxyRemoval(LockedPort(port), port_name);
   1131   }
   1132 
   1133   return OK;
   1134 }
   1135 
   1136 int Node::BeginProxying(PortRef port_ref) {
   1137   Port* port = port_ref.port();
   1138   {
   1139     base::AutoLock ports_lock(ports_lock_);
   1140     base::AutoLock lock(port->lock);
   1141 
   1142     if (port->state != Port::kBuffering)
   1143       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1144 
   1145     port->state = Port::kProxying;
   1146 
   1147     int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
   1148     if (rv != OK)
   1149       return rv;
   1150   }
   1151 
   1152   bool should_remove;
   1153   NodeName peer_node_name;
   1154   ScopedMessage closure_message;
   1155   {
   1156     base::AutoLock lock(port->lock);
   1157     if (port->state != Port::kProxying)
   1158       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
   1159 
   1160     should_remove = port->remove_proxy_on_last_message;
   1161     if (should_remove) {
   1162       // Make sure we propagate closure to our current peer.
   1163       ObserveClosureEventData data;
   1164       data.last_sequence_num = port->last_sequence_num_to_receive;
   1165       peer_node_name = port->peer_node_name;
   1166       closure_message = NewInternalMessage(port->peer_port_name,
   1167                                            EventType::kObserveClosure, data);
   1168     } else {
   1169       InitiateProxyRemoval(LockedPort(port), port_ref.name());
   1170     }
   1171   }
   1172 
   1173   if (should_remove) {
   1174     TryRemoveProxy(port_ref);
   1175     delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
   1176   }
   1177 
   1178   return OK;
   1179 }
   1180 
   1181 int Node::ForwardMessages_Locked(const LockedPort& port,
   1182                                  const PortName &port_name) {
   1183   ports_lock_.AssertAcquired();
   1184   port->lock.AssertAcquired();
   1185 
   1186   for (;;) {
   1187     ScopedMessage message;
   1188     port->message_queue.GetNextMessageIf(nullptr, &message);
   1189     if (!message)
   1190       break;
   1191 
   1192     int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
   1193     if (rv != OK)
   1194       return rv;
   1195 
   1196     delegate_->ForwardMessage(port->peer_node_name, std::move(message));
   1197   }
   1198   return OK;
   1199 }
   1200 
   1201 void Node::InitiateProxyRemoval(const LockedPort& port,
   1202                                 const PortName& port_name) {
   1203   port->lock.AssertAcquired();
   1204 
   1205   // To remove this node, we start by notifying the connected graph that we are
   1206   // a proxy. This allows whatever port is referencing this node to skip it.
   1207   // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
   1208   // the peer was closed in the meantime).
   1209 
   1210   ObserveProxyEventData data;
   1211   data.proxy_node_name = name_;
   1212   data.proxy_port_name = port_name;
   1213   data.proxy_to_node_name = port->peer_node_name;
   1214   data.proxy_to_port_name = port->peer_port_name;
   1215 
   1216   delegate_->ForwardMessage(
   1217       port->peer_node_name,
   1218       NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
   1219 }
   1220 
   1221 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
   1222                                    const PortName& port_name) {
   1223   // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
   1224   ports_lock_.AssertAcquired();
   1225   port->lock.AssertAcquired();
   1226 
   1227   DCHECK(port->state == Port::kProxying);
   1228 
   1229   // Make sure we have seen ObserveProxyAck before removing the port.
   1230   if (!port->remove_proxy_on_last_message)
   1231     return;
   1232 
   1233   if (!CanAcceptMoreMessages(port.get())) {
   1234     // This proxy port is done. We can now remove it!
   1235     ErasePort_Locked(port_name);
   1236 
   1237     if (port->send_on_proxy_removal) {
   1238       NodeName to_node = port->send_on_proxy_removal->first;
   1239       ScopedMessage& message = port->send_on_proxy_removal->second;
   1240 
   1241       delegate_->ForwardMessage(to_node, std::move(message));
   1242       port->send_on_proxy_removal.reset();
   1243     }
   1244   } else {
   1245     DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
   1246              << " now; waiting for more messages";
   1247   }
   1248 }
   1249 
   1250 void Node::TryRemoveProxy(PortRef port_ref) {
   1251   Port* port = port_ref.port();
   1252   bool should_erase = false;
   1253   ScopedMessage msg;
   1254   NodeName to_node;
   1255   {
   1256     base::AutoLock lock(port->lock);
   1257 
   1258     // Port already removed. Nothing to do.
   1259     if (port->state == Port::kClosed)
   1260       return;
   1261 
   1262     DCHECK(port->state == Port::kProxying);
   1263 
   1264     // Make sure we have seen ObserveProxyAck before removing the port.
   1265     if (!port->remove_proxy_on_last_message)
   1266       return;
   1267 
   1268     if (!CanAcceptMoreMessages(port)) {
   1269       // This proxy port is done. We can now remove it!
   1270       should_erase = true;
   1271 
   1272       if (port->send_on_proxy_removal) {
   1273         to_node = port->send_on_proxy_removal->first;
   1274         msg = std::move(port->send_on_proxy_removal->second);
   1275         port->send_on_proxy_removal.reset();
   1276       }
   1277     } else {
   1278       DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
   1279                << " now; waiting for more messages";
   1280     }
   1281   }
   1282 
   1283   if (should_erase)
   1284     ErasePort(port_ref.name());
   1285 
   1286   if (msg)
   1287     delegate_->ForwardMessage(to_node, std::move(msg));
   1288 }
   1289 
   1290 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
   1291                                    const PortName& port_name) {
   1292   // Wipes out all ports whose peer node matches |node_name| and whose peer port
   1293   // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
   1294   // node is matched.
   1295 
   1296   std::vector<PortRef> ports_to_notify;
   1297   std::vector<PortName> dead_proxies_to_broadcast;
   1298   std::deque<PortName> referenced_port_names;
   1299 
   1300   {
   1301     base::AutoLock ports_lock(ports_lock_);
   1302 
   1303     for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
   1304       Port* port = iter->second.get();
   1305       {
   1306         base::AutoLock port_lock(port->lock);
   1307 
   1308         if (port->peer_node_name == node_name &&
   1309               (port_name == kInvalidPortName ||
   1310                     port->peer_port_name == port_name)) {
   1311           if (!port->peer_closed) {
   1312             // Treat this as immediate peer closure. It's an exceptional
   1313             // condition akin to a broken pipe, so we don't care about losing
   1314             // messages.
   1315 
   1316             port->peer_closed = true;
   1317             port->last_sequence_num_to_receive =
   1318                 port->message_queue.next_sequence_num() - 1;
   1319 
   1320             if (port->state == Port::kReceiving)
   1321               ports_to_notify.push_back(PortRef(iter->first, port));
   1322           }
   1323 
   1324           // We don't expect to forward any further messages, and we don't
   1325           // expect to receive a Port{Accepted,Rejected} event. Because we're
   1326           // a proxy with no active peer, we cannot use the normal proxy removal
   1327           // procedure of forward-propagating an ObserveProxy. Instead we
   1328           // broadcast our own death so it can be back-propagated. This is
   1329           // inefficient but rare.
   1330           if (port->state != Port::kReceiving) {
   1331             dead_proxies_to_broadcast.push_back(iter->first);
   1332             iter->second->message_queue.GetReferencedPorts(
   1333                 &referenced_port_names);
   1334           }
   1335         }
   1336       }
   1337     }
   1338 
   1339     for (const auto& proxy_name : dead_proxies_to_broadcast) {
   1340       ports_.erase(proxy_name);
   1341       DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
   1342     }
   1343   }
   1344 
   1345   // Wake up any receiving ports who have just observed simulated peer closure.
   1346   for (const auto& port : ports_to_notify)
   1347     delegate_->PortStatusChanged(port);
   1348 
   1349   for (const auto& proxy_name : dead_proxies_to_broadcast) {
   1350     // Broadcast an event signifying that this proxy is no longer functioning.
   1351     ObserveProxyEventData event;
   1352     event.proxy_node_name = name_;
   1353     event.proxy_port_name = proxy_name;
   1354     event.proxy_to_node_name = kInvalidNodeName;
   1355     event.proxy_to_port_name = kInvalidPortName;
   1356     delegate_->BroadcastMessage(NewInternalMessage(
   1357         kInvalidPortName, EventType::kObserveProxy, event));
   1358 
   1359     // Also process death locally since the port that points this closed one
   1360     // could be on the current node.
   1361     // Note: Although this is recursive, only a single port is involved which
   1362     // limits the expected branching to 1.
   1363     DestroyAllPortsWithPeer(name_, proxy_name);
   1364   }
   1365 
   1366   // Close any ports referenced by the closed proxies.
   1367   for (const auto& name : referenced_port_names) {
   1368     PortRef ref;
   1369     if (GetPort(name, &ref) == OK)
   1370       ClosePort(ref);
   1371   }
   1372 }
   1373 
   1374 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
   1375                                               const EventType& type,
   1376                                               const void* data,
   1377                                               size_t num_data_bytes) {
   1378   ScopedMessage message;
   1379   delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
   1380 
   1381   EventHeader* header = GetMutableEventHeader(message.get());
   1382   header->port_name = port_name;
   1383   header->type = type;
   1384   header->padding = 0;
   1385 
   1386   if (num_data_bytes)
   1387     memcpy(header + 1, data, num_data_bytes);
   1388 
   1389   return message;
   1390 }
   1391 
   1392 }  // namespace ports
   1393 }  // namespace edk
   1394 }  // namespace mojo
   1395