// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/edk/system/node_controller.h"

#include <algorithm>
#include <limits>

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram_macros.h"
#include "base/process/process_handle.h"
#include "base/rand_util.h"
#include "base/time/time.h"
#include "base/timer/elapsed_timer.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/named_platform_channel_pair.h"
#include "mojo/edk/embedder/named_platform_handle.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/system/broker.h"
#include "mojo/edk/system/broker_host.h"
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/ports_message.h"
#include "mojo/edk/system/request_context.h"

#if defined(OS_MACOSX) && !defined(OS_IOS)
#include "mojo/edk/system/mach_port_relay.h"
#endif

#if !defined(OS_NACL)
#include "crypto/random.h"
#endif

namespace mojo {
namespace edk {

namespace {

// Fills |*out| with sizeof(T) random bytes. NaCl builds use base::RandBytes;
// all other builds use crypto::RandBytes.
#if defined(OS_NACL)
template <typename T>
void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
#else
template <typename T>
void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
#endif

// Returns a ports::NodeName filled with random bytes.
ports::NodeName GetRandomNodeName() {
  ports::NodeName name;
  GenerateRandomName(&name);
  return name;
}

// Records |count| to the "Mojo.System.Node.ConnectedPeers" histogram.
// |count| must fit in an int32_t.
void RecordPeerCount(size_t count) {
  DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));

  // 8k is the maximum number of file descriptors allowed in Chrome.
  UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
                              static_cast<int32_t>(count),
                              1 /* min */,
                              8000 /* max */,
                              50 /* bucket count */);
}

// Records |count| to the "Mojo.System.Node.PendingChildren" histogram.
// |count| must fit in an int32_t.
void RecordPendingChildCount(size_t count) {
  DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));

  // 8k is the maximum number of file descriptors allowed in Chrome.
  UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
                              static_cast<int32_t>(count),
                              1 /* min */,
                              8000 /* max */,
                              50 /* bucket count */);
}

// Extracts the ports message data carried by |message| and parses it into its
// header, payload and ports sizes. All out-parameters must be non-null.
// Returns false if the message carries no data bytes or if
// ports::Message::Parse() rejects the data; returns true otherwise.
bool ParsePortsMessage(Channel::Message* message,
                       void** data,
                       size_t* num_data_bytes,
                       size_t* num_header_bytes,
                       size_t* num_payload_bytes,
                       size_t* num_ports_bytes) {
  DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
         num_ports_bytes);

  NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
  if (!*num_data_bytes)
    return false;

  if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
                             num_payload_bytes, num_ports_bytes)) {
    return false;
  }

  return true;
}

// Used by NodeController to watch for shutdown. Since no IO can happen once
// the IO thread is killed, the NodeController can cleanly drop all its peers
// at that time.
class ThreadDestructionObserver :
    public base::MessageLoop::DestructionObserver {
 public:
  // Creates a self-owned observer that runs |callback| when the current
  // message loop is destroyed. If the caller is not running on
  // |task_runner|, this re-posts itself to |task_runner| so that the observer
  // is constructed there.
  static void Create(scoped_refptr<base::TaskRunner> task_runner,
                     const base::Closure& callback) {
    if (task_runner->RunsTasksOnCurrentThread()) {
      // Owns itself.
      new ThreadDestructionObserver(callback);
    } else {
      task_runner->PostTask(FROM_HERE,
                            base::Bind(&Create, task_runner, callback));
    }
  }

 private:
  // Registers this object as a destruction observer of the current message
  // loop.
  explicit ThreadDestructionObserver(const base::Closure& callback)
      : callback_(callback) {
    base::MessageLoop::current()->AddDestructionObserver(this);
  }

  // Unregisters this object from the current message loop.
  ~ThreadDestructionObserver() override {
    base::MessageLoop::current()->RemoveDestructionObserver(this);
  }

  // base::MessageLoop::DestructionObserver:
  // Runs the stored callback, then deletes this observer.
  void WillDestroyCurrentMessageLoop() override {
    callback_.Run();
    delete this;
  }

  // Closure run once when the observed message loop is being destroyed.
  const base::Closure callback_;

  DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
};

}  // namespace

NodeController::~NodeController() {}

// Gives this node a random name and creates the underlying ports::Node,
// passing |this| as its second constructor argument.
NodeController::NodeController(Core* core)
    : core_(core),
      name_(GetRandomNodeName()),
      node_(new ports::Node(name_, this)) {
  DVLOG(1) << "Initializing node " << name_;
}

#if defined(OS_MACOSX) && !defined(OS_IOS)
// Creates the MachPortRelay for this node from |port_provider|. Must be called
// at most once (DCHECKs that no relay exists yet).
void NodeController::CreateMachPortRelay(
    base::PortProvider* port_provider) {
  base::AutoLock lock(mach_port_relay_lock_);
  DCHECK(!mach_port_relay_);
  mach_port_relay_.reset(new MachPortRelay(port_provider));
}
#endif

// Stores |task_runner| as the IO task runner and installs a
// ThreadDestructionObserver on it that calls DropAllPeers() when that
// thread's message loop is destroyed.
void NodeController::SetIOTaskRunner(
    scoped_refptr<base::TaskRunner> task_runner) {
  io_task_runner_ = task_runner;
  ThreadDestructionObserver::Create(
      io_task_runner_,
      base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
}

// Starts connecting to a child process. Associates |child_token| with a newly
// generated temporary node name in |pending_child_tokens_|, then posts
// ConnectToChildOnIOThread() to the IO task runner with that name.
void NodeController::ConnectToChild(
    base::ProcessHandle process_handle,
    ConnectionParams connection_params,
    const std::string& child_token,
    const ProcessErrorCallback& process_error_callback) {
  // Generate the temporary remote node name here so that it can be associated
  // with the embedder's child_token. If an error occurs in the child process
  // after it is launched, but before any reserved ports are connected, this can
  // be used to clean up any dangling ports.
  ports::NodeName node_name;
  GenerateRandomName(&node_name);

  {
    base::AutoLock lock(reserved_ports_lock_);
    bool inserted = pending_child_tokens_.insert(
        std::make_pair(node_name, child_token)).second;
    DCHECK(inserted);
  }

#if defined(OS_WIN)
  // On Windows, we need to duplicate the process handle because we have no
  // control over its lifetime and it may become invalid by the time the posted
  // task runs.
  HANDLE dup_handle = INVALID_HANDLE_VALUE;
  BOOL ok = ::DuplicateHandle(
      base::GetCurrentProcessHandle(), process_handle,
      base::GetCurrentProcessHandle(), &dup_handle,
      0, FALSE, DUPLICATE_SAME_ACCESS);
  DPCHECK(ok);
  process_handle = dup_handle;
#endif

  io_task_runner_->PostTask(
      FROM_HERE, base::Bind(&NodeController::ConnectToChildOnIOThread,
                            base::Unretained(this), process_handle,
                            base::Passed(&connection_params), node_name,
                            process_error_callback));
}

// Closes and un-reserves every reserved port whose child token equals
// |child_token|. Ports are collected and erased under |reserved_ports_lock_|
// and closed after the lock is released.
void NodeController::CloseChildPorts(const std::string& child_token) {
  std::vector<ports::PortRef> ports_to_close;
  {
    std::vector<std::string> port_tokens;
    base::AutoLock lock(reserved_ports_lock_);
    for (const auto& port : reserved_ports_) {
      if (port.second.child_token == child_token) {
        DVLOG(1) << "Closing reserved port " << port.second.port.name();
        ports_to_close.push_back(port.second.port);
        port_tokens.push_back(port.first);
      }
    }

    for (const auto& token : port_tokens)
      reserved_ports_.erase(token);
  }

  for (const auto& port : ports_to_close)
    node_->ClosePort(port);

  // Ensure local port closure messages are processed.
  AcceptIncomingMessages();
}

// Posts ClosePeerConnectionOnIOThread() for |peer_token| to the IO task
// runner.
void NodeController::ClosePeerConnection(const std::string& peer_token) {
  io_task_runner_->PostTask(
      FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
                            base::Unretained(this), peer_token));
}

// Starts connecting to the parent process. Except on Mac and NaCl SFI builds,
// the handle in |connection_params| is first given to a Broker, which is
// asked synchronously for the actual parent channel handle; if that handle is
// invalid, pending port merges are cancelled and no connection is attempted.
// Otherwise posts ConnectToParentOnIOThread() to the IO task runner.
void NodeController::ConnectToParent(ConnectionParams connection_params) {
#if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
  // Use the bootstrap channel for the broker and receive the node's channel
  // synchronously as the first message from the broker.
  base::ElapsedTimer timer;
  broker_.reset(new Broker(connection_params.TakeChannelHandle()));
  ScopedPlatformHandle platform_handle = broker_->GetParentPlatformHandle();
  UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
                      timer.Elapsed());

  if (!platform_handle.is_valid()) {
    // Most likely the browser side of the channel has already been closed and
    // the broker was unable to negotiate a NodeChannel pipe. In this case we
    // can cancel parent connection.
    DVLOG(1) << "Cannot connect to invalid parent channel.";
    CancelPendingPortMerges();
    return;
  }
  connection_params = ConnectionParams(std::move(platform_handle));
#endif

  io_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&NodeController::ConnectToParentOnIOThread,
                 base::Unretained(this), base::Passed(&connection_params)));
}

// Starts connecting to a peer. Generates a temporary node name for the remote
// end and posts ConnectToPeerOnIOThread() to the IO task runner.
void NodeController::ConnectToPeer(ConnectionParams connection_params,
                                   const ports::PortRef& port,
                                   const std::string& peer_token) {
  ports::NodeName node_name;
  GenerateRandomName(&node_name);
  io_task_runner_->PostTask(
      FROM_HERE,
      base::Bind(&NodeController::ConnectToPeerOnIOThread,
                 base::Unretained(this), base::Passed(&connection_params),
                 node_name, port, peer_token));
}

// Stores |observer| as the user data of |port| on the underlying node.
void NodeController::SetPortObserver(const ports::PortRef& port,
                                     scoped_refptr<PortObserver> observer) {
  node_->SetUserData(port, std::move(observer));
}

// Clears the observer of |port|, closes it on the underlying node (DCHECKing
// success) and then processes any locally queued messages.
void NodeController::ClosePort(const ports::PortRef& port) {
  SetPortObserver(port, nullptr);
  int rv = node_->ClosePort(port);
  DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();

  AcceptIncomingMessages();
}

// Sends |message| on |port| through the underlying node, processes any locally
// queued messages, and returns the node's result code for the send.
int NodeController::SendMessage(const ports::PortRef& port,
                                std::unique_ptr<PortsMessage> message) {
  ports::ScopedMessage ports_message(message.release());
  int rv = node_->SendMessage(port, std::move(ports_message));

  AcceptIncomingMessages();
  return rv;
}

// Reserves |port| under |token|, remembering |child_token| alongside it.
// DCHECKs that |token| was not already reserved.
void NodeController::ReservePort(const std::string& token,
                                 const ports::PortRef& port,
                                 const std::string& child_token) {
  DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
           << token;

  base::AutoLock lock(reserved_ports_lock_);
  auto result = reserved_ports_.insert(
      std::make_pair(token, ReservedPort{port, child_token}));
  DCHECK(result.second);
}

// Merges |port| with the port reserved under |token|. Handled in this order:
//  1. If |token| is reserved in this process, merge locally and return.
//  2. If pending merges are being rejected, close |port| and return.
//  3. If there is no parent channel yet, queue the merge and return.
//  4. Otherwise ask the parent channel to perform the merge.
void NodeController::MergePortIntoParent(const std::string& token,
                                         const ports::PortRef& port) {
  bool was_merged = false;
  {
    // This request may be coming from within the process that reserved the
    // "parent" side (e.g. for Chrome single-process mode), so if this token is
    // reserved locally, merge locally instead.
    base::AutoLock lock(reserved_ports_lock_);
    auto it = reserved_ports_.find(token);
    if (it != reserved_ports_.end()) {
      node_->MergePorts(port, name_, it->second.port.name());
      reserved_ports_.erase(it);
      was_merged = true;
    }
  }
  if (was_merged) {
    AcceptIncomingMessages();
    return;
  }

  scoped_refptr<NodeChannel> parent;
  bool reject_merge = false;
  {
    // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
    // there is a race where the parent can be set, and |pending_port_merges_|
    // be processed between retrieving |parent| and adding the merge to
    // |pending_port_merges_|.
    base::AutoLock lock(pending_port_merges_lock_);
    parent = GetParentChannel();
    if (reject_pending_merges_) {
      reject_merge = true;
    } else if (!parent) {
      pending_port_merges_.push_back(std::make_pair(token, port));
      return;
    }
  }
  if (reject_merge) {
    node_->ClosePort(port);
    DVLOG(2) << "Rejecting port merge for token " << token
             << " due to closed parent channel.";
    AcceptIncomingMessages();
    return;
  }

  parent->RequestPortMerge(port.name(), token);
}

// Merges two local ports on the underlying node, processes any locally queued
// messages, and returns the node's result code for the merge.
int NodeController::MergeLocalPorts(const ports::PortRef& port0,
                                    const ports::PortRef& port1) {
  int rv = node_->MergeLocalPorts(port0, port1);
  AcceptIncomingMessages();
  return rv;
}

// Creates a shared buffer of |num_bytes|. Except on Mac and NaCl SFI builds,
// the buffer is requested from the broker whenever this node has one;
// otherwise it is created directly via PlatformSharedBuffer::Create().
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
    size_t num_bytes) {
#if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
  // Shared buffer creation failure is fatal, so always use the broker when we
  // have one. This does mean that a non-root process that has children will use
  // the broker for shared buffer creation even though that process is
  // privileged.
  if (broker_) {
    return broker_->GetSharedBuffer(num_bytes);
  }
#endif
  return PlatformSharedBuffer::Create(num_bytes);
}

// Stores |callback| as the shutdown callback and sets the shutdown-requested
// flag under |shutdown_lock_|, then calls AttemptShutdownIfRequested().
void NodeController::RequestShutdown(const base::Closure& callback) {
  {
    base::AutoLock lock(shutdown_lock_);
    shutdown_callback_ = callback;
    shutdown_callback_flag_.Set(true);
  }

  AttemptShutdownIfRequested();
}

// Forwards |error| to the channel of peer |source_node|, if such a peer is
// currently known; does nothing otherwise.
void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
                                          const std::string& error) {
  scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
  if (peer)
    peer->NotifyBadMessage(error);
}

// IO-thread half of ConnectToChild(). Except on Mac and NaCl builds, a new
// platform channel pair is created for the node channel and its client end is
// sent to the child through a self-owned BrokerHost built on the handle from
// |connection_params|; on Mac and NaCl the node channel is created directly
// on |connection_params|. The channel is registered as a pending child under
// the temporary name |token|, started, and sent an AcceptChild message.
void NodeController::ConnectToChildOnIOThread(
    base::ProcessHandle process_handle,
    ConnectionParams connection_params,
    ports::NodeName token,
    const ProcessErrorCallback& process_error_callback) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

#if !defined(OS_MACOSX) && !defined(OS_NACL)
  PlatformChannelPair node_channel;
  ScopedPlatformHandle server_handle = node_channel.PassServerHandle();
  // BrokerHost owns itself.
  BrokerHost* broker_host =
      new BrokerHost(process_handle, connection_params.TakeChannelHandle());
  bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle());

#if defined(OS_WIN)
  if (!channel_ok) {
    // On Windows the above operation may fail if the channel is crossing a
    // session boundary. In that case we fall back to a named pipe.
    NamedPlatformChannelPair named_channel;
    server_handle = named_channel.PassServerHandle();
    broker_host->SendNamedChannel(named_channel.handle().name);
  }
#else
  CHECK(channel_ok);
#endif  // defined(OS_WIN)

  scoped_refptr<NodeChannel> channel =
      NodeChannel::Create(this, ConnectionParams(std::move(server_handle)),
                          io_task_runner_, process_error_callback);

#else  // !defined(OS_MACOSX) && !defined(OS_NACL)
  scoped_refptr<NodeChannel> channel =
      NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
                          process_error_callback);
#endif  // !defined(OS_MACOSX) && !defined(OS_NACL)

  // We set up the child channel with a temporary name so it can be identified
  // as a pending child if it writes any messages to the channel. We may start
  // receiving messages from it (though we shouldn't) as soon as Start() is
  // called below.

  pending_children_.insert(std::make_pair(token, channel));
  RecordPendingChildCount(pending_children_.size());

  channel->SetRemoteNodeName(token);
  channel->SetRemoteProcessHandle(process_handle);
  channel->Start();

  channel->AcceptChild(name_, token);
}

// IO-thread half of ConnectToParent(). Creates the bootstrap parent channel
// from |connection_params| under |parent_lock_|, marks its handle to be leaked
// on shutdown, and starts it. DCHECKs that no parent name has been set yet.
void NodeController::ConnectToParentOnIOThread(
    ConnectionParams connection_params) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(parent_lock_);
    DCHECK(parent_name_ == ports::kInvalidNodeName);

    // At this point we don't know the parent's name, so we can't yet insert it
    // into our |peers_| map. That will happen as soon as we receive an
    // AcceptChild message from them.
    bootstrap_parent_channel_ =
        NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
                            ProcessErrorCallback());
    // Prevent the parent pipe handle from being closed on shutdown. Pipe
    // closure is used by the parent to detect the child process has exited.
    // Relying on message pipes to be closed is not enough because the parent
    // may see the message pipe closure before the child is dead, causing the
    // child process to be unexpectedly SIGKILL'd.
    bootstrap_parent_channel_->LeakHandleOnShutdown();
  }
  bootstrap_parent_channel_->Start();
}

// IO-thread half of ConnectToPeer(). Creates a channel from
// |connection_params|, records the connection under the temporary name
// |token| (and |peer_token| -> |token| in |peers_by_token_|), starts the
// channel and sends it an AcceptPeer message naming |port|.
void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params,
                                             ports::NodeName token,
                                             ports::PortRef port,
                                             const std::string& peer_token) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  scoped_refptr<NodeChannel> channel = NodeChannel::Create(
      this, std::move(connection_params), io_task_runner_, {});
  peer_connections_.insert(
      {token, PeerConnection{channel, port, peer_token}});
  peers_by_token_.insert({peer_token, token});

  channel->SetRemoteNodeName(token);
  channel->Start();

  channel->AcceptPeer(name_, token, port.name());
}

// IO-thread half of ClosePeerConnection(). Looks up the node name registered
// for |peer_token| and drops that peer; does nothing if the token is unknown.
void NodeController::ClosePeerConnectionOnIOThread(
    const std::string& peer_token) {
  RequestContext request_context(RequestContext::Source::SYSTEM);
  auto peer = peers_by_token_.find(peer_token);
  // The connection may already be closed.
  if (peer == peers_by_token_.end())
    return;

  // |peer| may be removed so make a copy of |name|.
  ports::NodeName name = peer->second;
  DropPeer(name, nullptr);
}

// Returns the channel registered in |peers_| for |name|, or nullptr if there
// is none. Takes |peers_lock_|.
scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
    const ports::NodeName& name) {
  base::AutoLock lock(peers_lock_);
  auto it = peers_.find(name);
  if (it == peers_.end())
    return nullptr;
  return it->second;
}

// Returns the peer channel for the current parent name, or nullptr if no peer
// is registered under that name. |parent_lock_| is released before the peer
// lookup.
scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
  ports::NodeName parent_name;
  {
    base::AutoLock lock(parent_lock_);
    parent_name = parent_name_;
  }
  return GetPeerChannel(parent_name);
}

// Returns the peer channel for the current broker name, or nullptr if no peer
// is registered under that name. |broker_lock_| is released before the peer
// lookup.
scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
  ports::NodeName broker_name;
  {
    base::AutoLock lock(broker_lock_);
    broker_name = broker_name_;
  }
  return GetPeerChannel(broker_name);
}

// Registers |channel| as the peer named |name|. If a peer with that name
// already exists the call is ignored. Otherwise the channel is optionally
// started (|start_channel|) and any messages that were queued for |name| are
// sent to it, outside of |peers_lock_|.
void NodeController::AddPeer(const ports::NodeName& name,
                             scoped_refptr<NodeChannel> channel,
                             bool start_channel) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  DCHECK(name != ports::kInvalidNodeName);
  DCHECK(channel);

  channel->SetRemoteNodeName(name);

  OutgoingMessageQueue pending_messages;
  {
    base::AutoLock lock(peers_lock_);
    if (peers_.find(name) != peers_.end()) {
      // This can happen normally if two nodes race to be introduced to each
      // other. The losing pipe will be silently closed and introduction should
      // not be affected.
      DVLOG(1) << "Ignoring duplicate peer name " << name;
      return;
    }

    auto result = peers_.insert(std::make_pair(name, channel));
    DCHECK(result.second);

    DVLOG(2) << "Accepting new peer " << name << " on node " << name_;

    RecordPeerCount(peers_.size());

    auto it = pending_peer_messages_.find(name);
    if (it != pending_peer_messages_.end()) {
      std::swap(pending_messages, it->second);
      pending_peer_messages_.erase(it);
    }
  }

  if (start_channel)
    channel->Start();

  // Flush any queued message we need to deliver to this node.
  while (!pending_messages.empty()) {
    channel->PortsMessage(std::move(pending_messages.front()));
    pending_messages.pop();
  }
}

// Removes all state associated with node |name|: its entries in |peers_|,
// |pending_peer_messages_| and |pending_children_|, any reserved ports tied to
// its pending child token, and any peer connection registered under it.
// |channel| is only compared against the bootstrap parent channel to decide
// whether the dropped node is the parent, in which case pending port merges
// are cancelled. Finally closes the collected ports, tells the node that the
// connection to |name| was lost, and processes locally queued messages.
void NodeController::DropPeer(const ports::NodeName& name,
                              NodeChannel* channel) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(peers_lock_);
    auto it = peers_.find(name);

    if (it != peers_.end()) {
      ports::NodeName peer = it->first;
      peers_.erase(it);
      DVLOG(1) << "Dropped peer " << peer;
    }

    pending_peer_messages_.erase(name);
    pending_children_.erase(name);

    RecordPeerCount(peers_.size());
    RecordPendingChildCount(pending_children_.size());
  }

  std::vector<ports::PortRef> ports_to_close;
  {
    // Clean up any reserved ports.
    base::AutoLock lock(reserved_ports_lock_);
    auto it = pending_child_tokens_.find(name);
    if (it != pending_child_tokens_.end()) {
      const std::string& child_token = it->second;

      std::vector<std::string> port_tokens;
      for (const auto& port : reserved_ports_) {
        if (port.second.child_token == child_token) {
          DVLOG(1) << "Closing reserved port: " << port.second.port.name();
          ports_to_close.push_back(port.second.port);
          port_tokens.push_back(port.first);
        }
      }

      // We have to erase reserved ports in a two-step manner because the usual
      // manner of using the returned iterator from map::erase isn't technically
      // valid in C++11 (although it is in C++14).
      for (const auto& token : port_tokens)
        reserved_ports_.erase(token);

      pending_child_tokens_.erase(it);
    }
  }

  bool is_parent;
  {
    base::AutoLock lock(parent_lock_);
    is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
  }

  // If the error comes from the parent channel, we also need to cancel any
  // port merge requests, so that errors can be propagated to the message
  // pipes.
  if (is_parent)
    CancelPendingPortMerges();

  auto peer = peer_connections_.find(name);
  if (peer != peer_connections_.end()) {
    peers_by_token_.erase(peer->second.peer_token);
    ports_to_close.push_back(peer->second.local_port);
    peer_connections_.erase(peer);
  }

  for (const auto& port : ports_to_close)
    node_->ClosePort(port);

  node_->LostConnectionToNode(name);

  AcceptIncomingMessages();
}

// Delivers |message| to the node named |name|. Depending on platform and
// message contents the message is relayed through the broker (or queued in
// |pending_relay_messages_| until a broker exists), sent directly to a known
// peer, or queued in |pending_peer_messages_|. When the first message is
// queued for an unknown peer, the broker (if any) is asked for an
// introduction.
void NodeController::SendPeerMessage(const ports::NodeName& name,
                                     ports::ScopedMessage message) {
  Channel::MessagePtr channel_message =
      static_cast<PortsMessage*>(message.get())->TakeChannelMessage();

  scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
#if defined(OS_WIN)
  if (channel_message->has_handles()) {
    // If we're sending a message with handles we aren't the destination
    // node's parent or broker (i.e. we don't know its process handle), ask
    // the broker to relay for us.
    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    if (!peer || !peer->HasRemoteProcessHandle()) {
      if (broker) {
        broker->RelayPortsMessage(name, std::move(channel_message));
      } else {
        base::AutoLock lock(broker_lock_);
        pending_relay_messages_[name].emplace(std::move(channel_message));
      }
      return;
    }
  }
#elif defined(OS_MACOSX) && !defined(OS_IOS)
  if (channel_message->has_mach_ports()) {
    // Messages containing Mach ports are always routed through the broker, even
    // if the broker process is the intended recipient.
    bool use_broker = false;
    {
      base::AutoLock lock(parent_lock_);
      use_broker = (bootstrap_parent_channel_ ||
                    parent_name_ != ports::kInvalidNodeName);
    }
    if (use_broker) {
      scoped_refptr<NodeChannel> broker = GetBrokerChannel();
      if (broker) {
        broker->RelayPortsMessage(name, std::move(channel_message));
      } else {
        base::AutoLock lock(broker_lock_);
        pending_relay_messages_[name].emplace(std::move(channel_message));
      }
      return;
    }
  }
#endif  // defined(OS_WIN)

  if (peer) {
    peer->PortsMessage(std::move(channel_message));
    return;
  }

  // If we don't know who the peer is, queue the message for delivery. If this
  // is the first message queued for the peer, we also ask the broker to
  // introduce us to them.

  bool needs_introduction = false;
  {
    base::AutoLock lock(peers_lock_);
    auto& queue = pending_peer_messages_[name];
    needs_introduction = queue.empty();
    queue.emplace(std::move(channel_message));
  }

  if (needs_introduction) {
    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    if (!broker) {
      DVLOG(1) << "Dropping message for unknown peer: " << name;
      return;
    }
    broker->RequestIntroduction(name);
  }
}

// Drains |incoming_messages_| into the underlying node, repeating while the
// incoming-messages flag is set. The queue is swapped out under
// |messages_lock_| and the messages are handed to the node after the lock is
// released. CHECKs that no more than kMaxAcceptedMessages are accepted in one
// call, and finishes by calling AttemptShutdownIfRequested().
void NodeController::AcceptIncomingMessages() {
  // This is an impractically large value which should never be reached in
  // practice. See the CHECK below for usage.
  constexpr size_t kMaxAcceptedMessages = 1000000;

  size_t num_messages_accepted = 0;
  while (incoming_messages_flag_) {
    // TODO: We may need to be more careful to avoid starving the rest of the
    // thread here. Revisit this if it turns out to be a problem. One
    // alternative would be to schedule a task to continue pumping messages
    // after flushing once.

    messages_lock_.Acquire();
    if (incoming_messages_.empty()) {
      messages_lock_.Release();
      break;
    }

    // libstdc++'s deque creates an internal buffer on construction, even when
    // the size is 0. So avoid creating it until it is necessary.
    std::queue<ports::ScopedMessage> messages;
    std::swap(messages, incoming_messages_);
    incoming_messages_flag_.Set(false);
    messages_lock_.Release();

    num_messages_accepted += messages.size();
    while (!messages.empty()) {
      node_->AcceptMessage(std::move(messages.front()));
      messages.pop();
    }

    // This is effectively a safeguard against potential bugs which might lead
    // to runaway message cycles. If any such cycles arise, we'll start seeing
    // crash reports from this location.
    CHECK_LE(num_messages_accepted, kMaxAcceptedMessages);
  }

  if (num_messages_accepted >= 4) {
    // Note: We avoid logging this histogram for the vast majority of cases.
    // See https://crbug.com/685763 for more context.
    UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.MessagesAcceptedPerEvent",
                                static_cast<int32_t>(num_messages_accepted),
                                1 /* min */,
                                500 /* max */,
                                50 /* bucket count */);
  }

  AttemptShutdownIfRequested();
}

// Task body posted by ForwardMessage(). Clears the "task posted" marker under
// |messages_lock_| and then drains the incoming message queue.
void NodeController::ProcessIncomingMessages() {
  RequestContext request_context(RequestContext::Source::SYSTEM);

  {
    base::AutoLock lock(messages_lock_);
    // Allow a new incoming messages processing task to be posted. This can't be
    // done after AcceptIncomingMessages() otherwise a message might be missed.
    // Doing it here may result in at most two tasks existing at the same time;
    // this running one, and one pending in the task runner.
    incoming_messages_task_posted_ = false;
  }

  AcceptIncomingMessages();
}

// Run when the IO thread's message loop is being destroyed (installed by
// SetIOTaskRunner()). Collects the bootstrap parent channel, all peers and all
// pending children, clears the peer bookkeeping maps, shuts every collected
// channel down, and deletes |this| if |destroy_on_io_thread_shutdown_| is set.
void NodeController::DropAllPeers() {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  std::vector<scoped_refptr<NodeChannel>> all_peers;
  {
    base::AutoLock lock(parent_lock_);
    if (bootstrap_parent_channel_) {
      // |bootstrap_parent_channel_| isn't null'd here because we rely on its
      // existence to determine whether or not this is the root node. Once
      // bootstrap_parent_channel_->ShutDown() has been called,
      // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
      // matter if it's deleted now or when |this| is deleted.
      // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
      all_peers.push_back(bootstrap_parent_channel_);
    }
  }

  {
    base::AutoLock lock(peers_lock_);
    for (const auto& peer : peers_)
      all_peers.push_back(peer.second);
    for (const auto& peer : pending_children_)
      all_peers.push_back(peer.second);
    peers_.clear();
    pending_children_.clear();
    pending_peer_messages_.clear();
    peer_connections_.clear();
  }

  for (const auto& peer : all_peers)
    peer->ShutDown();

  if (destroy_on_io_thread_shutdown_)
    delete this;
}

// Fills |*port_name| with random bytes.
void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
  GenerateRandomName(port_name);
}

// Allocates a new PortsMessage with |num_header_bytes| of header and stores it
// in |*message|.
void NodeController::AllocMessage(size_t num_header_bytes,
                                  ports::ScopedMessage* message) {
  message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
}

// Routes |message| to |node|. Messages addressed to this node are appended to
// |incoming_messages_| (and a ProcessIncomingMessages() task is posted to the
// IO task runner when the queue was empty, a task runner exists and no such
// task is already posted); all other messages go through SendPeerMessage().
void NodeController::ForwardMessage(const ports::NodeName& node,
                                    ports::ScopedMessage message) {
  DCHECK(message);
  bool schedule_pump_task = false;
  if (node == name_) {
    // NOTE: We need to avoid re-entering the Node instance within
    // ForwardMessage. Because ForwardMessage is only ever called
    // (synchronously) in response to Node's ClosePort, SendMessage, or
    // AcceptMessage, we flush the queue after calling any of those methods.
    base::AutoLock lock(messages_lock_);
    // |io_task_runner_| may be null in tests or processes that don't require
    // multi-process Mojo.
    schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
        !incoming_messages_task_posted_;
    incoming_messages_task_posted_ |= schedule_pump_task;
    incoming_messages_.emplace(std::move(message));
    incoming_messages_flag_.Set(true);
  } else {
    SendPeerMessage(node, std::move(message));
  }

  if (schedule_pump_task) {
    // Normally, the queue is processed after the action that added the local
    // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
    // possible for a local message to be added as a result of a remote message,
    // and OnChannelMessage() doesn't process this queue (although
    // OnPortsMessage() does). There may also be other code paths, now or added
    // in the future, which cause local messages to be added but don't process
    // this message queue.
    //
    // Instead of adding a call to AcceptIncomingMessages() on every possible
    // code path, post a task to the IO thread to process the queue. If the
    // current call stack processes the queue, this may end up doing nothing.
    io_task_runner_->PostTask(
        FROM_HERE,
        base::Bind(&NodeController::ProcessIncomingMessages,
                   base::Unretained(this)));
  }
}

// Broadcasts |message|, which must carry no ports and no handles (CHECKed).
// The message is handed to the broker channel when one exists; otherwise it
// is delivered to OnBroadcast() with this node as the sender.
void NodeController::BroadcastMessage(ports::ScopedMessage message) {
  CHECK_EQ(message->num_ports(), 0u);
  Channel::MessagePtr channel_message =
      static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
  CHECK(!channel_message->has_handles());

  scoped_refptr<NodeChannel> broker = GetBrokerChannel();
  if (broker)
    broker->Broadcast(std::move(channel_message));
  else
    OnBroadcast(name_, std::move(channel_message));
}

// Notifies the PortObserver stored as the user data of |port|, if any, that
// the port's status changed.
void NodeController::PortStatusChanged(const ports::PortRef& port) {
  scoped_refptr<ports::UserData> user_data;
  node_->GetUserData(port, &user_data);

  PortObserver* observer = static_cast<PortObserver*>(user_data.get());
  if (observer) {
    observer->OnPortStatusChanged();
  } else {
    DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
             << "doesn't have an observer.";
  }
}

// Handles an AcceptChild message. If this node has a bootstrap parent channel
// and no parent name yet, records |parent_name| as the parent and replies on
// that channel with AcceptParent(|token|, this node's name). Otherwise the
// message is unexpected and |from_node| is dropped.
void NodeController::OnAcceptChild(const ports::NodeName& from_node,
                                   const ports::NodeName& parent_name,
                                   const ports::NodeName& token) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  scoped_refptr<NodeChannel> parent;
  {
    base::AutoLock lock(parent_lock_);
    if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
      parent_name_ = parent_name;
      parent = bootstrap_parent_channel_;
    }
  }

  if (!parent) {
    DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
    DropPeer(from_node, nullptr);
    return;
  }

  parent->SetRemoteNodeName(parent_name);
  parent->AcceptParent(token, name_);

  // NOTE: The child does not actually add its parent as a peer until
  // receiving an AcceptBrokerClient message from the broker. The parent
  // will request that said message be sent upon receiving AcceptParent.

  DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
}

// Handles an AcceptParent message from a pending child. |from_node| must be a
// pending child and |token| must equal |from_node|; otherwise the sender is
// dropped. On success the child's channel is promoted to a peer named
// |child_name| and then one of the following happens:
//  - a broker is known: the broker is told about the new client;
//  - no broker and no parent: this node initializes the child directly;
//  - no broker but a parent exists: the child is queued in
//    |pending_broker_clients_| until a broker connection is available.
void NodeController::OnAcceptParent(const ports::NodeName& from_node,
                                    const ports::NodeName& token,
                                    const ports::NodeName& child_name) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  auto it = pending_children_.find(from_node);
  if (it == pending_children_.end() || token != from_node) {
    DLOG(ERROR) << "Received unexpected AcceptParent message from "
                << from_node;
    DropPeer(from_node, nullptr);
    return;
  }

  scoped_refptr<NodeChannel> channel = it->second;
  pending_children_.erase(it);

  DCHECK(channel);

  DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;

  AddPeer(child_name, channel, false /* start_channel */);

  // TODO(rockot/amistry): We could simplify child initialization if we could
  // synchronously get a new async broker channel from the broker. For now we do
  // it asynchronously since it's only used to facilitate handle passing, not
  // handle creation.
  scoped_refptr<NodeChannel> broker = GetBrokerChannel();
  if (broker) {
    // Inform the broker of this new child.
    broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
  } else {
    // If we have no broker, either we need to wait for one, or we *are* the
    // broker.
    scoped_refptr<NodeChannel> parent = GetParentChannel();
    if (!parent) {
      base::AutoLock lock(parent_lock_);
      parent = bootstrap_parent_channel_;
    }

    if (!parent) {
      // Yes, we're the broker. We can initialize the child directly.
      channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
    } else {
      // We aren't the broker, so wait for a broker connection.
      base::AutoLock lock(broker_lock_);
      pending_broker_clients_.push(child_name);
    }
  }
}

// Handles an AddBrokerClient message. Ignored if |from_node| is not a known
// peer; if |client_name| is already a known peer, |from_node| is dropped.
// Otherwise creates a new platform channel pair, registers a started peer
// channel for |client_name| on the server end with |process_handle| as its
// remote process handle, and sends the client end back to the sender via
// BrokerClientAdded. On Windows an invalid process handle causes the request
// to be rejected.
void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
                                       const ports::NodeName& client_name,
                                       base::ProcessHandle process_handle) {
#if defined(OS_WIN)
  // Scoped handle to avoid leaks on error.
  ScopedPlatformHandle scoped_process_handle =
      ScopedPlatformHandle(PlatformHandle(process_handle));
#endif
  scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
  if (!sender) {
    DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
    return;
  }

  if (GetPeerChannel(client_name)) {
    DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
    DropPeer(from_node, nullptr);
    return;
  }

  PlatformChannelPair broker_channel;
  ConnectionParams connection_params(broker_channel.PassServerHandle());
  scoped_refptr<NodeChannel> client =
      NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
                          ProcessErrorCallback());

#if defined(OS_WIN)
  // The broker must have a working handle to the client process in order to
  // properly copy other handles to and from the client.
  if (!scoped_process_handle.is_valid()) {
    DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
    return;
  }
  client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
#else
  client->SetRemoteProcessHandle(process_handle);
#endif

  AddPeer(client_name, client, true /* start_channel */);

  DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
           << " from peer " << from_node;

  sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
}

// Handles a BrokerClientAdded message. Ignored unless |client_name| is a known
// peer and the channel of |from_node| is this node's broker channel. On
// success forwards |broker_channel| to the client via AcceptBrokerClient.
void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
                                         const ports::NodeName& client_name,
                                         ScopedPlatformHandle broker_channel) {
  scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
  if (!client) {
    DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
    return;
  }

  // This should have come from our own broker.
  if (GetBrokerChannel() != GetPeerChannel(from_node)) {
    DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
    return;
  }

  DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;

  client->AcceptBrokerClient(from_node, std::move(broker_channel));
}

void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
                                          const ports::NodeName& broker_name,
                                          ScopedPlatformHandle broker_channel) {
  // This node should already have a parent in bootstrap mode.
1045 ports::NodeName parent_name; 1046 scoped_refptr<NodeChannel> parent; 1047 { 1048 base::AutoLock lock(parent_lock_); 1049 parent_name = parent_name_; 1050 parent = bootstrap_parent_channel_; 1051 bootstrap_parent_channel_ = nullptr; 1052 } 1053 DCHECK(parent_name == from_node); 1054 DCHECK(parent); 1055 1056 std::queue<ports::NodeName> pending_broker_clients; 1057 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 1058 pending_relay_messages; 1059 { 1060 base::AutoLock lock(broker_lock_); 1061 broker_name_ = broker_name; 1062 std::swap(pending_broker_clients, pending_broker_clients_); 1063 std::swap(pending_relay_messages, pending_relay_messages_); 1064 } 1065 DCHECK(broker_name != ports::kInvalidNodeName); 1066 1067 // It's now possible to add both the broker and the parent as peers. 1068 // Note that the broker and parent may be the same node. 1069 scoped_refptr<NodeChannel> broker; 1070 if (broker_name == parent_name) { 1071 DCHECK(!broker_channel.is_valid()); 1072 broker = parent; 1073 } else { 1074 DCHECK(broker_channel.is_valid()); 1075 broker = 1076 NodeChannel::Create(this, ConnectionParams(std::move(broker_channel)), 1077 io_task_runner_, ProcessErrorCallback()); 1078 AddPeer(broker_name, broker, true /* start_channel */); 1079 } 1080 1081 AddPeer(parent_name, parent, false /* start_channel */); 1082 1083 { 1084 // Complete any port merge requests we have waiting for the parent. 1085 base::AutoLock lock(pending_port_merges_lock_); 1086 for (const auto& request : pending_port_merges_) 1087 parent->RequestPortMerge(request.second.name(), request.first); 1088 pending_port_merges_.clear(); 1089 } 1090 1091 // Feed the broker any pending children of our own. 
1092 while (!pending_broker_clients.empty()) { 1093 const ports::NodeName& child_name = pending_broker_clients.front(); 1094 auto it = pending_children_.find(child_name); 1095 DCHECK(it != pending_children_.end()); 1096 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); 1097 pending_broker_clients.pop(); 1098 } 1099 1100 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1101 // Have the broker relay any messages we have waiting. 1102 for (auto& entry : pending_relay_messages) { 1103 const ports::NodeName& destination = entry.first; 1104 auto& message_queue = entry.second; 1105 while (!message_queue.empty()) { 1106 broker->RelayPortsMessage(destination, std::move(message_queue.front())); 1107 message_queue.pop(); 1108 } 1109 } 1110 #endif 1111 1112 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; 1113 } 1114 1115 void NodeController::OnPortsMessage(const ports::NodeName& from_node, 1116 Channel::MessagePtr channel_message) { 1117 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1118 1119 void* data; 1120 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1121 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, 1122 &num_header_bytes, &num_payload_bytes, 1123 &num_ports_bytes)) { 1124 DropPeer(from_node, nullptr); 1125 return; 1126 } 1127 1128 CHECK(channel_message); 1129 std::unique_ptr<PortsMessage> ports_message( 1130 new PortsMessage(num_header_bytes, 1131 num_payload_bytes, 1132 num_ports_bytes, 1133 std::move(channel_message))); 1134 ports_message->set_source_node(from_node); 1135 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); 1136 AcceptIncomingMessages(); 1137 } 1138 1139 void NodeController::OnRequestPortMerge( 1140 const ports::NodeName& from_node, 1141 const ports::PortName& connector_port_name, 1142 const std::string& token) { 1143 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1144 1145 DVLOG(2) << "Node " << name_ << 
" received RequestPortMerge for token " 1146 << token << " and port " << connector_port_name << "@" << from_node; 1147 1148 ports::PortRef local_port; 1149 { 1150 base::AutoLock lock(reserved_ports_lock_); 1151 auto it = reserved_ports_.find(token); 1152 if (it == reserved_ports_.end()) { 1153 DVLOG(1) << "Ignoring request to connect to port for unknown token " 1154 << token; 1155 return; 1156 } 1157 local_port = it->second.port; 1158 } 1159 1160 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 1161 if (rv != ports::OK) 1162 DLOG(ERROR) << "MergePorts failed: " << rv; 1163 1164 AcceptIncomingMessages(); 1165 } 1166 1167 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 1168 const ports::NodeName& name) { 1169 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1170 1171 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 1172 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 1173 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 1174 << from_node; 1175 DropPeer(from_node, nullptr); 1176 return; 1177 } 1178 1179 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); 1180 if (!new_friend) { 1181 // We don't know who they're talking about! 
1182 requestor->Introduce(name, ScopedPlatformHandle()); 1183 } else { 1184 PlatformChannelPair new_channel; 1185 requestor->Introduce(name, new_channel.PassServerHandle()); 1186 new_friend->Introduce(from_node, new_channel.PassClientHandle()); 1187 } 1188 } 1189 1190 void NodeController::OnIntroduce(const ports::NodeName& from_node, 1191 const ports::NodeName& name, 1192 ScopedPlatformHandle channel_handle) { 1193 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1194 1195 if (!channel_handle.is_valid()) { 1196 node_->LostConnectionToNode(name); 1197 1198 DVLOG(1) << "Could not be introduced to peer " << name; 1199 base::AutoLock lock(peers_lock_); 1200 pending_peer_messages_.erase(name); 1201 return; 1202 } 1203 1204 scoped_refptr<NodeChannel> channel = 1205 NodeChannel::Create(this, ConnectionParams(std::move(channel_handle)), 1206 io_task_runner_, ProcessErrorCallback()); 1207 1208 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; 1209 AddPeer(name, channel, true /* start_channel */); 1210 } 1211 1212 void NodeController::OnBroadcast(const ports::NodeName& from_node, 1213 Channel::MessagePtr message) { 1214 DCHECK(!message->has_handles()); 1215 1216 void* data; 1217 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1218 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, 1219 &num_header_bytes, &num_payload_bytes, 1220 &num_ports_bytes)) { 1221 DropPeer(from_node, nullptr); 1222 return; 1223 } 1224 1225 // Broadcast messages must not contain ports. 1226 if (num_ports_bytes > 0) { 1227 DropPeer(from_node, nullptr); 1228 return; 1229 } 1230 1231 base::AutoLock lock(peers_lock_); 1232 for (auto& iter : peers_) { 1233 // Copy and send the message to each known peer. 
1234 Channel::MessagePtr peer_message( 1235 new Channel::Message(message->payload_size(), 0)); 1236 memcpy(peer_message->mutable_payload(), message->payload(), 1237 message->payload_size()); 1238 iter.second->PortsMessage(std::move(peer_message)); 1239 } 1240 } 1241 1242 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1243 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 1244 base::ProcessHandle from_process, 1245 const ports::NodeName& destination, 1246 Channel::MessagePtr message) { 1247 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1248 1249 if (GetBrokerChannel()) { 1250 // Only the broker should be asked to relay a message. 1251 LOG(ERROR) << "Non-broker refusing to relay message."; 1252 DropPeer(from_node, nullptr); 1253 return; 1254 } 1255 1256 // The parent should always know which process this came from. 1257 DCHECK(from_process != base::kNullProcessHandle); 1258 1259 #if defined(OS_WIN) 1260 // Rewrite the handles to this (the parent) process. If the message is 1261 // destined for another child process, the handles will be rewritten to that 1262 // process before going out (see NodeChannel::WriteChannelMessage). 1263 // 1264 // TODO: We could avoid double-duplication. 1265 // 1266 // Note that we explicitly mark the handles as being owned by the sending 1267 // process before rewriting them, in order to accommodate RewriteHandles' 1268 // internal sanity checks. 
1269 ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); 1270 for (size_t i = 0; i < handles->size(); ++i) 1271 (*handles)[i].owning_process = from_process; 1272 if (!Channel::Message::RewriteHandles(from_process, 1273 base::GetCurrentProcessHandle(), 1274 handles.get())) { 1275 DLOG(ERROR) << "Failed to relay one or more handles."; 1276 } 1277 message->SetHandles(std::move(handles)); 1278 #else 1279 MachPortRelay* relay = GetMachPortRelay(); 1280 if (!relay) { 1281 LOG(ERROR) << "Receiving Mach ports without a port relay from " 1282 << from_node << ". Dropping message."; 1283 return; 1284 } 1285 if (!relay->ExtractPortRights(message.get(), from_process)) { 1286 // NodeChannel should ensure that MachPortRelay is ready for the remote 1287 // process. At this point, if the port extraction failed, either something 1288 // went wrong in the mach stuff, or the remote process died. 1289 LOG(ERROR) << "Error on receiving Mach ports " << from_node 1290 << ". Dropping message."; 1291 return; 1292 } 1293 #endif // defined(OS_WIN) 1294 1295 if (destination == name_) { 1296 // Great, we can deliver this message locally. 
1297 OnPortsMessage(from_node, std::move(message)); 1298 return; 1299 } 1300 1301 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); 1302 if (peer) 1303 peer->PortsMessageFromRelay(from_node, std::move(message)); 1304 else 1305 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1306 } 1307 1308 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node, 1309 const ports::NodeName& source_node, 1310 Channel::MessagePtr message) { 1311 if (GetPeerChannel(from_node) != GetBrokerChannel()) { 1312 LOG(ERROR) << "Refusing relayed message from non-broker node."; 1313 DropPeer(from_node, nullptr); 1314 return; 1315 } 1316 1317 OnPortsMessage(source_node, std::move(message)); 1318 } 1319 #endif 1320 1321 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, 1322 const ports::NodeName& token, 1323 const ports::NodeName& peer_name, 1324 const ports::PortName& port_name) { 1325 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1326 1327 auto it = peer_connections_.find(from_node); 1328 if (it == peer_connections_.end()) { 1329 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; 1330 DropPeer(from_node, nullptr); 1331 return; 1332 } 1333 1334 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); 1335 ports::PortRef local_port = it->second.local_port; 1336 std::string peer_token = std::move(it->second.peer_token); 1337 peer_connections_.erase(it); 1338 DCHECK(channel); 1339 1340 // If the peer connection is a self connection (which is used in tests), 1341 // drop the channel to it and skip straight to merging the ports. 
1342 if (name_ == peer_name) { 1343 peers_by_token_.erase(peer_token); 1344 } else { 1345 peers_by_token_[peer_token] = peer_name; 1346 peer_connections_.insert( 1347 {peer_name, PeerConnection{nullptr, local_port, peer_token}}); 1348 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; 1349 1350 AddPeer(peer_name, channel, false /* start_channel */); 1351 } 1352 1353 // We need to choose one side to initiate the port merge. It doesn't matter 1354 // who does it as long as they don't both try. Simple solution: pick the one 1355 // with the "smaller" port name. 1356 if (local_port.name() < port_name) { 1357 node()->MergePorts(local_port, peer_name, port_name); 1358 } 1359 } 1360 1361 void NodeController::OnChannelError(const ports::NodeName& from_node, 1362 NodeChannel* channel) { 1363 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1364 DropPeer(from_node, channel); 1365 // DropPeer may have caused local port closures, so be sure to process any 1366 // pending local messages. 1367 AcceptIncomingMessages(); 1368 } else { 1369 io_task_runner_->PostTask( 1370 FROM_HERE, 1371 base::Bind(&NodeController::OnChannelError, base::Unretained(this), 1372 from_node, channel)); 1373 } 1374 } 1375 1376 #if defined(OS_MACOSX) && !defined(OS_IOS) 1377 MachPortRelay* NodeController::GetMachPortRelay() { 1378 { 1379 base::AutoLock lock(parent_lock_); 1380 // Return null if we're not the root. 
1381 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) 1382 return nullptr; 1383 } 1384 1385 base::AutoLock lock(mach_port_relay_lock_); 1386 return mach_port_relay_.get(); 1387 } 1388 #endif 1389 1390 void NodeController::CancelPendingPortMerges() { 1391 std::vector<ports::PortRef> ports_to_close; 1392 1393 { 1394 base::AutoLock lock(pending_port_merges_lock_); 1395 reject_pending_merges_ = true; 1396 for (const auto& port : pending_port_merges_) 1397 ports_to_close.push_back(port.second); 1398 pending_port_merges_.clear(); 1399 } 1400 1401 for (const auto& port : ports_to_close) 1402 node_->ClosePort(port); 1403 } 1404 1405 void NodeController::DestroyOnIOThreadShutdown() { 1406 destroy_on_io_thread_shutdown_ = true; 1407 } 1408 1409 void NodeController::AttemptShutdownIfRequested() { 1410 if (!shutdown_callback_flag_) 1411 return; 1412 1413 base::Closure callback; 1414 { 1415 base::AutoLock lock(shutdown_lock_); 1416 if (shutdown_callback_.is_null()) 1417 return; 1418 if (!node_->CanShutdownCleanly( 1419 ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) { 1420 DVLOG(2) << "Unable to cleanly shut down node " << name_; 1421 return; 1422 } 1423 1424 callback = shutdown_callback_; 1425 shutdown_callback_.Reset(); 1426 shutdown_callback_flag_.Set(false); 1427 } 1428 1429 DCHECK(!callback.is_null()); 1430 1431 callback.Run(); 1432 } 1433 1434 NodeController::PeerConnection::PeerConnection() = default; 1435 1436 NodeController::PeerConnection::PeerConnection( 1437 const PeerConnection& other) = default; 1438 1439 NodeController::PeerConnection::PeerConnection( 1440 PeerConnection&& other) = default; 1441 1442 NodeController::PeerConnection::PeerConnection( 1443 scoped_refptr<NodeChannel> channel, 1444 const ports::PortRef& local_port, 1445 const std::string& peer_token) 1446 : channel(std::move(channel)), 1447 local_port(local_port), 1448 peer_token(peer_token) {} 1449 1450 NodeController::PeerConnection::~PeerConnection() = default; 1451 1452 
// Copy assignment for PeerConnection; compiler-generated.
NodeController::PeerConnection& NodeController::PeerConnection::
operator=(const PeerConnection& other) = default;

// Move assignment for PeerConnection; compiler-generated.
NodeController::PeerConnection& NodeController::PeerConnection::
operator=(PeerConnection&& other) = default;

}  // namespace edk
}  // namespace mojo