Home | History | Annotate | Download | only in ports
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <inttypes.h>
      6 #include <stdio.h>
      7 #include <stdlib.h>
      8 #include <string.h>
      9 
     10 #include <map>
     11 #include <sstream>
     12 #include <utility>
     13 
     14 #include "base/bind.h"
     15 #include "base/callback.h"
     16 #include "base/containers/queue.h"
     17 #include "base/logging.h"
     18 #include "base/memory/ref_counted.h"
     19 #include "base/strings/string_piece.h"
     20 #include "base/strings/stringprintf.h"
     21 #include "base/synchronization/lock.h"
     22 #include "base/synchronization/waitable_event.h"
     23 #include "base/test/scoped_task_environment.h"
     24 #include "base/threading/thread.h"
     25 #include "mojo/core/ports/event.h"
     26 #include "mojo/core/ports/node.h"
     27 #include "mojo/core/ports/node_delegate.h"
     28 #include "mojo/core/ports/user_message.h"
     29 #include "testing/gtest/include/gtest/gtest.h"
     30 
     31 namespace mojo {
     32 namespace core {
     33 namespace ports {
     34 namespace test {
     35 
     36 namespace {
     37 
// Owning pointer to a UserMessageEvent. The helpers and tests in this file
// refer to these events as "messages".
// TODO(rockot): Remove this unnecessary alias.
using ScopedMessage = std::unique_ptr<UserMessageEvent>;
     40 
     41 class TestMessage : public UserMessage {
     42  public:
     43   static const TypeInfo kUserMessageTypeInfo;
     44 
     45   TestMessage(const base::StringPiece& payload)
     46       : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
     47   ~TestMessage() override {}
     48 
     49   const std::string& payload() const { return payload_; }
     50 
     51  private:
     52   std::string payload_;
     53 };
     54 
     55 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
     56 
     57 ScopedMessage NewUserMessageEvent(const base::StringPiece& payload,
     58                                   size_t num_ports) {
     59   auto event = std::make_unique<UserMessageEvent>(num_ports);
     60   event->AttachMessage(std::make_unique<TestMessage>(payload));
     61   return event;
     62 }
     63 
     64 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
     65   return message->GetMessage<TestMessage>()->payload() == s;
     66 }
     67 
     68 class TestNode;
     69 
     70 class MessageRouter {
     71  public:
     72   virtual ~MessageRouter() {}
     73 
     74   virtual void ForwardEvent(TestNode* from_node,
     75                             const NodeName& node_name,
     76                             ScopedEvent event) = 0;
     77   virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
     78 };
     79 
     80 class TestNode : public NodeDelegate {
     81  public:
     82   explicit TestNode(uint64_t id)
     83       : node_name_(id, 1),
     84         node_(node_name_, this),
     85         node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
     86         events_available_event_(
     87             base::WaitableEvent::ResetPolicy::AUTOMATIC,
     88             base::WaitableEvent::InitialState::NOT_SIGNALED),
     89         idle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
     90                     base::WaitableEvent::InitialState::SIGNALED) {}
     91 
     92   ~TestNode() override {
     93     StopWhenIdle();
     94     node_thread_.Stop();
     95   }
     96 
     97   const NodeName& name() const { return node_name_; }
     98 
     99   // NOTE: Node is thread-safe.
    100   Node& node() { return node_; }
    101 
    102   base::WaitableEvent& idle_event() { return idle_event_; }
    103 
    104   bool IsIdle() {
    105     base::AutoLock lock(lock_);
    106     return started_ && !dispatching_ &&
    107            (incoming_events_.empty() || (block_on_event_ && blocked_));
    108   }
    109 
    110   void BlockOnEvent(Event::Type type) {
    111     base::AutoLock lock(lock_);
    112     blocked_event_type_ = type;
    113     block_on_event_ = true;
    114   }
    115 
    116   void Unblock() {
    117     base::AutoLock lock(lock_);
    118     block_on_event_ = false;
    119     events_available_event_.Signal();
    120   }
    121 
    122   void Start(MessageRouter* router) {
    123     router_ = router;
    124     node_thread_.Start();
    125     node_thread_.task_runner()->PostTask(
    126         FROM_HERE,
    127         base::Bind(&TestNode::ProcessEvents, base::Unretained(this)));
    128   }
    129 
    130   void StopWhenIdle() {
    131     base::AutoLock lock(lock_);
    132     should_quit_ = true;
    133     events_available_event_.Signal();
    134   }
    135 
    136   void WakeUp() { events_available_event_.Signal(); }
    137 
    138   int SendStringMessage(const PortRef& port, const std::string& s) {
    139     return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
    140   }
    141 
    142   int SendStringMessageWithPort(const PortRef& port,
    143                                 const std::string& s,
    144                                 const PortName& sent_port_name) {
    145     auto event = NewUserMessageEvent(s, 1);
    146     event->ports()[0] = sent_port_name;
    147     return node_.SendUserMessage(port, std::move(event));
    148   }
    149 
    150   int SendStringMessageWithPort(const PortRef& port,
    151                                 const std::string& s,
    152                                 const PortRef& sent_port) {
    153     return SendStringMessageWithPort(port, s, sent_port.name());
    154   }
    155 
    156   void set_drop_messages(bool value) {
    157     base::AutoLock lock(lock_);
    158     drop_messages_ = value;
    159   }
    160 
    161   void set_save_messages(bool value) {
    162     base::AutoLock lock(lock_);
    163     save_messages_ = value;
    164   }
    165 
    166   bool ReadMessage(const PortRef& port, ScopedMessage* message) {
    167     return node_.GetMessage(port, message, nullptr) == OK && *message;
    168   }
    169 
    170   bool GetSavedMessage(ScopedMessage* message) {
    171     base::AutoLock lock(lock_);
    172     if (saved_messages_.empty()) {
    173       message->reset();
    174       return false;
    175     }
    176     std::swap(*message, saved_messages_.front());
    177     saved_messages_.pop();
    178     return true;
    179   }
    180 
    181   void EnqueueEvent(ScopedEvent event) {
    182     idle_event_.Reset();
    183 
    184     // NOTE: This may be called from ForwardMessage and thus must not reenter
    185     // |node_|.
    186     base::AutoLock lock(lock_);
    187     incoming_events_.emplace(std::move(event));
    188     events_available_event_.Signal();
    189   }
    190 
    191   void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
    192     {
    193       base::AutoLock lock(lock_);
    194       if (drop_messages_) {
    195         DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
    196                  << node_name;
    197 
    198         base::AutoUnlock unlock(lock_);
    199         ClosePortsInEvent(event.get());
    200         return;
    201       }
    202     }
    203 
    204     DCHECK(router_);
    205     DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
    206     router_->ForwardEvent(this, node_name, std::move(event));
    207   }
    208 
    209   void BroadcastEvent(ScopedEvent event) override {
    210     router_->BroadcastEvent(this, std::move(event));
    211   }
    212 
    213   void PortStatusChanged(const PortRef& port) override {
    214     // The port may be closed, in which case we ignore the notification.
    215     base::AutoLock lock(lock_);
    216     if (!save_messages_)
    217       return;
    218 
    219     for (;;) {
    220       ScopedMessage message;
    221       {
    222         base::AutoUnlock unlock(lock_);
    223         if (!ReadMessage(port, &message))
    224           break;
    225       }
    226 
    227       saved_messages_.emplace(std::move(message));
    228     }
    229   }
    230 
    231   void ClosePortsInEvent(Event* event) {
    232     if (event->type() != Event::Type::kUserMessage)
    233       return;
    234 
    235     UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
    236     for (size_t i = 0; i < message_event->num_ports(); ++i) {
    237       PortRef port;
    238       ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
    239       EXPECT_EQ(OK, node_.ClosePort(port));
    240     }
    241   }
    242 
    243  private:
    244   void ProcessEvents() {
    245     for (;;) {
    246       events_available_event_.Wait();
    247       base::AutoLock lock(lock_);
    248 
    249       if (should_quit_)
    250         return;
    251 
    252       dispatching_ = true;
    253       while (!incoming_events_.empty()) {
    254         if (block_on_event_ &&
    255             incoming_events_.front()->type() == blocked_event_type_) {
    256           blocked_ = true;
    257           // Go idle if we hit a blocked event type.
    258           break;
    259         } else {
    260           blocked_ = false;
    261         }
    262         ScopedEvent event = std::move(incoming_events_.front());
    263         incoming_events_.pop();
    264 
    265         // NOTE: AcceptMessage() can re-enter this object to call any of the
    266         // NodeDelegate interface methods.
    267         base::AutoUnlock unlock(lock_);
    268         node_.AcceptEvent(std::move(event));
    269       }
    270 
    271       dispatching_ = false;
    272       started_ = true;
    273       idle_event_.Signal();
    274     };
    275   }
    276 
    277   const NodeName node_name_;
    278   Node node_;
    279   MessageRouter* router_ = nullptr;
    280 
    281   base::Thread node_thread_;
    282   base::WaitableEvent events_available_event_;
    283   base::WaitableEvent idle_event_;
    284 
    285   // Guards fields below.
    286   base::Lock lock_;
    287   bool started_ = false;
    288   bool dispatching_ = false;
    289   bool should_quit_ = false;
    290   bool drop_messages_ = false;
    291   bool save_messages_ = false;
    292   bool blocked_ = false;
    293   bool block_on_event_ = false;
    294   Event::Type blocked_event_type_;
    295   base::queue<ScopedEvent> incoming_events_;
    296   base::queue<ScopedMessage> saved_messages_;
    297 };
    298 
// Test fixture that doubles as the MessageRouter for every TestNode registered
// with it: forwarded and broadcast events are delivered by enqueueing them on
// the destination node(s).
class PortsTest : public testing::Test, public MessageRouter {
 public:
  // Registers |node| under its name and starts it with this fixture as its
  // router. |node| is stored as a raw pointer and is not owned.
  void AddNode(TestNode* node) {
    {
      base::AutoLock lock(lock_);
      nodes_[node->name()] = node;
    }
    node->Start(this);
  }

  // Unregisters |node| and notifies every remaining node that the connection
  // to it has been lost.
  void RemoveNode(TestNode* node) {
    {
      base::AutoLock lock(lock_);
      nodes_.erase(node->name());
    }

    // NOTE(review): |nodes_| is iterated here without |lock_| held —
    // presumably fine because only the test body adds or removes nodes;
    // confirm.
    for (const auto& entry : nodes_)
      entry.second->node().LostConnectionToNode(node->name());
  }

  // Waits until all known Nodes are idle. Message forwarding and processing
  // is handled in such a way that idleness is a stable state: once all nodes in
  // the system are idle, they will remain idle until the test explicitly
  // initiates some further event (e.g. sending a message, closing a port, or
  // removing a Node).
  void WaitForIdle() {
    for (;;) {
      // Held while checking idleness so that the router methods below, which
      // take the same lock, cannot enqueue new events mid-check.
      base::AutoLock global_lock(global_lock_);
      bool all_nodes_idle = true;
      for (const auto& entry : nodes_) {
        if (!entry.second->IsIdle())
          all_nodes_idle = false;
        // Every node is woken on each pass so that its event loop runs again
        // and re-signals its idle event.
        entry.second->WakeUp();
      }
      if (all_nodes_idle)
        return;

      // Wait for any Node to signal that it's idle.
      base::AutoUnlock global_unlock(global_lock_);
      std::vector<base::WaitableEvent*> events;
      for (const auto& entry : nodes_)
        events.push_back(&entry.second->idle_event());
      base::WaitableEvent::WaitMany(events.data(), events.size());
    }
  }

  // Creates a connected pair of ports, |port0| on |node0| and |port1| on
  // |node1|. When both nodes are the same object a local pair is created;
  // otherwise each side is created uninitialized and then initialized with the
  // other side's node and port names.
  void CreatePortPair(TestNode* node0,
                      PortRef* port0,
                      TestNode* node1,
                      PortRef* port1) {
    if (node0 == node1) {
      EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
    } else {
      EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
      EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
      EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
                                                 port1->name()));
      EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
                                                 port0->name()));
    }
  }

 private:
  // MessageRouter:
  // Enqueues |event| on the node named |node_name|. Events coming from a node
  // that is no longer registered have their ports closed and are dropped;
  // events addressed to an unknown node are dropped after logging.
  void ForwardEvent(TestNode* from_node,
                    const NodeName& node_name,
                    ScopedEvent event) override {
    base::AutoLock global_lock(global_lock_);
    base::AutoLock lock(lock_);
    // Drop messages from nodes that have been removed.
    if (nodes_.find(from_node->name()) == nodes_.end()) {
      from_node->ClosePortsInEvent(event.get());
      return;
    }

    auto it = nodes_.find(node_name);
    if (it == nodes_.end()) {
      DVLOG(1) << "Node not found: " << node_name;
      return;
    }

    it->second->EnqueueEvent(std::move(event));
  }

  // Enqueues a clone of |event| on every registered node except |from_node|.
  // Nothing is delivered when |from_node| is no longer registered.
  void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
    base::AutoLock global_lock(global_lock_);
    base::AutoLock lock(lock_);

    // Drop messages from nodes that have been removed.
    if (nodes_.find(from_node->name()) == nodes_.end())
      return;

    for (const auto& entry : nodes_) {
      TestNode* node = entry.second;
      // Broadcast doesn't deliver to the local node.
      if (node == from_node)
        continue;
      node->EnqueueEvent(event->Clone());
    }
  }

  base::test::ScopedTaskEnvironment scoped_task_environment_;

  // Acquired before any operation which makes a Node busy, and before testing
  // if all nodes are idle.
  base::Lock global_lock_;

  // Taken by AddNode(), RemoveNode() and the router methods around their
  // accesses to |nodes_|.
  base::Lock lock_;
  // Registered nodes keyed by name; the pointers are not owned.
  std::map<NodeName, TestNode*> nodes_;
};
    409 
    410 }  // namespace
    411 
// Sends one end of a local pair (a1) over a cross-node pipe, closes every port
// the test still holds without reading the message, and expects both nodes to
// be able to shut down cleanly once idle.
TEST_F(PortsTest, Basic1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // x0 <-> x1 spans node0 and node1.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // a0 <-> a1 is local to node0; a1 is sent over x0.
  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    435 
// Like Basic1, but a message is also sent on the retained end (b0) after its
// peer (b1) has been sent to the other node, before everything is closed.
TEST_F(PortsTest, Basic2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // x0 <-> x1 spans node0 and node1.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // b1 is sent over x0, then "hello again" is sent from b0 toward it.
  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    461 
// Sends both ends of one local pair (a1, then a0) and one end of another (b1)
// over a cross-node pipe, with messages queued on the pairs in between, then
// closes the remaining ports and expects a clean shutdown on both nodes.
TEST_F(PortsTest, Basic3) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // x0 <-> x1 spans node0 and node1.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));

  // Send a1 away, then queue a message toward it from a0.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));

  // Now send a0 away as well.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));

  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    495 
// Sends a port to a node that drops all of its outgoing events, then removes
// that node and checks that every port can still be closed and both nodes can
// shut down cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);
  // node1 discards everything it would otherwise forward.
  node1.set_drop_messages(true);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // Transfer a port to node1 and simulate a lost connection to node1.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));

  WaitForIdle();

  RemoveNode(&node1);

  WaitForIdle();

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    528 
// Sends a port to node1, lets the transfer settle, then removes node1 and
// checks that the retained end (a0) reports peer closure while the undelivered
// message can still be read and cleaned up on node1.
TEST_F(PortsTest, LostConnectionToNode2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));

  WaitForIdle();

  // From here on node1 discards everything it would otherwise forward.
  node1.set_drop_messages(true);

  RemoveNode(&node1);

  WaitForIdle();

  // a0 should have eventually detected peer closure after node loss.
  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node0.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));

  // The "take a1" message is still readable on x1; close the port it carries.
  EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
  EXPECT_TRUE(message);
  node1.ClosePortsInEvent(message.get());

  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    572 
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  // Rebind F to the port node 1 received in the message.
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port A on node 0 will eventually also become aware of it.
  // NOTE(review): the assertions below check port E (also on node 0) rather
  // than A — confirm which port the comment above means.

  // Make sure node2 stops processing events when it encounters an ObserveProxy.
  node2.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node2.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  // Only node0 and node1 are checked; node2 is not.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    640 
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and its predecessor lives on the same node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // A-B spans node0 and node1; C-D is local to node0.
  PortRef A, B;
  CreatePortPair(&node0, &A, &node1, &B);

  PortRef C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

  // Send D but block node0 on an ObserveProxy event.
  node0.BlockOnEvent(Event::Type::kObserveProxy);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));

  // node0 won't collapse the proxy but node1 will receive the message before
  // going idle.
  WaitForIdle();

  // E is the port node1 received in the message.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));

  RemoveNode(&node1);

  node0.Unblock();
  WaitForIdle();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node0.node().ClosePort(C));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    689 
// Checks GetMessage() on a port with nothing queued: it succeeds with a null
// message while the peer is open and reports ERROR_PORT_PEER_CLOSED once the
// peer has been closed.
TEST_F(PortsTest, GetMessage1) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  // Nothing has been sent yet: OK with no message.
  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a1));

  WaitForIdle();

  // After the peer is closed the read reports peer closure instead.
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a0));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
    715 
// Sends a single message across a local port pair and checks that it can be
// read back from the other end with the same payload.
TEST_F(PortsTest, GetMessage2) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));

  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));

  // ASSERT so that MessageEquals() never dereferences a null message.
  ASSERT_TRUE(message);
  EXPECT_TRUE(MessageEquals(message, "1"));

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
    736 
    737 TEST_F(PortsTest, GetMessage3) {
    738   TestNode node(0);
    739   AddNode(&node);
    740 
    741   PortRef a0, a1;
    742   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
    743 
    744   const char* kStrings[] = {"1", "2", "3"};
    745 
    746   for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i)
    747     EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));
    748 
    749   ScopedMessage message;
    750   for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) {
    751     EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    752     ASSERT_TRUE(message);
    753     EXPECT_TRUE(MessageEquals(message, kStrings[i]));
    754   }
    755 
    756   EXPECT_EQ(OK, node.node().ClosePort(a0));
    757   EXPECT_EQ(OK, node.node().ClosePort(a1));
    758 
    759   EXPECT_TRUE(node.node().CanShutdownCleanly());
    760 }
    761 
// Moves a port from node0 to node1 and back again, sends a message toward it
// from its original peer while it is in flight, and checks that the message is
// readable on the port's final incarnation on node0.
TEST_F(PortsTest, Delegation1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a1"));

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];
  // Send the port straight back to node0 while a0 sends "hello" toward it.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x0, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a2"));

  // This is "a2" from the point of view of node0.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));

  // "hello" must have followed the port through both moves.
  ASSERT_TRUE(node0.ReadMessage(a3, &message));
  EXPECT_EQ(0u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "hello"));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(a3));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    814 
// Repeats a nested port transfer 100 times: D is sent over A, F is sent over C
// (D's peer), and a message is then sent on E (F's peer). Each round expects
// "hello" to show up among the messages node1 saved.
TEST_F(PortsTest, Delegation2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  for (int i = 0; i < 100; ++i) {
    // Setup pipe a<->b between node0 and node1.
    PortRef A, B;
    CreatePortPair(&node0, &A, &node1, &B);

    PortRef C, D;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));

    // node1 stores every message it is notified about instead of leaving it
    // queued on the port.
    node1.set_save_messages(true);

    // Pass D over A to B.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));

    WaitForIdle();

    EXPECT_EQ(OK, node0.node().ClosePort(C));
    EXPECT_EQ(OK, node0.node().ClosePort(E));

    EXPECT_EQ(OK, node0.node().ClosePort(A));
    EXPECT_EQ(OK, node1.node().ClosePort(B));

    // Walk the saved messages, closing any ports they carry, until "hello" is
    // found.
    bool got_hello = false;
    ScopedMessage message;
    while (node1.GetSavedMessage(&message)) {
      node1.ClosePortsInEvent(message.get());
      if (MessageEquals(message, "hello")) {
        got_hello = true;
        break;
      }
    }

    EXPECT_TRUE(got_hello);

    WaitForIdle();  // Because closing ports may have generated tasks.
  }

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
    870 
// Sending on a port that was created but never initialized must fail with
// ERROR_PORT_STATE_UNEXPECTED, and the port must still be closable.
TEST_F(PortsTest, SendUninitialized) {
  TestNode node(0);
  AddNode(&node);

  PortRef x0;
  EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
  EXPECT_EQ(OK, node.node().ClosePort(x0));
  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
    881 
// Checks the error codes for sending a port over itself and over its own peer,
// and that neither failed send results in a message being accepted.
TEST_F(PortsTest, SendFailure) {
  TestNode node(0);
  AddNode(&node);

  // Any message the node is notified about would be captured for the
  // GetSavedMessage() check below.
  node.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            node.SendStringMessageWithPort(A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            node.SendStringMessageWithPort(A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));

  WaitForIdle();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node.GetSavedMessage(&message));

  EXPECT_EQ(OK, node.node().ClosePort(A));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
    916 
    917 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
    918   TestNode node(0);
    919   AddNode(&node);
    920 
    921   PortRef A, B, C, D;
    922   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
    923   EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
    924 
    925   EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
    926 
    927   EXPECT_EQ(OK, node.node().ClosePort(C));
    928   EXPECT_EQ(OK, node.node().ClosePort(A));
    929   EXPECT_EQ(OK, node.node().ClosePort(B));
    930 
    931   WaitForIdle();
    932 
    933   EXPECT_TRUE(node.node().CanShutdownCleanly());
    934 }
    935 
    936 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
    937   TestNode node(0);
    938   AddNode(&node);
    939 
    940   PortRef A, B, C, D;
    941   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
    942   EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
    943 
    944   EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
    945 
    946   ScopedMessage message;
    947   EXPECT_TRUE(node.ReadMessage(B, &message));
    948   ASSERT_EQ(1u, message->num_ports());
    949   EXPECT_TRUE(MessageEquals(message, "foo"));
    950   PortRef E;
    951   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
    952 
    953   EXPECT_TRUE(
    954       node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
    955 
    956   WaitForIdle();
    957 
    958   EXPECT_TRUE(
    959       node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
    960   EXPECT_FALSE(node.node().CanShutdownCleanly());
    961 
    962   EXPECT_EQ(OK, node.node().ClosePort(A));
    963   EXPECT_EQ(OK, node.node().ClosePort(B));
    964   EXPECT_EQ(OK, node.node().ClosePort(C));
    965   EXPECT_EQ(OK, node.node().ClosePort(E));
    966 
    967   WaitForIdle();
    968 
    969   EXPECT_TRUE(node.node().CanShutdownCleanly());
    970 }
    971 
    972 TEST_F(PortsTest, ProxyCollapse1) {
    973   TestNode node(0);
    974   AddNode(&node);
    975 
    976   PortRef A, B;
    977   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
    978 
    979   PortRef X, Y;
    980   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
    981 
    982   ScopedMessage message;
    983 
    984   // Send B and receive it as C.
    985   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
    986   ASSERT_TRUE(node.ReadMessage(Y, &message));
    987   ASSERT_EQ(1u, message->num_ports());
    988   PortRef C;
    989   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
    990 
    991   // Send C and receive it as D.
    992   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
    993   ASSERT_TRUE(node.ReadMessage(Y, &message));
    994   ASSERT_EQ(1u, message->num_ports());
    995   PortRef D;
    996   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
    997 
    998   // Send D and receive it as E.
    999   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
   1000   ASSERT_TRUE(node.ReadMessage(Y, &message));
   1001   ASSERT_EQ(1u, message->num_ports());
   1002   PortRef E;
   1003   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
   1004 
   1005   EXPECT_EQ(OK, node.node().ClosePort(X));
   1006   EXPECT_EQ(OK, node.node().ClosePort(Y));
   1007 
   1008   EXPECT_EQ(OK, node.node().ClosePort(A));
   1009   EXPECT_EQ(OK, node.node().ClosePort(E));
   1010 
   1011   // The node should not idle until all proxies are collapsed.
   1012   WaitForIdle();
   1013 
   1014   EXPECT_TRUE(node.node().CanShutdownCleanly());
   1015 }
   1016 
   1017 TEST_F(PortsTest, ProxyCollapse2) {
   1018   TestNode node(0);
   1019   AddNode(&node);
   1020 
   1021   PortRef A, B;
   1022   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1023 
   1024   PortRef X, Y;
   1025   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1026 
   1027   ScopedMessage message;
   1028 
   1029   // Send B and A to create proxies in each direction.
   1030   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1031   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
   1032 
   1033   EXPECT_EQ(OK, node.node().ClosePort(X));
   1034   EXPECT_EQ(OK, node.node().ClosePort(Y));
   1035 
   1036   // At this point we have a scenario with:
   1037   //
   1038   // D -> [B] -> C -> [A]
   1039   //
   1040   // Ensure that the proxies can collapse. The sent ports will be closed
   1041   // eventually as a result of Y's closure.
   1042 
   1043   WaitForIdle();
   1044 
   1045   EXPECT_TRUE(node.node().CanShutdownCleanly());
   1046 }
   1047 
   1048 TEST_F(PortsTest, SendWithClosedPeer) {
   1049   // This tests that if a port is sent when its peer is already known to be
   1050   // closed, the newly created port will be aware of that peer closure, and the
   1051   // proxy will eventually collapse.
   1052 
   1053   TestNode node(0);
   1054   AddNode(&node);
   1055 
   1056   // Send a message from A to B, then close A.
   1057   PortRef A, B;
   1058   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1059   EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
   1060   EXPECT_EQ(OK, node.node().ClosePort(A));
   1061 
   1062   // Now send B over X-Y as new port C.
   1063   PortRef X, Y;
   1064   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1065   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1066   ScopedMessage message;
   1067   ASSERT_TRUE(node.ReadMessage(Y, &message));
   1068   ASSERT_EQ(1u, message->num_ports());
   1069   PortRef C;
   1070   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
   1071 
   1072   EXPECT_EQ(OK, node.node().ClosePort(X));
   1073   EXPECT_EQ(OK, node.node().ClosePort(Y));
   1074 
   1075   WaitForIdle();
   1076 
   1077   // C should have received the message originally sent to B, and it should also
   1078   // be aware of A's closure.
   1079 
   1080   ASSERT_TRUE(node.ReadMessage(C, &message));
   1081   EXPECT_TRUE(MessageEquals(message, "hey"));
   1082 
   1083   PortStatus status;
   1084   EXPECT_EQ(OK, node.node().GetStatus(C, &status));
   1085   EXPECT_FALSE(status.receiving_messages);
   1086   EXPECT_FALSE(status.has_messages);
   1087   EXPECT_TRUE(status.peer_closed);
   1088 
   1089   node.node().ClosePort(C);
   1090 
   1091   WaitForIdle();
   1092 
   1093   EXPECT_TRUE(node.node().CanShutdownCleanly());
   1094 }
   1095 
   1096 TEST_F(PortsTest, SendWithClosedPeerSent) {
   1097   // This tests that if a port is closed while some number of proxies are still
   1098   // routing messages (directly or indirectly) to it, that the peer port is
   1099   // eventually notified of the closure, and the dead-end proxies will
   1100   // eventually be removed.
   1101 
   1102   TestNode node(0);
   1103   AddNode(&node);
   1104 
   1105   PortRef X, Y;
   1106   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
   1107 
   1108   PortRef A, B;
   1109   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
   1110 
   1111   ScopedMessage message;
   1112 
   1113   // Send A as new port C.
   1114   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
   1115 
   1116   ASSERT_TRUE(node.ReadMessage(Y, &message));
   1117   ASSERT_EQ(1u, message->num_ports());
   1118   PortRef C;
   1119   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
   1120 
   1121   // Send C as new port D.
   1122   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
   1123 
   1124   ASSERT_TRUE(node.ReadMessage(Y, &message));
   1125   ASSERT_EQ(1u, message->num_ports());
   1126   PortRef D;
   1127   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
   1128 
   1129   // Send a message to B through D, then close D.
   1130   EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
   1131   EXPECT_EQ(OK, node.node().ClosePort(D));
   1132 
   1133   // Now send B as new port E.
   1134 
   1135   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
   1136   EXPECT_EQ(OK, node.node().ClosePort(X));
   1137 
   1138   ASSERT_TRUE(node.ReadMessage(Y, &message));
   1139   ASSERT_EQ(1u, message->num_ports());
   1140   PortRef E;
   1141   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
   1142 
   1143   EXPECT_EQ(OK, node.node().ClosePort(Y));
   1144 
   1145   WaitForIdle();
   1146 
   1147   // E should receive the message originally sent to B, and it should also be
   1148   // aware of D's closure.
   1149 
   1150   ASSERT_TRUE(node.ReadMessage(E, &message));
   1151   EXPECT_TRUE(MessageEquals(message, "hey"));
   1152 
   1153   PortStatus status;
   1154   EXPECT_EQ(OK, node.node().GetStatus(E, &status));
   1155   EXPECT_FALSE(status.receiving_messages);
   1156   EXPECT_FALSE(status.has_messages);
   1157   EXPECT_TRUE(status.peer_closed);
   1158 
   1159   EXPECT_EQ(OK, node.node().ClosePort(E));
   1160 
   1161   WaitForIdle();
   1162 
   1163   EXPECT_TRUE(node.node().CanShutdownCleanly());
   1164 }
   1165 
   1166 TEST_F(PortsTest, MergePorts) {
   1167   TestNode node0(0);
   1168   AddNode(&node0);
   1169 
   1170   TestNode node1(1);
   1171   AddNode(&node1);
   1172 
   1173   // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1174   PortRef A, B, C, D;
   1175   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1176   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1177 
   1178   // Write a message on A.
   1179   EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
   1180 
   1181   // Initiate a merge between B and C.
   1182   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1183 
   1184   WaitForIdle();
   1185 
   1186   // Expect all proxies to be gone once idle.
   1187   EXPECT_TRUE(
   1188       node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1189   EXPECT_TRUE(
   1190       node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1191 
   1192   // Expect D to have received the message sent on A.
   1193   ScopedMessage message;
   1194   ASSERT_TRUE(node1.ReadMessage(D, &message));
   1195   EXPECT_TRUE(MessageEquals(message, "hey"));
   1196 
   1197   EXPECT_EQ(OK, node0.node().ClosePort(A));
   1198   EXPECT_EQ(OK, node1.node().ClosePort(D));
   1199 
   1200   // No more ports should be open.
   1201   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1202   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1203 }
   1204 
   1205 TEST_F(PortsTest, MergePortWithClosedPeer1) {
   1206   // This tests that the right thing happens when initiating a merge on a port
   1207   // whose peer has already been closed.
   1208 
   1209   TestNode node0(0);
   1210   AddNode(&node0);
   1211 
   1212   TestNode node1(1);
   1213   AddNode(&node1);
   1214 
   1215   // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1216   PortRef A, B, C, D;
   1217   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1218   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1219 
   1220   // Write a message on A.
   1221   EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
   1222 
   1223   // Close A.
   1224   EXPECT_EQ(OK, node0.node().ClosePort(A));
   1225 
   1226   // Initiate a merge between B and C.
   1227   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1228 
   1229   WaitForIdle();
   1230 
   1231   // Expect all proxies to be gone once idle. node0 should have no ports since
   1232   // A was explicitly closed.
   1233   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1234   EXPECT_TRUE(
   1235       node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1236 
   1237   // Expect D to have received the message sent on A.
   1238   ScopedMessage message;
   1239   ASSERT_TRUE(node1.ReadMessage(D, &message));
   1240   EXPECT_TRUE(MessageEquals(message, "hey"));
   1241 
   1242   EXPECT_EQ(OK, node1.node().ClosePort(D));
   1243 
   1244   // No more ports should be open.
   1245   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1246   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1247 }
   1248 
   1249 TEST_F(PortsTest, MergePortWithClosedPeer2) {
   1250   // This tests that the right thing happens when merging into a port whose peer
   1251   // has already been closed.
   1252 
   1253   TestNode node0(0);
   1254   AddNode(&node0);
   1255 
   1256   TestNode node1(1);
   1257   AddNode(&node1);
   1258 
   1259   // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1260   PortRef A, B, C, D;
   1261   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1262   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1263 
   1264   // Write a message on D and close it.
   1265   EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
   1266   EXPECT_EQ(OK, node1.node().ClosePort(D));
   1267 
   1268   // Initiate a merge between B and C.
   1269   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1270 
   1271   WaitForIdle();
   1272 
   1273   // Expect all proxies to be gone once idle. node1 should have no ports since
   1274   // D was explicitly closed.
   1275   EXPECT_TRUE(
   1276       node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
   1277   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1278 
   1279   // Expect A to have received the message sent on D.
   1280   ScopedMessage message;
   1281   ASSERT_TRUE(node0.ReadMessage(A, &message));
   1282   EXPECT_TRUE(MessageEquals(message, "hey"));
   1283 
   1284   EXPECT_EQ(OK, node0.node().ClosePort(A));
   1285 
   1286   // No more ports should be open.
   1287   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1288   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1289 }
   1290 
   1291 TEST_F(PortsTest, MergePortsWithClosedPeers) {
   1292   // This tests that no residual ports are left behind if two ports are merged
   1293   // when both of their peers have been closed.
   1294 
   1295   TestNode node0(0);
   1296   AddNode(&node0);
   1297 
   1298   TestNode node1(1);
   1299   AddNode(&node1);
   1300 
   1301   // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1302   PortRef A, B, C, D;
   1303   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1304   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1305 
   1306   // Close A and D.
   1307   EXPECT_EQ(OK, node0.node().ClosePort(A));
   1308   EXPECT_EQ(OK, node1.node().ClosePort(D));
   1309 
   1310   WaitForIdle();
   1311 
   1312   // Initiate a merge between B and C.
   1313   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1314 
   1315   WaitForIdle();
   1316 
   1317   // Expect everything to have gone away.
   1318   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1319   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1320 }
   1321 
   1322 TEST_F(PortsTest, MergePortsWithMovedPeers) {
   1323   // This tests that ports can be merged successfully even if their peers are
   1324   // moved around.
   1325 
   1326   TestNode node0(0);
   1327   AddNode(&node0);
   1328 
   1329   TestNode node1(1);
   1330   AddNode(&node1);
   1331 
   1332   // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1333   PortRef A, B, C, D;
   1334   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1335   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1336 
   1337   // Set up another pair X-Y for moving ports on node0.
   1338   PortRef X, Y;
   1339   EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
   1340 
   1341   ScopedMessage message;
   1342 
   1343   // Move A to new port E.
   1344   EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
   1345   ASSERT_TRUE(node0.ReadMessage(Y, &message));
   1346   ASSERT_EQ(1u, message->num_ports());
   1347   PortRef E;
   1348   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
   1349 
   1350   EXPECT_EQ(OK, node0.node().ClosePort(X));
   1351   EXPECT_EQ(OK, node0.node().ClosePort(Y));
   1352 
   1353   // Write messages on E and D.
   1354   EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
   1355   EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
   1356 
   1357   // Initiate a merge between B and C.
   1358   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1359 
   1360   WaitForIdle();
   1361 
   1362   // Expect to receive D's message on E and E's message on D.
   1363   ASSERT_TRUE(node0.ReadMessage(E, &message));
   1364   EXPECT_TRUE(MessageEquals(message, "hi"));
   1365   ASSERT_TRUE(node1.ReadMessage(D, &message));
   1366   EXPECT_TRUE(MessageEquals(message, "hey"));
   1367 
   1368   // Close E and D.
   1369   EXPECT_EQ(OK, node0.node().ClosePort(E));
   1370   EXPECT_EQ(OK, node1.node().ClosePort(D));
   1371 
   1372   WaitForIdle();
   1373 
   1374   // Expect everything to have gone away.
   1375   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1376   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1377 }
   1378 
   1379 TEST_F(PortsTest, MergePortsFailsGracefully) {
   1380   // This tests that the system remains in a well-defined state if something
   1381   // goes wrong during port merge.
   1382 
   1383   TestNode node0(0);
   1384   AddNode(&node0);
   1385 
   1386   TestNode node1(1);
   1387   AddNode(&node1);
   1388 
   1389   // Setup two independent port pairs, A-B on node0 and C-D on node1.
   1390   PortRef A, B, C, D;
   1391   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
   1392   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
   1393 
   1394   ScopedMessage message;
   1395   PortRef X, Y;
   1396   EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
   1397   EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
   1398   EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
   1399   EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
   1400 
   1401   // Block the merge from proceeding until we can do something stupid with port
   1402   // C. This avoids the test logic racing with async merge logic.
   1403   node1.BlockOnEvent(Event::Type::kMergePort);
   1404 
   1405   // Initiate the merge between B and C.
   1406   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
   1407 
   1408   // Move C to a new port E. This is not a sane use of Node's public API but
   1409   // is still hypothetically possible. It allows us to force a merge failure
   1410   // because C will be in an invalid state by the time the merge is processed.
   1411   // As a result, B should be closed.
   1412   EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
   1413 
   1414   node1.Unblock();
   1415 
   1416   WaitForIdle();
   1417 
   1418   ASSERT_TRUE(node0.ReadMessage(X, &message));
   1419   ASSERT_EQ(1u, message->num_ports());
   1420   PortRef E;
   1421   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
   1422 
   1423   EXPECT_EQ(OK, node0.node().ClosePort(X));
   1424   EXPECT_EQ(OK, node1.node().ClosePort(Y));
   1425 
   1426   WaitForIdle();
   1427 
   1428   // C goes away as a result of normal proxy removal. B should have been closed
   1429   // cleanly by the failed MergePorts.
   1430   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
   1431   EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
   1432 
   1433   // Close A, D, and E.
   1434   EXPECT_EQ(OK, node0.node().ClosePort(A));
   1435   EXPECT_EQ(OK, node1.node().ClosePort(D));
   1436   EXPECT_EQ(OK, node0.node().ClosePort(E));
   1437 
   1438   WaitForIdle();
   1439 
   1440   // Expect everything to have gone away.
   1441   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1442   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1443 }
   1444 
   1445 TEST_F(PortsTest, RemotePeerStatus) {
   1446   TestNode node0(0);
   1447   AddNode(&node0);
   1448 
   1449   TestNode node1(1);
   1450   AddNode(&node1);
   1451 
   1452   // Create a local port pair. Neither port should appear to have a remote peer.
   1453   PortRef a, b;
   1454   PortStatus status;
   1455   node0.node().CreatePortPair(&a, &b);
   1456   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1457   EXPECT_FALSE(status.peer_remote);
   1458   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1459   EXPECT_FALSE(status.peer_remote);
   1460 
   1461   // Create a port pair spanning the two nodes. Both spanning ports should
   1462   // immediately appear to have a remote peer.
   1463   PortRef x0, x1;
   1464   CreatePortPair(&node0, &x0, &node1, &x1);
   1465 
   1466   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
   1467   EXPECT_TRUE(status.peer_remote);
   1468   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
   1469   EXPECT_TRUE(status.peer_remote);
   1470 
   1471   PortRef x2, x3;
   1472   CreatePortPair(&node0, &x2, &node1, &x3);
   1473 
   1474   // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
   1475   // remote and the remote peers local.
   1476   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
   1477   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
   1478   WaitForIdle();
   1479 
   1480   ScopedMessage message;
   1481   ASSERT_TRUE(node0.ReadMessage(x2, &message));
   1482   ASSERT_EQ(1u, message->num_ports());
   1483   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
   1484 
   1485   ASSERT_TRUE(node1.ReadMessage(x3, &message));
   1486   ASSERT_EQ(1u, message->num_ports());
   1487   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
   1488 
   1489   // Now x0-x1 should be local to node0 and a-b should span the nodes.
   1490   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
   1491   EXPECT_FALSE(status.peer_remote);
   1492   ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
   1493   EXPECT_FALSE(status.peer_remote);
   1494   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1495   EXPECT_TRUE(status.peer_remote);
   1496   ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
   1497   EXPECT_TRUE(status.peer_remote);
   1498 
   1499   // And swap them back one more time.
   1500   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
   1501   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
   1502   WaitForIdle();
   1503 
   1504   ASSERT_TRUE(node0.ReadMessage(x2, &message));
   1505   ASSERT_EQ(1u, message->num_ports());
   1506   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
   1507 
   1508   ASSERT_TRUE(node1.ReadMessage(x3, &message));
   1509   ASSERT_EQ(1u, message->num_ports());
   1510   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
   1511 
   1512   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
   1513   EXPECT_TRUE(status.peer_remote);
   1514   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
   1515   EXPECT_TRUE(status.peer_remote);
   1516   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1517   EXPECT_FALSE(status.peer_remote);
   1518   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1519   EXPECT_FALSE(status.peer_remote);
   1520 
   1521   EXPECT_EQ(OK, node0.node().ClosePort(x0));
   1522   EXPECT_EQ(OK, node1.node().ClosePort(x1));
   1523   EXPECT_EQ(OK, node0.node().ClosePort(x2));
   1524   EXPECT_EQ(OK, node1.node().ClosePort(x3));
   1525   EXPECT_EQ(OK, node0.node().ClosePort(a));
   1526   EXPECT_EQ(OK, node0.node().ClosePort(b));
   1527 
   1528   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1529   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1530 }
   1531 
   1532 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
   1533   TestNode node0(0);
   1534   AddNode(&node0);
   1535 
   1536   TestNode node1(1);
   1537   AddNode(&node1);
   1538 
   1539   // Set up a-b on node0 and c-d spanning node0-node1.
   1540   PortRef a, b, c, d;
   1541   node0.node().CreatePortPair(&a, &b);
   1542   CreatePortPair(&node0, &c, &node1, &d);
   1543 
   1544   PortStatus status;
   1545   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1546   EXPECT_FALSE(status.peer_remote);
   1547   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1548   EXPECT_FALSE(status.peer_remote);
   1549   ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
   1550   EXPECT_TRUE(status.peer_remote);
   1551   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1552   EXPECT_TRUE(status.peer_remote);
   1553 
   1554   EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
   1555   WaitForIdle();
   1556 
   1557   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1558   EXPECT_TRUE(status.peer_remote);
   1559   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1560   EXPECT_TRUE(status.peer_remote);
   1561 
   1562   EXPECT_EQ(OK, node0.node().ClosePort(a));
   1563   EXPECT_EQ(OK, node1.node().ClosePort(d));
   1564   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1565   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1566 }
   1567 
   1568 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
   1569   TestNode node0(0);
   1570   AddNode(&node0);
   1571 
   1572   TestNode node1(1);
   1573   AddNode(&node1);
   1574 
   1575   // Set up a-b on node0 and c-d on node1.
   1576   PortRef a, b, c, d;
   1577   node0.node().CreatePortPair(&a, &b);
   1578   node1.node().CreatePortPair(&c, &d);
   1579 
   1580   PortStatus status;
   1581   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1582   EXPECT_FALSE(status.peer_remote);
   1583   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
   1584   EXPECT_FALSE(status.peer_remote);
   1585   ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
   1586   EXPECT_FALSE(status.peer_remote);
   1587   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1588   EXPECT_FALSE(status.peer_remote);
   1589 
   1590   EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
   1591   WaitForIdle();
   1592 
   1593   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
   1594   EXPECT_TRUE(status.peer_remote);
   1595   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
   1596   EXPECT_TRUE(status.peer_remote);
   1597 
   1598   EXPECT_EQ(OK, node0.node().ClosePort(a));
   1599   EXPECT_EQ(OK, node1.node().ClosePort(d));
   1600   EXPECT_TRUE(node0.node().CanShutdownCleanly());
   1601   EXPECT_TRUE(node1.node().CanShutdownCleanly());
   1602 }
   1603 
   1604 TEST_F(PortsTest, RetransmitUserMessageEvents) {
   1605   // Ensures that user message events can be retransmitted properly.
   1606   TestNode node0(0);
   1607   AddNode(&node0);
   1608 
   1609   PortRef a, b;
   1610   node0.node().CreatePortPair(&a, &b);
   1611 
   1612   // Ping.
   1613   const char* kMessage = "hey";
   1614   ScopedMessage message;
   1615   EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
   1616   ASSERT_TRUE(node0.ReadMessage(b, &message));
   1617   EXPECT_TRUE(MessageEquals(message, kMessage));
   1618 
   1619   // Pong.
   1620   EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
   1621   EXPECT_FALSE(message);
   1622   ASSERT_TRUE(node0.ReadMessage(a, &message));
   1623   EXPECT_TRUE(MessageEquals(message, kMessage));
   1624 
   1625   // Ping again.
   1626   EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
   1627   EXPECT_FALSE(message);
   1628   ASSERT_TRUE(node0.ReadMessage(b, &message));
   1629   EXPECT_TRUE(MessageEquals(message, kMessage));
   1630 
   1631   // Pong again!
   1632   EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
   1633   EXPECT_FALSE(message);
   1634   ASSERT_TRUE(node0.ReadMessage(a, &message));
   1635   EXPECT_TRUE(MessageEquals(message, kMessage));
   1636 
   1637   EXPECT_EQ(OK, node0.node().ClosePort(a));
   1638   EXPECT_EQ(OK, node0.node().ClosePort(b));
   1639 }
   1640 
   1641 }  // namespace test
   1642 }  // namespace ports
   1643 }  // namespace core
   1644 }  // namespace mojo
   1645