1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <inttypes.h> 6 #include <stdio.h> 7 #include <stdlib.h> 8 #include <string.h> 9 10 #include <map> 11 #include <sstream> 12 #include <utility> 13 14 #include "base/bind.h" 15 #include "base/callback.h" 16 #include "base/containers/queue.h" 17 #include "base/logging.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/strings/string_piece.h" 20 #include "base/strings/stringprintf.h" 21 #include "base/synchronization/lock.h" 22 #include "base/synchronization/waitable_event.h" 23 #include "base/test/scoped_task_environment.h" 24 #include "base/threading/thread.h" 25 #include "mojo/core/ports/event.h" 26 #include "mojo/core/ports/node.h" 27 #include "mojo/core/ports/node_delegate.h" 28 #include "mojo/core/ports/user_message.h" 29 #include "testing/gtest/include/gtest/gtest.h" 30 31 namespace mojo { 32 namespace core { 33 namespace ports { 34 namespace test { 35 36 namespace { 37 38 // TODO(rockot): Remove this unnecessary alias. 39 using ScopedMessage = std::unique_ptr<UserMessageEvent>; 40 41 class TestMessage : public UserMessage { 42 public: 43 static const TypeInfo kUserMessageTypeInfo; 44 45 TestMessage(const base::StringPiece& payload) 46 : UserMessage(&kUserMessageTypeInfo), payload_(payload) {} 47 ~TestMessage() override {} 48 49 const std::string& payload() const { return payload_; } 50 51 private: 52 std::string payload_; 53 }; 54 55 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {}; 56 57 ScopedMessage NewUserMessageEvent(const base::StringPiece& payload, 58 size_t num_ports) { 59 auto event = std::make_unique<UserMessageEvent>(num_ports); 60 event->AttachMessage(std::make_unique<TestMessage>(payload)); 61 return event; 62 } 63 64 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) { 65 return message->GetMessage<TestMessage>()->payload() == s; 66 } 67 68 class TestNode; 69 70 class MessageRouter { 71 public: 72 virtual ~MessageRouter() {} 73 74 virtual void ForwardEvent(TestNode* from_node, 75 const NodeName& node_name, 76 ScopedEvent event) = 0; 77 virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0; 78 }; 79 80 class TestNode : public NodeDelegate { 81 public: 82 explicit TestNode(uint64_t id) 83 : node_name_(id, 1), 84 node_(node_name_, this), 85 node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)), 86 events_available_event_( 87 base::WaitableEvent::ResetPolicy::AUTOMATIC, 88 base::WaitableEvent::InitialState::NOT_SIGNALED), 89 idle_event_(base::WaitableEvent::ResetPolicy::MANUAL, 90 base::WaitableEvent::InitialState::SIGNALED) {} 91 92 ~TestNode() override { 93 StopWhenIdle(); 94 node_thread_.Stop(); 95 } 96 97 const NodeName& name() const { return node_name_; } 98 99 // NOTE: Node is thread-safe. 100 Node& node() { return node_; } 101 102 base::WaitableEvent& idle_event() { return idle_event_; } 103 104 bool IsIdle() { 105 base::AutoLock lock(lock_); 106 return started_ && !dispatching_ && 107 (incoming_events_.empty() || (block_on_event_ && blocked_)); 108 } 109 110 void BlockOnEvent(Event::Type type) { 111 base::AutoLock lock(lock_); 112 blocked_event_type_ = type; 113 block_on_event_ = true; 114 } 115 116 void Unblock() { 117 base::AutoLock lock(lock_); 118 block_on_event_ = false; 119 events_available_event_.Signal(); 120 } 121 122 void Start(MessageRouter* router) { 123 router_ = router; 124 node_thread_.Start(); 125 node_thread_.task_runner()->PostTask( 126 FROM_HERE, 127 base::Bind(&TestNode::ProcessEvents, base::Unretained(this))); 128 } 129 130 void StopWhenIdle() { 131 base::AutoLock lock(lock_); 132 should_quit_ = true; 133 events_available_event_.Signal(); 134 } 135 136 void WakeUp() { events_available_event_.Signal(); } 137 138 int SendStringMessage(const PortRef& port, const std::string& s) { 139 return node_.SendUserMessage(port, NewUserMessageEvent(s, 0)); 140 } 141 142 int SendStringMessageWithPort(const PortRef& port, 143 const std::string& s, 144 const PortName& sent_port_name) { 145 auto event = NewUserMessageEvent(s, 1); 146 event->ports()[0] = sent_port_name; 147 return node_.SendUserMessage(port, std::move(event)); 148 } 149 150 int SendStringMessageWithPort(const PortRef& port, 151 const std::string& s, 152 const PortRef& sent_port) { 153 return SendStringMessageWithPort(port, s, sent_port.name()); 154 } 155 156 void set_drop_messages(bool value) { 157 base::AutoLock lock(lock_); 158 drop_messages_ = value; 159 } 160 161 void set_save_messages(bool value) { 162 base::AutoLock lock(lock_); 163 save_messages_ = value; 164 } 165 166 bool ReadMessage(const PortRef& port, ScopedMessage* message) { 167 return node_.GetMessage(port, message, nullptr) == OK && *message; 168 } 169 170 bool GetSavedMessage(ScopedMessage* message) { 171 base::AutoLock lock(lock_); 172 if (saved_messages_.empty()) { 173 message->reset(); 174 return false; 175 } 176 std::swap(*message, saved_messages_.front()); 177 saved_messages_.pop(); 178 return true; 179 } 180 181 void EnqueueEvent(ScopedEvent event) { 182 idle_event_.Reset(); 183 184 // NOTE: This may be called from ForwardMessage and thus must not reenter 185 // |node_|. 186 base::AutoLock lock(lock_); 187 incoming_events_.emplace(std::move(event)); 188 events_available_event_.Signal(); 189 } 190 191 void ForwardEvent(const NodeName& node_name, ScopedEvent event) override { 192 { 193 base::AutoLock lock(lock_); 194 if (drop_messages_) { 195 DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " 196 << node_name; 197 198 base::AutoUnlock unlock(lock_); 199 ClosePortsInEvent(event.get()); 200 return; 201 } 202 } 203 204 DCHECK(router_); 205 DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name; 206 router_->ForwardEvent(this, node_name, std::move(event)); 207 } 208 209 void BroadcastEvent(ScopedEvent event) override { 210 router_->BroadcastEvent(this, std::move(event)); 211 } 212 213 void PortStatusChanged(const PortRef& port) override { 214 // The port may be closed, in which case we ignore the notification. 215 base::AutoLock lock(lock_); 216 if (!save_messages_) 217 return; 218 219 for (;;) { 220 ScopedMessage message; 221 { 222 base::AutoUnlock unlock(lock_); 223 if (!ReadMessage(port, &message)) 224 break; 225 } 226 227 saved_messages_.emplace(std::move(message)); 228 } 229 } 230 231 void ClosePortsInEvent(Event* event) { 232 if (event->type() != Event::Type::kUserMessage) 233 return; 234 235 UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event); 236 for (size_t i = 0; i < message_event->num_ports(); ++i) { 237 PortRef port; 238 ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port)); 239 EXPECT_EQ(OK, node_.ClosePort(port)); 240 } 241 } 242 243 private: 244 void ProcessEvents() { 245 for (;;) { 246 events_available_event_.Wait(); 247 base::AutoLock lock(lock_); 248 249 if (should_quit_) 250 return; 251 252 dispatching_ = true; 253 while (!incoming_events_.empty()) { 254 if (block_on_event_ && 255 incoming_events_.front()->type() == blocked_event_type_) { 256 blocked_ = true; 257 // Go idle if we hit a blocked event type. 258 break; 259 } else { 260 blocked_ = false; 261 } 262 ScopedEvent event = std::move(incoming_events_.front()); 263 incoming_events_.pop(); 264 265 // NOTE: AcceptMessage() can re-enter this object to call any of the 266 // NodeDelegate interface methods. 267 base::AutoUnlock unlock(lock_); 268 node_.AcceptEvent(std::move(event)); 269 } 270 271 dispatching_ = false; 272 started_ = true; 273 idle_event_.Signal(); 274 }; 275 } 276 277 const NodeName node_name_; 278 Node node_; 279 MessageRouter* router_ = nullptr; 280 281 base::Thread node_thread_; 282 base::WaitableEvent events_available_event_; 283 base::WaitableEvent idle_event_; 284 285 // Guards fields below. 286 base::Lock lock_; 287 bool started_ = false; 288 bool dispatching_ = false; 289 bool should_quit_ = false; 290 bool drop_messages_ = false; 291 bool save_messages_ = false; 292 bool blocked_ = false; 293 bool block_on_event_ = false; 294 Event::Type blocked_event_type_; 295 base::queue<ScopedEvent> incoming_events_; 296 base::queue<ScopedMessage> saved_messages_; 297 }; 298 299 class PortsTest : public testing::Test, public MessageRouter { 300 public: 301 void AddNode(TestNode* node) { 302 { 303 base::AutoLock lock(lock_); 304 nodes_[node->name()] = node; 305 } 306 node->Start(this); 307 } 308 309 void RemoveNode(TestNode* node) { 310 { 311 base::AutoLock lock(lock_); 312 nodes_.erase(node->name()); 313 } 314 315 for (const auto& entry : nodes_) 316 entry.second->node().LostConnectionToNode(node->name()); 317 } 318 319 // Waits until all known Nodes are idle. Message forwarding and processing 320 // is handled in such a way that idleness is a stable state: once all nodes in 321 // the system are idle, they will remain idle until the test explicitly 322 // initiates some further event (e.g. sending a message, closing a port, or 323 // removing a Node). 324 void WaitForIdle() { 325 for (;;) { 326 base::AutoLock global_lock(global_lock_); 327 bool all_nodes_idle = true; 328 for (const auto& entry : nodes_) { 329 if (!entry.second->IsIdle()) 330 all_nodes_idle = false; 331 entry.second->WakeUp(); 332 } 333 if (all_nodes_idle) 334 return; 335 336 // Wait for any Node to signal that it's idle. 337 base::AutoUnlock global_unlock(global_lock_); 338 std::vector<base::WaitableEvent*> events; 339 for (const auto& entry : nodes_) 340 events.push_back(&entry.second->idle_event()); 341 base::WaitableEvent::WaitMany(events.data(), events.size()); 342 } 343 } 344 345 void CreatePortPair(TestNode* node0, 346 PortRef* port0, 347 TestNode* node1, 348 PortRef* port1) { 349 if (node0 == node1) { 350 EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); 351 } else { 352 EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); 353 EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); 354 EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(), 355 port1->name())); 356 EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(), 357 port0->name())); 358 } 359 } 360 361 private: 362 // MessageRouter: 363 void ForwardEvent(TestNode* from_node, 364 const NodeName& node_name, 365 ScopedEvent event) override { 366 base::AutoLock global_lock(global_lock_); 367 base::AutoLock lock(lock_); 368 // Drop messages from nodes that have been removed. 369 if (nodes_.find(from_node->name()) == nodes_.end()) { 370 from_node->ClosePortsInEvent(event.get()); 371 return; 372 } 373 374 auto it = nodes_.find(node_name); 375 if (it == nodes_.end()) { 376 DVLOG(1) << "Node not found: " << node_name; 377 return; 378 } 379 380 it->second->EnqueueEvent(std::move(event)); 381 } 382 383 void BroadcastEvent(TestNode* from_node, ScopedEvent event) override { 384 base::AutoLock global_lock(global_lock_); 385 base::AutoLock lock(lock_); 386 387 // Drop messages from nodes that have been removed. 388 if (nodes_.find(from_node->name()) == nodes_.end()) 389 return; 390 391 for (const auto& entry : nodes_) { 392 TestNode* node = entry.second; 393 // Broadcast doesn't deliver to the local node. 394 if (node == from_node) 395 continue; 396 node->EnqueueEvent(event->Clone()); 397 } 398 } 399 400 base::test::ScopedTaskEnvironment scoped_task_environment_; 401 402 // Acquired before any operation which makes a Node busy, and before testing 403 // if all nodes are idle. 404 base::Lock global_lock_; 405 406 base::Lock lock_; 407 std::map<NodeName, TestNode*> nodes_; 408 }; 409 410 } // namespace 411 412 TEST_F(PortsTest, Basic1) { 413 TestNode node0(0); 414 AddNode(&node0); 415 416 TestNode node1(1); 417 AddNode(&node1); 418 419 PortRef x0, x1; 420 CreatePortPair(&node0, &x0, &node1, &x1); 421 422 PortRef a0, a1; 423 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 424 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); 425 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 426 427 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 428 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 429 430 WaitForIdle(); 431 432 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 433 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 434 } 435 436 TEST_F(PortsTest, Basic2) { 437 TestNode node0(0); 438 AddNode(&node0); 439 440 TestNode node1(1); 441 AddNode(&node1); 442 443 PortRef x0, x1; 444 CreatePortPair(&node0, &x0, &node1, &x1); 445 446 PortRef b0, b1; 447 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); 448 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); 449 EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); 450 451 EXPECT_EQ(OK, node0.node().ClosePort(b0)); 452 453 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 454 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 455 456 WaitForIdle(); 457 458 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 459 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 460 } 461 462 TEST_F(PortsTest, Basic3) { 463 TestNode node0(0); 464 AddNode(&node0); 465 466 TestNode node1(1); 467 AddNode(&node1); 468 469 PortRef x0, x1; 470 CreatePortPair(&node0, &x0, &node1, &x1); 471 472 PortRef a0, a1; 473 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 474 475 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); 476 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); 477 478 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); 479 480 PortRef b0, b1; 481 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); 482 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); 483 EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); 484 485 EXPECT_EQ(OK, node0.node().ClosePort(b0)); 486 487 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 488 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 489 490 WaitForIdle(); 491 492 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 493 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 494 } 495 496 TEST_F(PortsTest, LostConnectionToNode1) { 497 TestNode node0(0); 498 AddNode(&node0); 499 500 TestNode node1(1); 501 AddNode(&node1); 502 node1.set_drop_messages(true); 503 504 PortRef x0, x1; 505 CreatePortPair(&node0, &x0, &node1, &x1); 506 507 // Transfer a port to node1 and simulate a lost connection to node1. 508 509 PortRef a0, a1; 510 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 511 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); 512 513 WaitForIdle(); 514 515 RemoveNode(&node1); 516 517 WaitForIdle(); 518 519 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 520 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 521 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 522 523 WaitForIdle(); 524 525 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 526 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 527 } 528 529 TEST_F(PortsTest, LostConnectionToNode2) { 530 TestNode node0(0); 531 AddNode(&node0); 532 533 TestNode node1(1); 534 AddNode(&node1); 535 536 PortRef x0, x1; 537 CreatePortPair(&node0, &x0, &node1, &x1); 538 539 PortRef a0, a1; 540 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 541 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); 542 543 WaitForIdle(); 544 545 node1.set_drop_messages(true); 546 547 RemoveNode(&node1); 548 549 WaitForIdle(); 550 551 // a0 should have eventually detected peer closure after node loss. 552 ScopedMessage message; 553 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, 554 node0.node().GetMessage(a0, &message, nullptr)); 555 EXPECT_FALSE(message); 556 557 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 558 559 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 560 561 EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr)); 562 EXPECT_TRUE(message); 563 node1.ClosePortsInEvent(message.get()); 564 565 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 566 567 WaitForIdle(); 568 569 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 570 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 571 } 572 573 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { 574 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost 575 // node. 576 577 TestNode node0(0); 578 AddNode(&node0); 579 580 TestNode node1(1); 581 AddNode(&node1); 582 583 TestNode node2(2); 584 AddNode(&node2); 585 586 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. 587 PortRef A, B, C, D; 588 CreatePortPair(&node0, &A, &node1, &B); 589 CreatePortPair(&node1, &C, &node2, &D); 590 591 // Create E-F and send F over A to node 1. 592 PortRef E, F; 593 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); 594 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); 595 596 WaitForIdle(); 597 598 ScopedMessage message; 599 ASSERT_TRUE(node1.ReadMessage(B, &message)); 600 ASSERT_EQ(1u, message->num_ports()); 601 602 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); 603 604 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 605 // will trivially become aware of the loss, and this test verifies that the 606 // port A on node 0 will eventually also become aware of it. 607 608 // Make sure node2 stops processing events when it encounters an ObserveProxy. 609 node2.BlockOnEvent(Event::Type::kObserveProxy); 610 611 EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); 612 WaitForIdle(); 613 614 // Simulate node 1 and 2 disconnecting. 615 EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); 616 617 // Let node2 continue processing events and wait for everyone to go idle. 618 node2.Unblock(); 619 WaitForIdle(); 620 621 // Port F should be gone. 622 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); 623 624 // Port E should have detected peer closure despite the fact that there is 625 // no longer a continuous route from F to E over which the event could travel. 626 PortStatus status; 627 EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); 628 EXPECT_TRUE(status.peer_closed); 629 630 EXPECT_EQ(OK, node0.node().ClosePort(A)); 631 EXPECT_EQ(OK, node1.node().ClosePort(B)); 632 EXPECT_EQ(OK, node1.node().ClosePort(C)); 633 EXPECT_EQ(OK, node0.node().ClosePort(E)); 634 635 WaitForIdle(); 636 637 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 638 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 639 } 640 641 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { 642 // Tests that a proxy gets cleaned up when its direct peer lives on a lost 643 // node and it's predecessor lives on the same node. 644 645 TestNode node0(0); 646 AddNode(&node0); 647 648 TestNode node1(1); 649 AddNode(&node1); 650 651 PortRef A, B; 652 CreatePortPair(&node0, &A, &node1, &B); 653 654 PortRef C, D; 655 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); 656 657 // Send D but block node0 on an ObserveProxy event. 658 node0.BlockOnEvent(Event::Type::kObserveProxy); 659 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); 660 661 // node0 won't collapse the proxy but node1 will receive the message before 662 // going idle. 663 WaitForIdle(); 664 665 ScopedMessage message; 666 ASSERT_TRUE(node1.ReadMessage(B, &message)); 667 ASSERT_EQ(1u, message->num_ports()); 668 PortRef E; 669 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); 670 671 RemoveNode(&node1); 672 673 node0.Unblock(); 674 WaitForIdle(); 675 676 // Port C should have detected peer closure. 677 PortStatus status; 678 EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); 679 EXPECT_TRUE(status.peer_closed); 680 681 EXPECT_EQ(OK, node0.node().ClosePort(A)); 682 EXPECT_EQ(OK, node1.node().ClosePort(B)); 683 EXPECT_EQ(OK, node0.node().ClosePort(C)); 684 EXPECT_EQ(OK, node1.node().ClosePort(E)); 685 686 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 687 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 688 } 689 690 TEST_F(PortsTest, GetMessage1) { 691 TestNode node(0); 692 AddNode(&node); 693 694 PortRef a0, a1; 695 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); 696 697 ScopedMessage message; 698 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); 699 EXPECT_FALSE(message); 700 701 EXPECT_EQ(OK, node.node().ClosePort(a1)); 702 703 WaitForIdle(); 704 705 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, 706 node.node().GetMessage(a0, &message, nullptr)); 707 EXPECT_FALSE(message); 708 709 EXPECT_EQ(OK, node.node().ClosePort(a0)); 710 711 WaitForIdle(); 712 713 EXPECT_TRUE(node.node().CanShutdownCleanly()); 714 } 715 716 TEST_F(PortsTest, GetMessage2) { 717 TestNode node(0); 718 AddNode(&node); 719 720 PortRef a0, a1; 721 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); 722 723 EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); 724 725 ScopedMessage message; 726 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); 727 728 ASSERT_TRUE(message); 729 EXPECT_TRUE(MessageEquals(message, "1")); 730 731 EXPECT_EQ(OK, node.node().ClosePort(a0)); 732 EXPECT_EQ(OK, node.node().ClosePort(a1)); 733 734 EXPECT_TRUE(node.node().CanShutdownCleanly()); 735 } 736 737 TEST_F(PortsTest, GetMessage3) { 738 TestNode node(0); 739 AddNode(&node); 740 741 PortRef a0, a1; 742 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); 743 744 const char* kStrings[] = {"1", "2", "3"}; 745 746 for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) 747 EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i])); 748 749 ScopedMessage message; 750 for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) { 751 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); 752 ASSERT_TRUE(message); 753 EXPECT_TRUE(MessageEquals(message, kStrings[i])); 754 } 755 756 EXPECT_EQ(OK, node.node().ClosePort(a0)); 757 EXPECT_EQ(OK, node.node().ClosePort(a1)); 758 759 EXPECT_TRUE(node.node().CanShutdownCleanly()); 760 } 761 762 TEST_F(PortsTest, Delegation1) { 763 TestNode node0(0); 764 AddNode(&node0); 765 766 TestNode node1(1); 767 AddNode(&node1); 768 769 PortRef x0, x1; 770 CreatePortPair(&node0, &x0, &node1, &x1); 771 772 // In this test, we send a message to a port that has been moved. 773 774 PortRef a0, a1; 775 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 776 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); 777 WaitForIdle(); 778 779 ScopedMessage message; 780 ASSERT_TRUE(node1.ReadMessage(x1, &message)); 781 ASSERT_EQ(1u, message->num_ports()); 782 EXPECT_TRUE(MessageEquals(message, "a1")); 783 784 // This is "a1" from the point of view of node1. 785 PortName a2_name = message->ports()[0]; 786 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); 787 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); 788 789 WaitForIdle(); 790 791 ASSERT_TRUE(node0.ReadMessage(x0, &message)); 792 ASSERT_EQ(1u, message->num_ports()); 793 EXPECT_TRUE(MessageEquals(message, "a2")); 794 795 // This is "a2" from the point of view of node1. 796 PortName a3_name = message->ports()[0]; 797 798 PortRef a3; 799 EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); 800 801 ASSERT_TRUE(node0.ReadMessage(a3, &message)); 802 EXPECT_EQ(0u, message->num_ports()); 803 EXPECT_TRUE(MessageEquals(message, "hello")); 804 805 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 806 EXPECT_EQ(OK, node0.node().ClosePort(a3)); 807 808 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 809 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 810 811 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 812 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 813 } 814 815 TEST_F(PortsTest, Delegation2) { 816 TestNode node0(0); 817 AddNode(&node0); 818 819 TestNode node1(1); 820 AddNode(&node1); 821 822 for (int i = 0; i < 100; ++i) { 823 // Setup pipe a<->b between node0 and node1. 824 PortRef A, B; 825 CreatePortPair(&node0, &A, &node1, &B); 826 827 PortRef C, D; 828 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); 829 830 PortRef E, F; 831 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); 832 833 node1.set_save_messages(true); 834 835 // Pass D over A to B. 836 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); 837 838 // Pass F over C to D. 839 EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); 840 841 // This message should find its way to node1. 842 EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); 843 844 WaitForIdle(); 845 846 EXPECT_EQ(OK, node0.node().ClosePort(C)); 847 EXPECT_EQ(OK, node0.node().ClosePort(E)); 848 849 EXPECT_EQ(OK, node0.node().ClosePort(A)); 850 EXPECT_EQ(OK, node1.node().ClosePort(B)); 851 852 bool got_hello = false; 853 ScopedMessage message; 854 while (node1.GetSavedMessage(&message)) { 855 node1.ClosePortsInEvent(message.get()); 856 if (MessageEquals(message, "hello")) { 857 got_hello = true; 858 break; 859 } 860 } 861 862 EXPECT_TRUE(got_hello); 863 864 WaitForIdle(); // Because closing ports may have generated tasks. 865 } 866 867 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 868 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 869 } 870 871 TEST_F(PortsTest, SendUninitialized) { 872 TestNode node(0); 873 AddNode(&node); 874 875 PortRef x0; 876 EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); 877 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); 878 EXPECT_EQ(OK, node.node().ClosePort(x0)); 879 EXPECT_TRUE(node.node().CanShutdownCleanly()); 880 } 881 882 TEST_F(PortsTest, SendFailure) { 883 TestNode node(0); 884 AddNode(&node); 885 886 node.set_save_messages(true); 887 888 PortRef A, B; 889 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 890 891 // Try to send A over itself. 892 893 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, 894 node.SendStringMessageWithPort(A, "oops", A)); 895 896 // Try to send B over A. 897 898 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, 899 node.SendStringMessageWithPort(A, "nope", B)); 900 901 // B should be closed immediately. 902 EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); 903 904 WaitForIdle(); 905 906 // There should have been no messages accepted. 907 ScopedMessage message; 908 EXPECT_FALSE(node.GetSavedMessage(&message)); 909 910 EXPECT_EQ(OK, node.node().ClosePort(A)); 911 912 WaitForIdle(); 913 914 EXPECT_TRUE(node.node().CanShutdownCleanly()); 915 } 916 917 TEST_F(PortsTest, DontLeakUnreceivedPorts) { 918 TestNode node(0); 919 AddNode(&node); 920 921 PortRef A, B, C, D; 922 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 923 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); 924 925 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); 926 927 EXPECT_EQ(OK, node.node().ClosePort(C)); 928 EXPECT_EQ(OK, node.node().ClosePort(A)); 929 EXPECT_EQ(OK, node.node().ClosePort(B)); 930 931 WaitForIdle(); 932 933 EXPECT_TRUE(node.node().CanShutdownCleanly()); 934 } 935 936 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { 937 TestNode node(0); 938 AddNode(&node); 939 940 PortRef A, B, C, D; 941 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 942 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); 943 944 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); 945 946 ScopedMessage message; 947 EXPECT_TRUE(node.ReadMessage(B, &message)); 948 ASSERT_EQ(1u, message->num_ports()); 949 EXPECT_TRUE(MessageEquals(message, "foo")); 950 PortRef E; 951 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); 952 953 EXPECT_TRUE( 954 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 955 956 WaitForIdle(); 957 958 EXPECT_TRUE( 959 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 960 EXPECT_FALSE(node.node().CanShutdownCleanly()); 961 962 EXPECT_EQ(OK, node.node().ClosePort(A)); 963 EXPECT_EQ(OK, node.node().ClosePort(B)); 964 EXPECT_EQ(OK, node.node().ClosePort(C)); 965 EXPECT_EQ(OK, node.node().ClosePort(E)); 966 967 WaitForIdle(); 968 969 EXPECT_TRUE(node.node().CanShutdownCleanly()); 970 } 971 972 TEST_F(PortsTest, ProxyCollapse1) { 973 TestNode node(0); 974 AddNode(&node); 975 976 PortRef A, B; 977 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 978 979 PortRef X, Y; 980 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 981 982 ScopedMessage message; 983 984 // Send B and receive it as C. 985 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 986 ASSERT_TRUE(node.ReadMessage(Y, &message)); 987 ASSERT_EQ(1u, message->num_ports()); 988 PortRef C; 989 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); 990 991 // Send C and receive it as D. 992 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); 993 ASSERT_TRUE(node.ReadMessage(Y, &message)); 994 ASSERT_EQ(1u, message->num_ports()); 995 PortRef D; 996 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); 997 998 // Send D and receive it as E. 999 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); 1000 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1001 ASSERT_EQ(1u, message->num_ports()); 1002 PortRef E; 1003 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); 1004 1005 EXPECT_EQ(OK, node.node().ClosePort(X)); 1006 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1007 1008 EXPECT_EQ(OK, node.node().ClosePort(A)); 1009 EXPECT_EQ(OK, node.node().ClosePort(E)); 1010 1011 // The node should not idle until all proxies are collapsed. 1012 WaitForIdle(); 1013 1014 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1015 } 1016 1017 TEST_F(PortsTest, ProxyCollapse2) { 1018 TestNode node(0); 1019 AddNode(&node); 1020 1021 PortRef A, B; 1022 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1023 1024 PortRef X, Y; 1025 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1026 1027 ScopedMessage message; 1028 1029 // Send B and A to create proxies in each direction. 1030 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1031 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); 1032 1033 EXPECT_EQ(OK, node.node().ClosePort(X)); 1034 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1035 1036 // At this point we have a scenario with: 1037 // 1038 // D -> [B] -> C -> [A] 1039 // 1040 // Ensure that the proxies can collapse. The sent ports will be closed 1041 // eventually as a result of Y's closure. 1042 1043 WaitForIdle(); 1044 1045 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1046 } 1047 1048 TEST_F(PortsTest, SendWithClosedPeer) { 1049 // This tests that if a port is sent when its peer is already known to be 1050 // closed, the newly created port will be aware of that peer closure, and the 1051 // proxy will eventually collapse. 1052 1053 TestNode node(0); 1054 AddNode(&node); 1055 1056 // Send a message from A to B, then close A. 1057 PortRef A, B; 1058 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1059 EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); 1060 EXPECT_EQ(OK, node.node().ClosePort(A)); 1061 1062 // Now send B over X-Y as new port C. 1063 PortRef X, Y; 1064 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1065 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1066 ScopedMessage message; 1067 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1068 ASSERT_EQ(1u, message->num_ports()); 1069 PortRef C; 1070 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); 1071 1072 EXPECT_EQ(OK, node.node().ClosePort(X)); 1073 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1074 1075 WaitForIdle(); 1076 1077 // C should have received the message originally sent to B, and it should also 1078 // be aware of A's closure. 1079 1080 ASSERT_TRUE(node.ReadMessage(C, &message)); 1081 EXPECT_TRUE(MessageEquals(message, "hey")); 1082 1083 PortStatus status; 1084 EXPECT_EQ(OK, node.node().GetStatus(C, &status)); 1085 EXPECT_FALSE(status.receiving_messages); 1086 EXPECT_FALSE(status.has_messages); 1087 EXPECT_TRUE(status.peer_closed); 1088 1089 node.node().ClosePort(C); 1090 1091 WaitForIdle(); 1092 1093 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1094 } 1095 1096 TEST_F(PortsTest, SendWithClosedPeerSent) { 1097 // This tests that if a port is closed while some number of proxies are still 1098 // routing messages (directly or indirectly) to it, that the peer port is 1099 // eventually notified of the closure, and the dead-end proxies will 1100 // eventually be removed. 1101 1102 TestNode node(0); 1103 AddNode(&node); 1104 1105 PortRef X, Y; 1106 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1107 1108 PortRef A, B; 1109 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1110 1111 ScopedMessage message; 1112 1113 // Send A as new port C. 1114 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); 1115 1116 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1117 ASSERT_EQ(1u, message->num_ports()); 1118 PortRef C; 1119 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); 1120 1121 // Send C as new port D. 1122 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); 1123 1124 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1125 ASSERT_EQ(1u, message->num_ports()); 1126 PortRef D; 1127 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); 1128 1129 // Send a message to B through D, then close D. 1130 EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); 1131 EXPECT_EQ(OK, node.node().ClosePort(D)); 1132 1133 // Now send B as new port E. 1134 1135 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1136 EXPECT_EQ(OK, node.node().ClosePort(X)); 1137 1138 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1139 ASSERT_EQ(1u, message->num_ports()); 1140 PortRef E; 1141 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); 1142 1143 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1144 1145 WaitForIdle(); 1146 1147 // E should receive the message originally sent to B, and it should also be 1148 // aware of D's closure. 1149 1150 ASSERT_TRUE(node.ReadMessage(E, &message)); 1151 EXPECT_TRUE(MessageEquals(message, "hey")); 1152 1153 PortStatus status; 1154 EXPECT_EQ(OK, node.node().GetStatus(E, &status)); 1155 EXPECT_FALSE(status.receiving_messages); 1156 EXPECT_FALSE(status.has_messages); 1157 EXPECT_TRUE(status.peer_closed); 1158 1159 EXPECT_EQ(OK, node.node().ClosePort(E)); 1160 1161 WaitForIdle(); 1162 1163 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1164 } 1165 1166 TEST_F(PortsTest, MergePorts) { 1167 TestNode node0(0); 1168 AddNode(&node0); 1169 1170 TestNode node1(1); 1171 AddNode(&node1); 1172 1173 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1174 PortRef A, B, C, D; 1175 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1176 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1177 1178 // Write a message on A. 1179 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); 1180 1181 // Initiate a merge between B and C. 1182 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1183 1184 WaitForIdle(); 1185 1186 // Expect all proxies to be gone once idle. 1187 EXPECT_TRUE( 1188 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1189 EXPECT_TRUE( 1190 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1191 1192 // Expect D to have received the message sent on A. 1193 ScopedMessage message; 1194 ASSERT_TRUE(node1.ReadMessage(D, &message)); 1195 EXPECT_TRUE(MessageEquals(message, "hey")); 1196 1197 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1198 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1199 1200 // No more ports should be open. 1201 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1202 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1203 } 1204 1205 TEST_F(PortsTest, MergePortWithClosedPeer1) { 1206 // This tests that the right thing happens when initiating a merge on a port 1207 // whose peer has already been closed. 1208 1209 TestNode node0(0); 1210 AddNode(&node0); 1211 1212 TestNode node1(1); 1213 AddNode(&node1); 1214 1215 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1216 PortRef A, B, C, D; 1217 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1218 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1219 1220 // Write a message on A. 1221 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); 1222 1223 // Close A. 1224 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1225 1226 // Initiate a merge between B and C. 1227 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1228 1229 WaitForIdle(); 1230 1231 // Expect all proxies to be gone once idle. node0 should have no ports since 1232 // A was explicitly closed. 1233 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1234 EXPECT_TRUE( 1235 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1236 1237 // Expect D to have received the message sent on A. 1238 ScopedMessage message; 1239 ASSERT_TRUE(node1.ReadMessage(D, &message)); 1240 EXPECT_TRUE(MessageEquals(message, "hey")); 1241 1242 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1243 1244 // No more ports should be open. 1245 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1246 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1247 } 1248 1249 TEST_F(PortsTest, MergePortWithClosedPeer2) { 1250 // This tests that the right thing happens when merging into a port whose peer 1251 // has already been closed. 1252 1253 TestNode node0(0); 1254 AddNode(&node0); 1255 1256 TestNode node1(1); 1257 AddNode(&node1); 1258 1259 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1260 PortRef A, B, C, D; 1261 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1262 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1263 1264 // Write a message on D and close it. 1265 EXPECT_EQ(OK, node0.SendStringMessage(D, "hey")); 1266 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1267 1268 // Initiate a merge between B and C. 1269 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1270 1271 WaitForIdle(); 1272 1273 // Expect all proxies to be gone once idle. node1 should have no ports since 1274 // D was explicitly closed. 1275 EXPECT_TRUE( 1276 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1277 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1278 1279 // Expect A to have received the message sent on D. 1280 ScopedMessage message; 1281 ASSERT_TRUE(node0.ReadMessage(A, &message)); 1282 EXPECT_TRUE(MessageEquals(message, "hey")); 1283 1284 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1285 1286 // No more ports should be open. 1287 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1288 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1289 } 1290 1291 TEST_F(PortsTest, MergePortsWithClosedPeers) { 1292 // This tests that no residual ports are left behind if two ports are merged 1293 // when both of their peers have been closed. 1294 1295 TestNode node0(0); 1296 AddNode(&node0); 1297 1298 TestNode node1(1); 1299 AddNode(&node1); 1300 1301 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1302 PortRef A, B, C, D; 1303 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1304 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1305 1306 // Close A and D. 1307 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1308 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1309 1310 WaitForIdle(); 1311 1312 // Initiate a merge between B and C. 1313 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1314 1315 WaitForIdle(); 1316 1317 // Expect everything to have gone away. 1318 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1319 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1320 } 1321 1322 TEST_F(PortsTest, MergePortsWithMovedPeers) { 1323 // This tests that ports can be merged successfully even if their peers are 1324 // moved around. 1325 1326 TestNode node0(0); 1327 AddNode(&node0); 1328 1329 TestNode node1(1); 1330 AddNode(&node1); 1331 1332 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1333 PortRef A, B, C, D; 1334 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1335 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1336 1337 // Set up another pair X-Y for moving ports on node0. 1338 PortRef X, Y; 1339 EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); 1340 1341 ScopedMessage message; 1342 1343 // Move A to new port E. 1344 EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); 1345 ASSERT_TRUE(node0.ReadMessage(Y, &message)); 1346 ASSERT_EQ(1u, message->num_ports()); 1347 PortRef E; 1348 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); 1349 1350 EXPECT_EQ(OK, node0.node().ClosePort(X)); 1351 EXPECT_EQ(OK, node0.node().ClosePort(Y)); 1352 1353 // Write messages on E and D. 1354 EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); 1355 EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); 1356 1357 // Initiate a merge between B and C. 1358 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1359 1360 WaitForIdle(); 1361 1362 // Expect to receive D's message on E and E's message on D. 1363 ASSERT_TRUE(node0.ReadMessage(E, &message)); 1364 EXPECT_TRUE(MessageEquals(message, "hi")); 1365 ASSERT_TRUE(node1.ReadMessage(D, &message)); 1366 EXPECT_TRUE(MessageEquals(message, "hey")); 1367 1368 // Close E and D. 1369 EXPECT_EQ(OK, node0.node().ClosePort(E)); 1370 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1371 1372 WaitForIdle(); 1373 1374 // Expect everything to have gone away. 1375 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1376 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1377 } 1378 1379 TEST_F(PortsTest, MergePortsFailsGracefully) { 1380 // This tests that the system remains in a well-defined state if something 1381 // goes wrong during port merge. 1382 1383 TestNode node0(0); 1384 AddNode(&node0); 1385 1386 TestNode node1(1); 1387 AddNode(&node1); 1388 1389 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1390 PortRef A, B, C, D; 1391 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1392 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1393 1394 ScopedMessage message; 1395 PortRef X, Y; 1396 EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X)); 1397 EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y)); 1398 EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name())); 1399 EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name())); 1400 1401 // Block the merge from proceeding until we can do something stupid with port 1402 // C. This avoids the test logic racing with async merge logic. 1403 node1.BlockOnEvent(Event::Type::kMergePort); 1404 1405 // Initiate the merge between B and C. 1406 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1407 1408 // Move C to a new port E. This is not a sane use of Node's public API but 1409 // is still hypothetically possible. It allows us to force a merge failure 1410 // because C will be in an invalid state by the time the merge is processed. 1411 // As a result, B should be closed. 1412 EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C)); 1413 1414 node1.Unblock(); 1415 1416 WaitForIdle(); 1417 1418 ASSERT_TRUE(node0.ReadMessage(X, &message)); 1419 ASSERT_EQ(1u, message->num_ports()); 1420 PortRef E; 1421 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); 1422 1423 EXPECT_EQ(OK, node0.node().ClosePort(X)); 1424 EXPECT_EQ(OK, node1.node().ClosePort(Y)); 1425 1426 WaitForIdle(); 1427 1428 // C goes away as a result of normal proxy removal. B should have been closed 1429 // cleanly by the failed MergePorts. 1430 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); 1431 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); 1432 1433 // Close A, D, and E. 1434 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1435 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1436 EXPECT_EQ(OK, node0.node().ClosePort(E)); 1437 1438 WaitForIdle(); 1439 1440 // Expect everything to have gone away. 1441 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1442 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1443 } 1444 1445 TEST_F(PortsTest, RemotePeerStatus) { 1446 TestNode node0(0); 1447 AddNode(&node0); 1448 1449 TestNode node1(1); 1450 AddNode(&node1); 1451 1452 // Create a local port pair. Neither port should appear to have a remote peer. 1453 PortRef a, b; 1454 PortStatus status; 1455 node0.node().CreatePortPair(&a, &b); 1456 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1457 EXPECT_FALSE(status.peer_remote); 1458 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1459 EXPECT_FALSE(status.peer_remote); 1460 1461 // Create a port pair spanning the two nodes. Both spanning ports should 1462 // immediately appear to have a remote peer. 1463 PortRef x0, x1; 1464 CreatePortPair(&node0, &x0, &node1, &x1); 1465 1466 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); 1467 EXPECT_TRUE(status.peer_remote); 1468 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); 1469 EXPECT_TRUE(status.peer_remote); 1470 1471 PortRef x2, x3; 1472 CreatePortPair(&node0, &x2, &node1, &x3); 1473 1474 // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers 1475 // remote and the remote peers local. 1476 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b)); 1477 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1)); 1478 WaitForIdle(); 1479 1480 ScopedMessage message; 1481 ASSERT_TRUE(node0.ReadMessage(x2, &message)); 1482 ASSERT_EQ(1u, message->num_ports()); 1483 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1)); 1484 1485 ASSERT_TRUE(node1.ReadMessage(x3, &message)); 1486 ASSERT_EQ(1u, message->num_ports()); 1487 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b)); 1488 1489 // Now x0-x1 should be local to node0 and a-b should span the nodes. 1490 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); 1491 EXPECT_FALSE(status.peer_remote); 1492 ASSERT_EQ(OK, node0.node().GetStatus(x1, &status)); 1493 EXPECT_FALSE(status.peer_remote); 1494 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1495 EXPECT_TRUE(status.peer_remote); 1496 ASSERT_EQ(OK, node1.node().GetStatus(b, &status)); 1497 EXPECT_TRUE(status.peer_remote); 1498 1499 // And swap them back one more time. 1500 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1)); 1501 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b)); 1502 WaitForIdle(); 1503 1504 ASSERT_TRUE(node0.ReadMessage(x2, &message)); 1505 ASSERT_EQ(1u, message->num_ports()); 1506 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b)); 1507 1508 ASSERT_TRUE(node1.ReadMessage(x3, &message)); 1509 ASSERT_EQ(1u, message->num_ports()); 1510 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1)); 1511 1512 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); 1513 EXPECT_TRUE(status.peer_remote); 1514 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); 1515 EXPECT_TRUE(status.peer_remote); 1516 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1517 EXPECT_FALSE(status.peer_remote); 1518 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1519 EXPECT_FALSE(status.peer_remote); 1520 1521 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 1522 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 1523 EXPECT_EQ(OK, node0.node().ClosePort(x2)); 1524 EXPECT_EQ(OK, node1.node().ClosePort(x3)); 1525 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1526 EXPECT_EQ(OK, node0.node().ClosePort(b)); 1527 1528 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1529 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1530 } 1531 1532 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) { 1533 TestNode node0(0); 1534 AddNode(&node0); 1535 1536 TestNode node1(1); 1537 AddNode(&node1); 1538 1539 // Set up a-b on node0 and c-d spanning node0-node1. 1540 PortRef a, b, c, d; 1541 node0.node().CreatePortPair(&a, &b); 1542 CreatePortPair(&node0, &c, &node1, &d); 1543 1544 PortStatus status; 1545 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1546 EXPECT_FALSE(status.peer_remote); 1547 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1548 EXPECT_FALSE(status.peer_remote); 1549 ASSERT_EQ(OK, node0.node().GetStatus(c, &status)); 1550 EXPECT_TRUE(status.peer_remote); 1551 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1552 EXPECT_TRUE(status.peer_remote); 1553 1554 EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c)); 1555 WaitForIdle(); 1556 1557 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1558 EXPECT_TRUE(status.peer_remote); 1559 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1560 EXPECT_TRUE(status.peer_remote); 1561 1562 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1563 EXPECT_EQ(OK, node1.node().ClosePort(d)); 1564 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1565 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1566 } 1567 1568 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) { 1569 TestNode node0(0); 1570 AddNode(&node0); 1571 1572 TestNode node1(1); 1573 AddNode(&node1); 1574 1575 // Set up a-b on node0 and c-d on node1. 1576 PortRef a, b, c, d; 1577 node0.node().CreatePortPair(&a, &b); 1578 node1.node().CreatePortPair(&c, &d); 1579 1580 PortStatus status; 1581 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1582 EXPECT_FALSE(status.peer_remote); 1583 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1584 EXPECT_FALSE(status.peer_remote); 1585 ASSERT_EQ(OK, node1.node().GetStatus(c, &status)); 1586 EXPECT_FALSE(status.peer_remote); 1587 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1588 EXPECT_FALSE(status.peer_remote); 1589 1590 EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name())); 1591 WaitForIdle(); 1592 1593 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1594 EXPECT_TRUE(status.peer_remote); 1595 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1596 EXPECT_TRUE(status.peer_remote); 1597 1598 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1599 EXPECT_EQ(OK, node1.node().ClosePort(d)); 1600 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1601 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1602 } 1603 1604 TEST_F(PortsTest, RetransmitUserMessageEvents) { 1605 // Ensures that user message events can be retransmitted properly. 1606 TestNode node0(0); 1607 AddNode(&node0); 1608 1609 PortRef a, b; 1610 node0.node().CreatePortPair(&a, &b); 1611 1612 // Ping. 1613 const char* kMessage = "hey"; 1614 ScopedMessage message; 1615 EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage)); 1616 ASSERT_TRUE(node0.ReadMessage(b, &message)); 1617 EXPECT_TRUE(MessageEquals(message, kMessage)); 1618 1619 // Pong. 1620 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); 1621 EXPECT_FALSE(message); 1622 ASSERT_TRUE(node0.ReadMessage(a, &message)); 1623 EXPECT_TRUE(MessageEquals(message, kMessage)); 1624 1625 // Ping again. 1626 EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message))); 1627 EXPECT_FALSE(message); 1628 ASSERT_TRUE(node0.ReadMessage(b, &message)); 1629 EXPECT_TRUE(MessageEquals(message, kMessage)); 1630 1631 // Pong again! 1632 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); 1633 EXPECT_FALSE(message); 1634 ASSERT_TRUE(node0.ReadMessage(a, &message)); 1635 EXPECT_TRUE(MessageEquals(message, kMessage)); 1636 1637 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1638 EXPECT_EQ(OK, node0.node().ClosePort(b)); 1639 } 1640 1641 } // namespace test 1642 } // namespace ports 1643 } // namespace core 1644 } // namespace mojo 1645