// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/ports/node.h"

#include <string.h>

#include <utility>

#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/edk/system/ports/node_delegate.h"

namespace mojo {
namespace edk {
namespace ports {

namespace {

int DebugError(const char* message, int error_code) {
  CHECK(false) << "Oops: " << message;
  return error_code;
}

#define OOPS(x) DebugError(#x, x)

bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed)
    return false;
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    if (port->last_sequence_num_to_receive == next_sequence_num - 1)
      return false;
  }
  return true;
}

}  // namespace
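
// A worked example of the CanAcceptMoreMessages() check above (the numbers
// are illustrative only): suppose a port's peer closed after sending its 7th
// message, so last_sequence_num_to_receive is 7. As long as the queue's
// next_sequence_num() is 7 or less, message 7 has not yet been doled out and
// the port keeps accepting. Once next_sequence_num() reaches 8, the last
// expected message has been consumed and CanAcceptMoreMessages() returns
// false.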

class Node::LockedPort {
 public:
  explicit LockedPort(Port* port) : port_(port) {
    port_->lock.AssertAcquired();
  }

  Port* get() const { return port_; }
  Port* operator->() const { return port_; }

 private:
  Port* const port_;
};

Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name),
      delegate_(delegate) {
}

Node::~Node() {
  if (!ports_.empty())
    DLOG(WARNING) << "Unclean shutdown for node " << name_;
}

bool Node::CanShutdownCleanly(bool allow_local_ports) {
  base::AutoLock ports_lock(ports_lock_);

  if (!allow_local_ports) {
#if DCHECK_IS_ON()
    for (auto entry : ports_) {
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
    }
#endif
    return ports_.empty();
  }

  // NOTE: This is not efficient, though it probably doesn't need to be since
  // relatively few ports should be open during shutdown and shutdown doesn't
  // need to be blazingly fast.
  bool can_shutdown = true;
  for (auto entry : ports_) {
    base::AutoLock lock(entry.second->lock);
    if (entry.second->peer_node_name != name_ &&
        entry.second->state != Port::kReceiving) {
      can_shutdown = false;
#if DCHECK_IS_ON()
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
#else
      // Exit early when not debugging.
      break;
#endif
    }
  }

  return can_shutdown;
}

int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::CreateUninitializedPort(PortRef* port_ref) {
  PortName port_name;
  delegate_->GenerateRandomPortName(&port_name);

  scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
                                                         kInitialSequenceNum));
  int rv = AddPortWithName(port_name, port);
  if (rv != OK)
    return rv;

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name) {
  Port* port = port_ref.port();

  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kUninitialized)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kReceiving;
    port->peer_node_name = peer_node_name;
    port->peer_port_name = peer_port_name;
  }

  delegate_->PortStatusChanged(port_ref);

  return OK;
}

int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
  int rv;

  rv = CreateUninitializedPort(port0_ref);
  if (rv != OK)
    return rv;

  rv = CreateUninitializedPort(port1_ref);
  if (rv != OK)
    return rv;

  rv = InitializePort(*port0_ref, name_, port1_ref->name());
  if (rv != OK)
    return rv;

  rv = InitializePort(*port1_ref, name_, port0_ref->name());
  if (rv != OK)
    return rv;

  return OK;
}
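
// Illustrative sketch of how an embedder might drive this class, assuming a
// NodeDelegate implementation (MyDelegate), a NodeName (node_name), and a
// hypothetical helper MakeUserMessage() that allocates a user message through
// the delegate; none of these are defined in this file:
//
//   MyDelegate delegate;
//   Node node(node_name, &delegate);
//
//   PortRef a, b;
//   if (node.CreatePortPair(&a, &b) != OK)
//     return;
//
//   ScopedMessage message = MakeUserMessage(&delegate, "hello");
//   if (node.SendMessage(a, std::move(message)) == OK) {
//     ScopedMessage received;
//     node.GetMessage(b, &received);  // Delivered locally via AcceptMessage.
//   }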

int Node::SetUserData(const PortRef& port_ref,
                      const scoped_refptr<UserData>& user_data) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);
  if (port->state == Port::kClosed)
    return ERROR_PORT_STATE_UNEXPECTED;

  port->user_data = std::move(user_data);

  return OK;
}

int Node::GetUserData(const PortRef& port_ref,
                      scoped_refptr<UserData>* user_data) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);
  if (port->state == Port::kClosed)
    return ERROR_PORT_STATE_UNEXPECTED;

  *user_data = port->user_data;

  return OK;
}

int Node::ClosePort(const PortRef& port_ref) {
  std::deque<PortName> referenced_port_names;

  ObserveClosureEventData data;

  NodeName peer_node_name;
  PortName peer_port_name;
  Port* port = port_ref.port();
  {
    // We may need to erase the port, which requires ports_lock_ to be held,
    // but ports_lock_ must be acquired before any individual port locks.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);
    if (port->state == Port::kUninitialized) {
      // If the port was not yet initialized, there's nothing interesting to
      // do.
      ErasePort_Locked(port_ref.name());
      return OK;
    }

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kClosed;

    // We pass along the sequence number of the last message sent from this
    // port to allow the peer to have the opportunity to consume all inbound
    // messages before notifying the embedder that this port is closed.
    data.last_sequence_num = port->next_sequence_num_to_send - 1;

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    // If the port being closed still has unread messages, then we need to
    // take care to close those ports so as to avoid leaking memory.
    port->message_queue.GetReferencedPorts(&referenced_port_names);

    ErasePort_Locked(port_ref.name());
  }

  DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
           << " to " << peer_port_name << "@" << peer_node_name;

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));

  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
  return OK;
}

int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  Port* port = port_ref.port();

  base::AutoLock lock(port->lock);

  if (port->state != Port::kReceiving)
    return ERROR_PORT_STATE_UNEXPECTED;

  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  return OK;
}

int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
  return GetMessageIf(port_ref, nullptr, message);
}

int Node::GetMessageIf(const PortRef& port_ref,
                       std::function<bool(const Message&)> selector,
                       ScopedMessage* message) {
  *message = nullptr;

  DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;

  Port* port = port_ref.port();
  {
    base::AutoLock lock(port->lock);

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port))
      return ERROR_PORT_PEER_CLOSED;

    port->message_queue.GetNextMessageIf(std::move(selector), message);
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      const PortName& new_port_name = (*message)->ports()[i];
      scoped_refptr<Port> new_port = GetPort(new_port_name);

      DCHECK(new_port) << "Port " << new_port_name << "@" << name_
                       << " does not exist!";

      base::AutoLock lock(new_port->lock);

      DCHECK(new_port->state == Port::kReceiving);
      new_port->message_queue.set_signalable(true);
    }
  }

  return OK;
}

int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
  int rv = SendMessageInternal(port_ref, &message);
  if (rv != OK) {
    // If send failed, close all carried ports. Note that we're careful not to
    // close the sending port itself if it happened to be one of the encoded
    // ports (an invalid but possible condition.)
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (message->ports()[i] == port_ref.name())
        continue;

      PortRef port;
      if (GetPort(message->ports()[i], &port) == OK)
        ClosePort(port);
    }
  }
  return rv;
}

int Node::AcceptMessage(ScopedMessage message) {
  const EventHeader* header = GetEventHeader(*message);
  switch (header->type) {
    case EventType::kUser:
      return OnUserMessage(std::move(message));
    case EventType::kPortAccepted:
      return OnPortAccepted(header->port_name);
    case EventType::kObserveProxy:
      return OnObserveProxy(
          header->port_name,
          *GetEventData<ObserveProxyEventData>(*message));
    case EventType::kObserveProxyAck:
      return OnObserveProxyAck(
          header->port_name,
          GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
    case EventType::kObserveClosure:
      return OnObserveClosure(
          header->port_name,
          GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
    case EventType::kMergePort:
      return OnMergePort(header->port_name,
                         *GetEventData<MergePortEventData>(*message));
  }
  return OOPS(ERROR_NOT_IMPLEMENTED);
}

int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  Port* port = port_ref.port();
  MergePortEventData data;
  {
    base::AutoLock lock(port->lock);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@"
             << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    data.new_port_name = port_ref.name();
    WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
                 &data.new_port_descriptor);
  }
  delegate_->ForwardMessage(
      destination_node_name,
      NewInternalMessage(destination_port_name,
                         EventType::kMergePort, data));
  return OK;
}

int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();
  int rv;
  {
    // |ports_lock_| must be held when acquiring overlapping port locks.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock port0_lock(port0->lock);
    base::AutoLock port1_lock(port1->lock);

    DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
             << " and " << port1_ref.name() << "@" << name_;

    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
      rv = ERROR_PORT_STATE_UNEXPECTED;
    else
      rv = MergePorts_Locked(port0_ref, port1_ref);
  }

  if (rv != OK) {
    ClosePort(port0_ref);
    ClosePort(port1_ref);
  }

  return rv;
}

int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_
           << " to node " << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}

int Node::OnUserMessage(ScopedMessage message) {
  PortName port_name = GetEventHeader(*message)->port_name;
  const auto* event = GetEventData<UserEventData>(*message);

#if DCHECK_IS_ON()
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }

  DVLOG(2) << "AcceptMessage " << event->sequence_num
           << " [ports=" << ports_buf.str() << "] at "
           << port_name << "@" << name_;
#endif

  scoped_refptr<Port> port = GetPort(port_name);

  // Even if this port does not exist, cannot receive any more messages, or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get
  // transferred following the usual method. If the message cannot be
  // accepted, then the newly bound ports will simply be closed.

  for (size_t i = 0; i < message->num_ports(); ++i) {
    int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    if (rv != OK)
      return rv;
  }

  bool has_next_message = false;
  bool message_accepted = false;

  if (port) {
    // We may want to forward messages once the port lock is held, so we must
    // acquire |ports_lock_| first.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port.get())) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;

        // Forward messages. We forward messages in sequential order here so
        // that we maintain the message queue's notion of next sequence
        // number. That's useful for the proxy removal process as we can tell
        // when this port has seen all of the messages it is expected to see.
        int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
        if (rv != OK)
          return rv;

        MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
      }
    }
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) == OK) {
        ClosePort(port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

int Node::OnPortAccepted(const PortName& port_name) {
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;

  DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
           << " pointing to "
           << port->peer_port_name << "@" << port->peer_node_name;

  return BeginProxying(PortRef(port_name, port));
}

int Node::OnObserveProxy(const PortName& port_name,
                         const ObserveProxyEventData& event) {
  if (port_name == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port) {
    DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";

    if (port_name != event.proxy_port_name &&
        port_name != event.proxy_to_port_name) {
      // The receiving port may have been removed while this message was in
      // transit. In this case, we restart the ObserveProxy circulation from
      // the referenced proxy port to avoid leaking the proxy.
      delegate_->ForwardMessage(
          event.proxy_node_name,
          NewInternalMessage(
              event.proxy_port_name, EventType::kObserveProxy, event));
    }
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
           << event.proxy_port_name << "@"
           << event.proxy_node_name << " pointing to "
           << event.proxy_to_port_name << "@"
           << event.proxy_to_node_name;

  {
    base::AutoLock lock(port->lock);

    if (port->peer_node_name == event.proxy_node_name &&
        port->peer_port_name == event.proxy_port_name) {
      if (port->state == Port::kReceiving) {
        port->peer_node_name = event.proxy_to_node_name;
        port->peer_port_name = event.proxy_to_port_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = port->next_sequence_num_to_send - 1;

        delegate_->ForwardMessage(
            event.proxy_node_name,
            NewInternalMessage(event.proxy_port_name,
                               EventType::kObserveProxyAck,
                               ack));
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of
        // ObserveProxyAck. After all, another port could be sending messages
        // to our peer now that we've sent out our own ObserveProxy event.
        // Instead, we will send an ObserveProxyAck indicating that the
        // ObserveProxy event should be re-sent (last_sequence_num set to
        // kInvalidSequenceNum). However, this has to be done after we are
        // removed as a proxy. Otherwise, we might just find ourselves back
        // here again, which would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to "
                 << event.proxy_port_name << "@" << event.proxy_node_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = kInvalidSequenceNum;

        port->send_on_proxy_removal.reset(
            new std::pair<NodeName, ScopedMessage>(
                event.proxy_node_name,
                NewInternalMessage(event.proxy_port_name,
                                   EventType::kObserveProxyAck,
                                   ack)));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveProxy,
                             event));
    }
  }
  return OK;
}

int Node::OnObserveProxyAck(const PortName& port_name,
                            uint64_t last_sequence_num) {
  DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
           << " (last_sequence_num=" << last_sequence_num << ")";

  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first,
                                // so this is not an "Oops".

  {
    base::AutoLock lock(port->lock);

    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    if (last_sequence_num == kInvalidSequenceNum) {
      // Send again.
      InitiateProxyRemoval(LockedPort(port.get()), port_name);
      return OK;
    }

    // We can now remove this port once we have received and forwarded the
    // last message addressed to this port.
    port->remove_proxy_on_last_message = true;
    port->last_sequence_num_to_receive = last_sequence_num;
  }
  TryRemoveProxy(PortRef(port_name, port));
  return OK;
}

int Node::OnObserveClosure(const PortName& port_name,
                           uint64_t last_sequence_num) {
  // OK if the port doesn't exist, as it may have been closed already.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OK;

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  ObserveClosureEventData forwarded_data;
  NodeName peer_node_name;
  PortName peer_port_name;
  bool try_remove_proxy = false;
  {
    base::AutoLock lock(port->lock);

    port->peer_closed = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << last_sequence_num << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will
      // only reach dead-end proxies. Tell them we've sent our last message so
      // they can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not
      // actually closed. Consider replacing this with a new event type.
      forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so
      // forward the message along as-is.
      forwarded_data.last_sequence_num = last_sequence_num;

      // See about removing the port if it is a proxy as our peer won't be
      // able to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying)
        try_remove_proxy = true;
    }

    DVLOG(2) << "Forwarding ObserveClosure from "
             << port_name << "@" << name_ << " to peer "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << forwarded_data.last_sequence_num
             << ")";

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
  }
  if (try_remove_proxy)
    TryRemoveProxy(PortRef(port_name, port));

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure,
                         forwarded_data));

  if (notify_delegate) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }
  return OK;
}

int Node::OnMergePort(const PortName& port_name,
                      const MergePortEventData& event) {
  scoped_refptr<Port> port = GetPort(port_name);

  DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
           << (port ? port->state : -1) << ") merging with proxy "
           << event.new_port_name
           << "@" << name_ << " pointing to "
           << event.new_port_descriptor.peer_port_name << "@"
           << event.new_port_descriptor.peer_node_name << " referred by "
           << event.new_port_descriptor.referring_port_name << "@"
           << event.new_port_descriptor.referring_node_name;

  bool close_target_port = false;
  bool close_new_port = false;

  // Accept the new port. This is now the receiving end of the other port
  // cycle to be merged with ours.
  int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
  if (rv != OK) {
    close_target_port = true;
  } else if (port) {
    // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
    // needs to hold |ports_lock_|. We also acquire multiple port locks
    // within.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving) {
      close_new_port = true;
    } else {
      scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
      base::AutoLock new_port_lock(new_port->lock);
      DCHECK(new_port->state == Port::kReceiving);

      // Both ports are locked. Now all we have to do is swap their peer
      // information and set them up as proxies.

      PortRef port0_ref(port_name, port);
      PortRef port1_ref(event.new_port_name, new_port);
      int rv = MergePorts_Locked(port0_ref, port1_ref);
      if (rv == OK)
        return rv;

      close_new_port = true;
      close_target_port = true;
    }
  } else {
    close_new_port = true;
  }

  if (close_target_port) {
    PortRef target_port;
    rv = GetPort(port_name, &target_port);
    DCHECK(rv == OK);

    ClosePort(target_port);
  }

  if (close_new_port) {
    PortRef new_port;
    rv = GetPort(event.new_port_name, &new_port);
    DCHECK(rv == OK);

    ClosePort(new_port);
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}

int Node::AddPortWithName(const PortName& port_name,
                          const scoped_refptr<Port>& port) {
  base::AutoLock lock(ports_lock_);

  if (!ports_.insert(std::make_pair(port_name, port)).second)
    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.

  DVLOG(2) << "Created port " << port_name << "@" << name_;
  return OK;
}

void Node::ErasePort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  ErasePort_Locked(port_name);
}

void Node::ErasePort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  ports_.erase(port_name);
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}

scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  return GetPort_Locked(port_name);
}

scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  auto iter = ports_.find(port_name);
  if (iter == ports_.end())
    return nullptr;

  return iter->second;
}

int Node::SendMessageInternal(const PortRef& port_ref,
                              ScopedMessage* message) {
  ScopedMessage& m = *message;
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name())
      return ERROR_PORT_CANNOT_SEND_SELF;
  }

  Port* port = port_ref.port();
  NodeName peer_node_name;
  {
    // We must acquire |ports_lock_| before grabbing any port locks, because
    // WillSendMessage_Locked may need to lock multiple ports out of order.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    if (port->peer_closed)
      return ERROR_PORT_PEER_CLOSED;

    int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
    if (rv != OK)
      return rv;

    // Beyond this point there's no sense in returning anything but OK. Even
    // if message forwarding or acceptance fails, there's nothing the embedder
    // can do to recover. Assume that failure beyond this point must be
    // treated as a transport failure.

    peer_node_name = port->peer_node_name;
  }

  if (peer_node_name != name_) {
    delegate_->ForwardMessage(peer_node_name, std::move(m));
    return OK;
  }

  int rv = AcceptMessage(std::move(m));
  if (rv != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptMessage failed: " << rv;
  }

  return OK;
}
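
// The locking discipline used throughout this file, as in SendMessageInternal
// above: |ports_lock_| is always acquired before any individual port lock,
// because functions like WillSendMessage_Locked may need to lock several
// ports that are not ordered with respect to one another. A minimal sketch of
// the pattern (illustrative only):
//
//   base::AutoLock ports_lock(ports_lock_);  // Node-wide lock first.
//   base::AutoLock port_lock(port->lock);    // Then the port's own lock.
//   // ... operate on the port, possibly locking other ports via
//   // WillSendMessage_Locked ...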

int Node::MergePorts_Locked(const PortRef& port0_ref,
                            const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();

  ports_lock_.AssertAcquired();
  port0->lock.AssertAcquired();
  port1->lock.AssertAcquired();

  CHECK(port0->state == Port::kReceiving);
  CHECK(port1->state == Port::kReceiving);

  // Ports cannot be merged with their own receiving peer!
  if (port0->peer_node_name == name_ &&
      port0->peer_port_name == port1_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  if (port1->peer_node_name == name_ &&
      port1->peer_port_name == port0_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  // Only merge if both ports have never sent a message.
  if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
      port1->next_sequence_num_to_send == kInitialSequenceNum) {
    // Swap the ports' peer information and switch them both into buffering
    // (eventually proxying) mode.

    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);

    port0->state = Port::kBuffering;
    if (port0->peer_closed)
      port0->remove_proxy_on_last_message = true;

    port1->state = Port::kBuffering;
    if (port1->peer_closed)
      port1->remove_proxy_on_last_message = true;

    int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());

    if (rv1 == OK && rv2 == OK) {
      // If either merged port had a closed peer, its new peer needs to be
      // informed of this.
      if (port1->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port0->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port0->peer_node_name,
            NewInternalMessage(port0->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      if (port0->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port1->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port1->peer_node_name,
            NewInternalMessage(port1->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      return OK;
    }

    // If either proxy failed to initialize (e.g. had undeliverable messages
    // or ended up in a bad state somehow), we keep the system in a consistent
    // state by undoing the peer swap.
    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}
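
// A worked example of the peer swap performed by MergePorts_Locked() above
// (the port names are illustrative): suppose port B's peer is A and port C's
// peer is D, i.e. the cycles A<->B and C<->D. Merging B and C swaps their
// peers so that B now points at D and C now points at A, and both B and C
// become buffering (then proxying) ports. Once those proxies are removed, the
// surviving cycle is simply A<->D.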

void Node::WillSendPort(const LockedPort& port,
                        const NodeName& to_node_name,
                        PortName* port_name,
                        PortDescriptor* port_descriptor) {
  port->lock.AssertAcquired();

  PortName local_port_name = *port_name;

  PortName new_port_name;
  delegate_->GenerateRandomPortName(&new_port_name);

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK(port->state == Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed)
    port->remove_proxy_on_last_message = true;

  *port_name = new_port_name;

  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;
  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));

  // Configure the local port to point to the new port.
  port->peer_node_name = to_node_name;
  port->peer_port_name = new_port_name;
}

int Node::AcceptPort(const PortName& port_name,
                     const PortDescriptor& port_descriptor) {
  scoped_refptr<Port> port = make_scoped_refptr(
      new Port(port_descriptor.next_sequence_num_to_send,
               port_descriptor.next_sequence_num_to_receive));
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
           << port->peer_closed << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessageIf).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, port);
  if (rv != OK)
    return rv;

  // Allow referring port to forward messages.
  delegate_->ForwardMessage(
      port_descriptor.referring_node_name,
      NewInternalMessage(port_descriptor.referring_port_name,
                         EventType::kPortAccepted));
  return OK;
}

int Node::WillSendMessage_Locked(const LockedPort& port,
                                 const PortName& port_name,
                                 Message* message) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  DCHECK(message);

  // Messages may already have a sequence number if they're being forwarded
  // by a proxy. Otherwise, use the next outgoing sequence number.
  uint64_t* sequence_num =
      &GetMutableEventData<UserEventData>(message)->sequence_num;
  if (*sequence_num == 0)
    *sequence_num = port->next_sequence_num_to_send++;

#if DCHECK_IS_ON()
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }
#endif

  if (message->num_ports() > 0) {
    // Note: Another thread could be trying to send the same ports, so we need
    // to ensure that they are ours to send before we mutate their state.

    std::vector<scoped_refptr<Port>> ports;
    ports.resize(message->num_ports());

    {
      for (size_t i = 0; i < message->num_ports(); ++i) {
        ports[i] = GetPort_Locked(message->ports()[i]);
        DCHECK(ports[i]);

        ports[i]->lock.Acquire();
        int error = OK;
        if (ports[i]->state != Port::kReceiving)
          error = ERROR_PORT_STATE_UNEXPECTED;
        else if (message->ports()[i] == port->peer_port_name)
          error = ERROR_PORT_CANNOT_SEND_PEER;

        if (error != OK) {
          // Oops, we cannot send this port.
          for (size_t j = 0; j <= i; ++j)
            ports[j]->lock.Release();
          // Backpedal on the sequence number.
          port->next_sequence_num_to_send--;
          return error;
        }
      }
    }

    PortDescriptor* port_descriptors =
        GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));

    for (size_t i = 0; i < message->num_ports(); ++i) {
      WillSendPort(LockedPort(ports[i].get()),
                   port->peer_node_name,
                   message->mutable_ports() + i,
                   port_descriptors + i);
    }

    for (size_t i = 0; i < message->num_ports(); ++i)
      ports[i]->lock.Release();
  }

#if DCHECK_IS_ON()
  DVLOG(2) << "Sending message "
           << GetEventData<UserEventData>(*message)->sequence_num
           << " [ports=" << ports_buf.str() << "]"
           << " from " << port_name << "@" << name_
           << " to " << port->peer_port_name << "@" << port->peer_node_name;
#endif

  GetMutableEventHeader(message)->port_name = port->peer_port_name;
  return OK;
}

int Node::BeginProxying_Locked(const LockedPort& port,
                               const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  if (port->state != Port::kBuffering)
    return OOPS(ERROR_PORT_STATE_UNEXPECTED);

  port->state = Port::kProxying;

  int rv = ForwardMessages_Locked(LockedPort(port), port_name);
  if (rv != OK)
    return rv;

  // We may have observed closure while buffering. In that case, we can
  // advance to removing the proxy without sending out an ObserveProxy
  // message. We already know the last expected message, etc.

  if (port->remove_proxy_on_last_message) {
    MaybeRemoveProxy_Locked(LockedPort(port), port_name);

    // Make sure we propagate closure to our current peer.
    ObserveClosureEventData data;
    data.last_sequence_num = port->last_sequence_num_to_receive;
    delegate_->ForwardMessage(
        port->peer_node_name,
        NewInternalMessage(port->peer_port_name,
                           EventType::kObserveClosure, data));
  } else {
    InitiateProxyRemoval(LockedPort(port), port_name);
  }

  return OK;
}

int Node::BeginProxying(PortRef port_ref) {
  Port* port = port_ref.port();
  {
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kBuffering)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    port->state = Port::kProxying;

    int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
    if (rv != OK)
      return rv;
  }

  bool should_remove;
  NodeName peer_node_name;
  ScopedMessage closure_message;
  {
    base::AutoLock lock(port->lock);
    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    should_remove = port->remove_proxy_on_last_message;
    if (should_remove) {
      // Make sure we propagate closure to our current peer.
      ObserveClosureEventData data;
      data.last_sequence_num = port->last_sequence_num_to_receive;
      peer_node_name = port->peer_node_name;
      closure_message = NewInternalMessage(port->peer_port_name,
                                           EventType::kObserveClosure, data);
    } else {
      InitiateProxyRemoval(LockedPort(port), port_ref.name());
    }
  }

  if (should_remove) {
    TryRemoveProxy(port_ref);
    delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
  }

  return OK;
}
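
// A sketch of the proxy removal handshake driven by BeginProxying() above and
// the helpers below (the sequence number is illustrative): a proxy port P
// sends ObserveProxy to its peer via InitiateProxyRemoval(). When the
// receiving port handles it in OnObserveProxy(), it rewires its peer pointer
// to skip P and replies with ObserveProxyAck carrying the last sequence
// number it sent toward P, say 12. OnObserveProxyAck() then records
// remove_proxy_on_last_message and last_sequence_num_to_receive = 12 on P.
// Once P has received and forwarded message 12, CanAcceptMoreMessages()
// returns false and MaybeRemoveProxy_Locked()/TryRemoveProxy() erase it.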

int Node::ForwardMessages_Locked(const LockedPort& port,
                                 const PortName& port_name) {
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  for (;;) {
    ScopedMessage message;
    port->message_queue.GetNextMessageIf(nullptr, &message);
    if (!message)
      break;

    int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
    if (rv != OK)
      return rv;

    delegate_->ForwardMessage(port->peer_node_name, std::move(message));
  }
  return OK;
}

void Node::InitiateProxyRemoval(const LockedPort& port,
                                const PortName& port_name) {
  port->lock.AssertAcquired();

  // To remove this node, we start by notifying the connected graph that we
  // are a proxy. This allows whatever port is referencing this node to skip
  // it. Eventually, this node will receive ObserveProxyAck (or ObserveClosure
  // if the peer was closed in the meantime).

  ObserveProxyEventData data;
  data.proxy_node_name = name_;
  data.proxy_port_name = port_name;
  data.proxy_to_node_name = port->peer_node_name;
  data.proxy_to_port_name = port->peer_port_name;

  delegate_->ForwardMessage(
      port->peer_node_name,
      NewInternalMessage(port->peer_port_name, EventType::kObserveProxy,
                         data));
}

void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
                                   const PortName& port_name) {
  // |ports_lock_| must be held so we can potentially ErasePort_Locked().
  ports_lock_.AssertAcquired();
  port->lock.AssertAcquired();

  DCHECK(port->state == Port::kProxying);

  // Make sure we have seen ObserveProxyAck before removing the port.
  if (!port->remove_proxy_on_last_message)
    return;

  if (!CanAcceptMoreMessages(port.get())) {
    // This proxy port is done. We can now remove it!
    ErasePort_Locked(port_name);

    if (port->send_on_proxy_removal) {
      NodeName to_node = port->send_on_proxy_removal->first;
      ScopedMessage& message = port->send_on_proxy_removal->second;

      delegate_->ForwardMessage(to_node, std::move(message));
      port->send_on_proxy_removal.reset();
    }
  } else {
    DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
             << " now; waiting for more messages";
  }
}

void Node::TryRemoveProxy(PortRef port_ref) {
  Port* port = port_ref.port();
  bool should_erase = false;
  ScopedMessage msg;
  NodeName to_node;
  {
    base::AutoLock lock(port->lock);

    // Port already removed. Nothing to do.
    if (port->state == Port::kClosed)
      return;

    DCHECK(port->state == Port::kProxying);

    // Make sure we have seen ObserveProxyAck before removing the port.
    if (!port->remove_proxy_on_last_message)
      return;

    if (!CanAcceptMoreMessages(port)) {
      // This proxy port is done. We can now remove it!
      should_erase = true;

      if (port->send_on_proxy_removal) {
        to_node = port->send_on_proxy_removal->first;
        msg = std::move(port->send_on_proxy_removal->second);
        port->send_on_proxy_removal.reset();
      }
    } else {
      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
               << " now; waiting for more messages";
    }
  }

  if (should_erase)
    ErasePort(port_ref.name());

  if (msg)
    delegate_->ForwardMessage(to_node, std::move(msg));
}

void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
                                   const PortName& port_name) {
  // Wipes out all ports whose peer node matches |node_name| and whose peer
  // port matches |port_name|. If |port_name| is |kInvalidPortName|, only the
  // peer node is matched.

  std::vector<PortRef> ports_to_notify;
  std::vector<PortName> dead_proxies_to_broadcast;
  std::deque<PortName> referenced_port_names;

  {
    base::AutoLock ports_lock(ports_lock_);

    for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
      Port* port = iter->second.get();
      {
        base::AutoLock port_lock(port->lock);

        if (port->peer_node_name == node_name &&
            (port_name == kInvalidPortName ||
             port->peer_port_name == port_name)) {
          if (!port->peer_closed) {
            // Treat this as immediate peer closure. It's an exceptional
            // condition akin to a broken pipe, so we don't care about losing
            // messages.

            port->peer_closed = true;
            port->last_sequence_num_to_receive =
                port->message_queue.next_sequence_num() - 1;

            if (port->state == Port::kReceiving)
              ports_to_notify.push_back(PortRef(iter->first, port));
          }

          // We don't expect to forward any further messages, and we don't
          // expect to receive a Port{Accepted,Rejected} event. Because we're
          // a proxy with no active peer, we cannot use the normal proxy
          // removal procedure of forward-propagating an ObserveProxy. Instead
          // we broadcast our own death so it can be back-propagated. This is
          // inefficient but rare.
          if (port->state != Port::kReceiving) {
            dead_proxies_to_broadcast.push_back(iter->first);
            iter->second->message_queue.GetReferencedPorts(
                &referenced_port_names);
          }
        }
      }
    }

    for (const auto& proxy_name : dead_proxies_to_broadcast) {
      ports_.erase(proxy_name);
      DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
    }
  }

  // Wake up any receiving ports that have just observed simulated peer
  // closure.
  for (const auto& port : ports_to_notify)
    delegate_->PortStatusChanged(port);

  for (const auto& proxy_name : dead_proxies_to_broadcast) {
    // Broadcast an event signifying that this proxy is no longer functioning.
    ObserveProxyEventData event;
    event.proxy_node_name = name_;
    event.proxy_port_name = proxy_name;
    event.proxy_to_node_name = kInvalidNodeName;
    event.proxy_to_port_name = kInvalidPortName;
    delegate_->BroadcastMessage(NewInternalMessage(
        kInvalidPortName, EventType::kObserveProxy, event));

    // Also process death locally since the port that points to this closed
    // one could be on the current node.
    // Note: Although this is recursive, only a single port is involved which
    // limits the expected branching to 1.
    DestroyAllPortsWithPeer(name_, proxy_name);
  }

  // Close any ports referenced by the closed proxies.
  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
}

ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
                                              const EventType& type,
                                              const void* data,
                                              size_t num_data_bytes) {
  ScopedMessage message;
  delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);

  EventHeader* header = GetMutableEventHeader(message.get());
  header->port_name = port_name;
  header->type = type;
  header->padding = 0;

  if (num_data_bytes)
    memcpy(header + 1, data, num_data_bytes);

  return message;
}

}  // namespace ports
}  // namespace edk
}  // namespace mojo