1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/edk/system/node_controller.h" 6 7 #include <algorithm> 8 #include <limits> 9 10 #include "base/bind.h" 11 #include "base/location.h" 12 #include "base/logging.h" 13 #include "base/macros.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/metrics/histogram_macros.h" 16 #include "base/process/process_handle.h" 17 #include "base/rand_util.h" 18 #include "base/time/time.h" 19 #include "base/timer/elapsed_timer.h" 20 #include "mojo/edk/embedder/embedder_internal.h" 21 #include "mojo/edk/embedder/platform_channel_pair.h" 22 #include "mojo/edk/system/broker.h" 23 #include "mojo/edk/system/broker_host.h" 24 #include "mojo/edk/system/core.h" 25 #include "mojo/edk/system/ports_message.h" 26 #include "mojo/edk/system/request_context.h" 27 28 #if defined(OS_MACOSX) && !defined(OS_IOS) 29 #include "mojo/edk/system/mach_port_relay.h" 30 #endif 31 32 #if !defined(OS_NACL) 33 #include "crypto/random.h" 34 #endif 35 36 namespace mojo { 37 namespace edk { 38 39 namespace { 40 41 #if defined(OS_NACL) 42 template <typename T> 43 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); } 44 #else 45 template <typename T> 46 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); } 47 #endif 48 49 ports::NodeName GetRandomNodeName() { 50 ports::NodeName name; 51 GenerateRandomName(&name); 52 return name; 53 } 54 55 void RecordPeerCount(size_t count) { 56 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); 57 58 // 8k is the maximum number of file descriptors allowed in Chrome. 59 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers", 60 static_cast<int32_t>(count), 61 0 /* min */, 62 8000 /* max */, 63 50 /* bucket count */); 64 } 65 66 void RecordPendingChildCount(size_t count) { 67 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); 68 69 // 8k is the maximum number of file descriptors allowed in Chrome. 70 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", 71 static_cast<int32_t>(count), 72 0 /* min */, 73 8000 /* max */, 74 50 /* bucket count */); 75 } 76 77 bool ParsePortsMessage(Channel::Message* message, 78 void** data, 79 size_t* num_data_bytes, 80 size_t* num_header_bytes, 81 size_t* num_payload_bytes, 82 size_t* num_ports_bytes) { 83 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes && 84 num_ports_bytes); 85 86 NodeChannel::GetPortsMessageData(message, data, num_data_bytes); 87 if (!*num_data_bytes) 88 return false; 89 90 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes, 91 num_payload_bytes, num_ports_bytes)) { 92 return false; 93 } 94 95 return true; 96 } 97 98 // Used by NodeController to watch for shutdown. Since no IO can happen once 99 // the IO thread is killed, the NodeController can cleanly drop all its peers 100 // at that time. 101 class ThreadDestructionObserver : 102 public base::MessageLoop::DestructionObserver { 103 public: 104 static void Create(scoped_refptr<base::TaskRunner> task_runner, 105 const base::Closure& callback) { 106 if (task_runner->RunsTasksOnCurrentThread()) { 107 // Owns itself. 108 new ThreadDestructionObserver(callback); 109 } else { 110 task_runner->PostTask(FROM_HERE, 111 base::Bind(&Create, task_runner, callback)); 112 } 113 } 114 115 private: 116 explicit ThreadDestructionObserver(const base::Closure& callback) 117 : callback_(callback) { 118 base::MessageLoop::current()->AddDestructionObserver(this); 119 } 120 121 ~ThreadDestructionObserver() override { 122 base::MessageLoop::current()->RemoveDestructionObserver(this); 123 } 124 125 // base::MessageLoop::DestructionObserver: 126 void WillDestroyCurrentMessageLoop() override { 127 callback_.Run(); 128 delete this; 129 } 130 131 const base::Closure callback_; 132 133 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); 134 }; 135 136 } // namespace 137 138 NodeController::~NodeController() {} 139 140 NodeController::NodeController(Core* core) 141 : core_(core), 142 name_(GetRandomNodeName()), 143 node_(new ports::Node(name_, this)) { 144 DVLOG(1) << "Initializing node " << name_; 145 } 146 147 #if defined(OS_MACOSX) && !defined(OS_IOS) 148 void NodeController::CreateMachPortRelay( 149 base::PortProvider* port_provider) { 150 base::AutoLock lock(mach_port_relay_lock_); 151 DCHECK(!mach_port_relay_); 152 mach_port_relay_.reset(new MachPortRelay(port_provider)); 153 } 154 #endif 155 156 void NodeController::SetIOTaskRunner( 157 scoped_refptr<base::TaskRunner> task_runner) { 158 io_task_runner_ = task_runner; 159 ThreadDestructionObserver::Create( 160 io_task_runner_, 161 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); 162 } 163 164 void NodeController::ConnectToChild( 165 base::ProcessHandle process_handle, 166 ScopedPlatformHandle platform_handle, 167 const std::string& child_token, 168 const ProcessErrorCallback& process_error_callback) { 169 // Generate the temporary remote node name here so that it can be associated 170 // with the embedder's child_token. If an error occurs in the child process 171 // after it is launched, but before any reserved ports are connected, this can 172 // be used to clean up any dangling ports. 173 ports::NodeName node_name; 174 GenerateRandomName(&node_name); 175 176 { 177 base::AutoLock lock(reserved_ports_lock_); 178 bool inserted = pending_child_tokens_.insert( 179 std::make_pair(node_name, child_token)).second; 180 DCHECK(inserted); 181 } 182 183 #if defined(OS_WIN) 184 // On Windows, we need to duplicate the process handle because we have no 185 // control over its lifetime and it may become invalid by the time the posted 186 // task runs. 187 HANDLE dup_handle = INVALID_HANDLE_VALUE; 188 BOOL ok = ::DuplicateHandle( 189 base::GetCurrentProcessHandle(), process_handle, 190 base::GetCurrentProcessHandle(), &dup_handle, 191 0, FALSE, DUPLICATE_SAME_ACCESS); 192 DPCHECK(ok); 193 process_handle = dup_handle; 194 #endif 195 196 io_task_runner_->PostTask( 197 FROM_HERE, 198 base::Bind(&NodeController::ConnectToChildOnIOThread, 199 base::Unretained(this), 200 process_handle, 201 base::Passed(&platform_handle), 202 node_name, 203 process_error_callback)); 204 } 205 206 void NodeController::CloseChildPorts(const std::string& child_token) { 207 std::vector<ports::PortRef> ports_to_close; 208 { 209 std::vector<std::string> port_tokens; 210 base::AutoLock lock(reserved_ports_lock_); 211 for (const auto& port : reserved_ports_) { 212 if (port.second.child_token == child_token) { 213 DVLOG(1) << "Closing reserved port " << port.second.port.name(); 214 ports_to_close.push_back(port.second.port); 215 port_tokens.push_back(port.first); 216 } 217 } 218 219 for (const auto& token : port_tokens) 220 reserved_ports_.erase(token); 221 } 222 223 for (const auto& port : ports_to_close) 224 node_->ClosePort(port); 225 226 // Ensure local port closure messages are processed. 227 AcceptIncomingMessages(); 228 } 229 230 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { 231 // TODO(amistry): Consider the need for a broker on Windows. 232 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 233 // On posix, use the bootstrap channel for the broker and receive the node's 234 // channel synchronously as the first message from the broker. 235 base::ElapsedTimer timer; 236 broker_.reset(new Broker(std::move(platform_handle))); 237 platform_handle = broker_->GetParentPlatformHandle(); 238 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime", 239 timer.Elapsed()); 240 241 if (!platform_handle.is_valid()) { 242 // Most likely the browser side of the channel has already been closed and 243 // the broker was unable to negotiate a NodeChannel pipe. In this case we 244 // can cancel parent connection. 245 DVLOG(1) << "Cannot connect to invalid parent channel."; 246 return; 247 } 248 #endif 249 250 io_task_runner_->PostTask( 251 FROM_HERE, 252 base::Bind(&NodeController::ConnectToParentOnIOThread, 253 base::Unretained(this), 254 base::Passed(&platform_handle))); 255 } 256 257 void NodeController::SetPortObserver( 258 const ports::PortRef& port, 259 const scoped_refptr<PortObserver>& observer) { 260 node_->SetUserData(port, observer); 261 } 262 263 void NodeController::ClosePort(const ports::PortRef& port) { 264 SetPortObserver(port, nullptr); 265 int rv = node_->ClosePort(port); 266 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); 267 268 AcceptIncomingMessages(); 269 } 270 271 int NodeController::SendMessage(const ports::PortRef& port, 272 std::unique_ptr<PortsMessage> message) { 273 ports::ScopedMessage ports_message(message.release()); 274 int rv = node_->SendMessage(port, std::move(ports_message)); 275 276 AcceptIncomingMessages(); 277 return rv; 278 } 279 280 void NodeController::ReservePort(const std::string& token, 281 const ports::PortRef& port, 282 const std::string& child_token) { 283 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " 284 << token; 285 286 base::AutoLock lock(reserved_ports_lock_); 287 auto result = reserved_ports_.insert( 288 std::make_pair(token, ReservedPort{port, child_token})); 289 DCHECK(result.second); 290 } 291 292 void NodeController::MergePortIntoParent(const std::string& token, 293 const ports::PortRef& port) { 294 bool was_merged = false; 295 { 296 // This request may be coming from within the process that reserved the 297 // "parent" side (e.g. for Chrome single-process mode), so if this token is 298 // reserved locally, merge locally instead. 299 base::AutoLock lock(reserved_ports_lock_); 300 auto it = reserved_ports_.find(token); 301 if (it != reserved_ports_.end()) { 302 node_->MergePorts(port, name_, it->second.port.name()); 303 reserved_ports_.erase(it); 304 was_merged = true; 305 } 306 } 307 if (was_merged) { 308 AcceptIncomingMessages(); 309 return; 310 } 311 312 scoped_refptr<NodeChannel> parent; 313 bool reject_merge = false; 314 { 315 // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise, 316 // there is a race where the parent can be set, and |pending_port_merges_| 317 // be processed between retrieving |parent| and adding the merge to 318 // |pending_port_merges_|. 319 base::AutoLock lock(pending_port_merges_lock_); 320 parent = GetParentChannel(); 321 if (reject_pending_merges_) { 322 reject_merge = true; 323 } else if (!parent) { 324 pending_port_merges_.push_back(std::make_pair(token, port)); 325 return; 326 } 327 } 328 if (reject_merge) { 329 node_->ClosePort(port); 330 DVLOG(2) << "Rejecting port merge for token " << token 331 << " due to closed parent channel."; 332 AcceptIncomingMessages(); 333 return; 334 } 335 336 parent->RequestPortMerge(port.name(), token); 337 } 338 339 int NodeController::MergeLocalPorts(const ports::PortRef& port0, 340 const ports::PortRef& port1) { 341 int rv = node_->MergeLocalPorts(port0, port1); 342 AcceptIncomingMessages(); 343 return rv; 344 } 345 346 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( 347 size_t num_bytes) { 348 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 349 // Shared buffer creation failure is fatal, so always use the broker when we 350 // have one. This does mean that a non-root process that has children will use 351 // the broker for shared buffer creation even though that process is 352 // privileged. 353 if (broker_) { 354 return broker_->GetSharedBuffer(num_bytes); 355 } 356 #endif 357 return PlatformSharedBuffer::Create(num_bytes); 358 } 359 360 void NodeController::RequestShutdown(const base::Closure& callback) { 361 { 362 base::AutoLock lock(shutdown_lock_); 363 shutdown_callback_ = callback; 364 shutdown_callback_flag_.Set(true); 365 } 366 367 AttemptShutdownIfRequested(); 368 } 369 370 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node, 371 const std::string& error) { 372 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); 373 if (peer) 374 peer->NotifyBadMessage(error); 375 } 376 377 void NodeController::ConnectToChildOnIOThread( 378 base::ProcessHandle process_handle, 379 ScopedPlatformHandle platform_handle, 380 ports::NodeName token, 381 const ProcessErrorCallback& process_error_callback) { 382 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 383 384 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL) 385 PlatformChannelPair node_channel; 386 // BrokerHost owns itself. 387 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); 388 broker_host->SendChannel(node_channel.PassClientHandle()); 389 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 390 this, node_channel.PassServerHandle(), io_task_runner_, 391 process_error_callback); 392 #else 393 scoped_refptr<NodeChannel> channel = 394 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_, 395 process_error_callback); 396 #endif 397 398 // We set up the child channel with a temporary name so it can be identified 399 // as a pending child if it writes any messages to the channel. We may start 400 // receiving messages from it (though we shouldn't) as soon as Start() is 401 // called below. 402 403 pending_children_.insert(std::make_pair(token, channel)); 404 RecordPendingChildCount(pending_children_.size()); 405 406 channel->SetRemoteNodeName(token); 407 channel->SetRemoteProcessHandle(process_handle); 408 channel->Start(); 409 410 channel->AcceptChild(name_, token); 411 } 412 413 void NodeController::ConnectToParentOnIOThread( 414 ScopedPlatformHandle platform_handle) { 415 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 416 417 { 418 base::AutoLock lock(parent_lock_); 419 DCHECK(parent_name_ == ports::kInvalidNodeName); 420 421 // At this point we don't know the parent's name, so we can't yet insert it 422 // into our |peers_| map. That will happen as soon as we receive an 423 // AcceptChild message from them. 424 bootstrap_parent_channel_ = 425 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_, 426 ProcessErrorCallback()); 427 // Prevent the parent pipe handle from being closed on shutdown. Pipe 428 // closure is used by the parent to detect the child process has exited. 429 // Relying on message pipes to be closed is not enough because the parent 430 // may see the message pipe closure before the child is dead, causing the 431 // child process to be unexpectedly SIGKILL'd. 432 bootstrap_parent_channel_->LeakHandleOnShutdown(); 433 } 434 bootstrap_parent_channel_->Start(); 435 } 436 437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 438 const ports::NodeName& name) { 439 base::AutoLock lock(peers_lock_); 440 auto it = peers_.find(name); 441 if (it == peers_.end()) 442 return nullptr; 443 return it->second; 444 } 445 446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { 447 ports::NodeName parent_name; 448 { 449 base::AutoLock lock(parent_lock_); 450 parent_name = parent_name_; 451 } 452 return GetPeerChannel(parent_name); 453 } 454 455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() { 456 ports::NodeName broker_name; 457 { 458 base::AutoLock lock(broker_lock_); 459 broker_name = broker_name_; 460 } 461 return GetPeerChannel(broker_name); 462 } 463 464 void NodeController::AddPeer(const ports::NodeName& name, 465 scoped_refptr<NodeChannel> channel, 466 bool start_channel) { 467 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 468 469 DCHECK(name != ports::kInvalidNodeName); 470 DCHECK(channel); 471 472 channel->SetRemoteNodeName(name); 473 474 OutgoingMessageQueue pending_messages; 475 { 476 base::AutoLock lock(peers_lock_); 477 if (peers_.find(name) != peers_.end()) { 478 // This can happen normally if two nodes race to be introduced to each 479 // other. The losing pipe will be silently closed and introduction should 480 // not be affected. 481 DVLOG(1) << "Ignoring duplicate peer name " << name; 482 return; 483 } 484 485 auto result = peers_.insert(std::make_pair(name, channel)); 486 DCHECK(result.second); 487 488 DVLOG(2) << "Accepting new peer " << name << " on node " << name_; 489 490 RecordPeerCount(peers_.size()); 491 492 auto it = pending_peer_messages_.find(name); 493 if (it != pending_peer_messages_.end()) { 494 std::swap(pending_messages, it->second); 495 pending_peer_messages_.erase(it); 496 } 497 } 498 499 if (start_channel) 500 channel->Start(); 501 502 // Flush any queued message we need to deliver to this node. 503 while (!pending_messages.empty()) { 504 channel->PortsMessage(std::move(pending_messages.front())); 505 pending_messages.pop(); 506 } 507 } 508 509 void NodeController::DropPeer(const ports::NodeName& name, 510 NodeChannel* channel) { 511 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 512 513 { 514 base::AutoLock lock(peers_lock_); 515 auto it = peers_.find(name); 516 517 if (it != peers_.end()) { 518 ports::NodeName peer = it->first; 519 peers_.erase(it); 520 DVLOG(1) << "Dropped peer " << peer; 521 } 522 523 pending_peer_messages_.erase(name); 524 pending_children_.erase(name); 525 526 RecordPeerCount(peers_.size()); 527 RecordPendingChildCount(pending_children_.size()); 528 } 529 530 std::vector<ports::PortRef> ports_to_close; 531 { 532 // Clean up any reserved ports. 533 base::AutoLock lock(reserved_ports_lock_); 534 auto it = pending_child_tokens_.find(name); 535 if (it != pending_child_tokens_.end()) { 536 const std::string& child_token = it->second; 537 538 std::vector<std::string> port_tokens; 539 for (const auto& port : reserved_ports_) { 540 if (port.second.child_token == child_token) { 541 DVLOG(1) << "Closing reserved port: " << port.second.port.name(); 542 ports_to_close.push_back(port.second.port); 543 port_tokens.push_back(port.first); 544 } 545 } 546 547 // We have to erase reserved ports in a two-step manner because the usual 548 // manner of using the returned iterator from map::erase isn't technically 549 // valid in C++11 (although it is in C++14). 550 for (const auto& token : port_tokens) 551 reserved_ports_.erase(token); 552 553 pending_child_tokens_.erase(it); 554 } 555 } 556 557 bool is_parent; 558 { 559 base::AutoLock lock(parent_lock_); 560 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_); 561 } 562 // If the error comes from the parent channel, we also need to cancel any 563 // port merge requests, so that errors can be propagated to the message 564 // pipes. 565 if (is_parent) { 566 base::AutoLock lock(pending_port_merges_lock_); 567 reject_pending_merges_ = true; 568 569 for (const auto& port : pending_port_merges_) 570 ports_to_close.push_back(port.second); 571 pending_port_merges_.clear(); 572 } 573 574 for (const auto& port : ports_to_close) 575 node_->ClosePort(port); 576 577 node_->LostConnectionToNode(name); 578 579 AcceptIncomingMessages(); 580 } 581 582 void NodeController::SendPeerMessage(const ports::NodeName& name, 583 ports::ScopedMessage message) { 584 Channel::MessagePtr channel_message = 585 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 586 587 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); 588 #if defined(OS_WIN) 589 if (channel_message->has_handles()) { 590 // If we're sending a message with handles we aren't the destination 591 // node's parent or broker (i.e. we don't know its process handle), ask 592 // the broker to relay for us. 593 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 594 if (!peer || !peer->HasRemoteProcessHandle()) { 595 if (broker) { 596 broker->RelayPortsMessage(name, std::move(channel_message)); 597 } else { 598 base::AutoLock lock(broker_lock_); 599 pending_relay_messages_[name].emplace(std::move(channel_message)); 600 } 601 return; 602 } 603 } 604 #elif defined(OS_MACOSX) && !defined(OS_IOS) 605 if (channel_message->has_mach_ports()) { 606 // Messages containing Mach ports are always routed through the broker, even 607 // if the broker process is the intended recipient. 608 bool use_broker = false; 609 { 610 base::AutoLock lock(parent_lock_); 611 use_broker = (bootstrap_parent_channel_ || 612 parent_name_ != ports::kInvalidNodeName); 613 } 614 if (use_broker) { 615 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 616 if (broker) { 617 broker->RelayPortsMessage(name, std::move(channel_message)); 618 } else { 619 base::AutoLock lock(broker_lock_); 620 pending_relay_messages_[name].emplace(std::move(channel_message)); 621 } 622 return; 623 } 624 } 625 #endif // defined(OS_WIN) 626 627 if (peer) { 628 peer->PortsMessage(std::move(channel_message)); 629 return; 630 } 631 632 // If we don't know who the peer is, queue the message for delivery. If this 633 // is the first message queued for the peer, we also ask the broker to 634 // introduce us to them. 635 636 bool needs_introduction = false; 637 { 638 base::AutoLock lock(peers_lock_); 639 auto& queue = pending_peer_messages_[name]; 640 needs_introduction = queue.empty(); 641 queue.emplace(std::move(channel_message)); 642 } 643 644 if (needs_introduction) { 645 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 646 if (!broker) { 647 DVLOG(1) << "Dropping message for unknown peer: " << name; 648 return; 649 } 650 broker->RequestIntroduction(name); 651 } 652 } 653 654 void NodeController::AcceptIncomingMessages() { 655 { 656 base::AutoLock lock(messages_lock_); 657 if (!incoming_messages_.empty()) { 658 // libstdc++'s deque creates an internal buffer on construction, even when 659 // the size is 0. So avoid creating it until it is necessary. 660 std::queue<ports::ScopedMessage> messages; 661 std::swap(messages, incoming_messages_); 662 base::AutoUnlock unlock(messages_lock_); 663 664 while (!messages.empty()) { 665 node_->AcceptMessage(std::move(messages.front())); 666 messages.pop(); 667 } 668 } 669 } 670 671 AttemptShutdownIfRequested(); 672 } 673 674 void NodeController::ProcessIncomingMessages() { 675 RequestContext request_context(RequestContext::Source::SYSTEM); 676 677 { 678 base::AutoLock lock(messages_lock_); 679 // Allow a new incoming messages processing task to be posted. This can't be 680 // done after AcceptIncomingMessages() otherwise a message might be missed. 681 // Doing it here may result in at most two tasks existing at the same time; 682 // this running one, and one pending in the task runner. 683 incoming_messages_task_posted_ = false; 684 } 685 686 AcceptIncomingMessages(); 687 } 688 689 void NodeController::DropAllPeers() { 690 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 691 692 std::vector<scoped_refptr<NodeChannel>> all_peers; 693 { 694 base::AutoLock lock(parent_lock_); 695 if (bootstrap_parent_channel_) { 696 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its 697 // existence to determine whether or not this is the root node. Once 698 // bootstrap_parent_channel_->ShutDown() has been called, 699 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't 700 // matter if it's deleted now or when |this| is deleted. 701 // Note: |bootstrap_parent_channel_| is only modified on the IO thread. 702 all_peers.push_back(bootstrap_parent_channel_); 703 } 704 } 705 706 { 707 base::AutoLock lock(peers_lock_); 708 for (const auto& peer : peers_) 709 all_peers.push_back(peer.second); 710 for (const auto& peer : pending_children_) 711 all_peers.push_back(peer.second); 712 peers_.clear(); 713 pending_children_.clear(); 714 pending_peer_messages_.clear(); 715 } 716 717 for (const auto& peer : all_peers) 718 peer->ShutDown(); 719 720 if (destroy_on_io_thread_shutdown_) 721 delete this; 722 } 723 724 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { 725 GenerateRandomName(port_name); 726 } 727 728 void NodeController::AllocMessage(size_t num_header_bytes, 729 ports::ScopedMessage* message) { 730 message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr)); 731 } 732 733 void NodeController::ForwardMessage(const ports::NodeName& node, 734 ports::ScopedMessage message) { 735 DCHECK(message); 736 bool schedule_pump_task = false; 737 if (node == name_) { 738 // NOTE: We need to avoid re-entering the Node instance within 739 // ForwardMessage. Because ForwardMessage is only ever called 740 // (synchronously) in response to Node's ClosePort, SendMessage, or 741 // AcceptMessage, we flush the queue after calling any of those methods. 742 base::AutoLock lock(messages_lock_); 743 // |io_task_runner_| may be null in tests or processes that don't require 744 // multi-process Mojo. 745 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ && 746 !incoming_messages_task_posted_; 747 incoming_messages_task_posted_ |= schedule_pump_task; 748 incoming_messages_.emplace(std::move(message)); 749 } else { 750 SendPeerMessage(node, std::move(message)); 751 } 752 753 if (schedule_pump_task) { 754 // Normally, the queue is processed after the action that added the local 755 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also 756 // possible for a local message to be added as a result of a remote message, 757 // and OnChannelMessage() doesn't process this queue (although 758 // OnPortsMessage() does). There may also be other code paths, now or added 759 // in the future, which cause local messages to be added but don't process 760 // this message queue. 761 // 762 // Instead of adding a call to AcceptIncomingMessages() on every possible 763 // code path, post a task to the IO thread to process the queue. If the 764 // current call stack processes the queue, this may end up doing nothing. 765 io_task_runner_->PostTask( 766 FROM_HERE, 767 base::Bind(&NodeController::ProcessIncomingMessages, 768 base::Unretained(this))); 769 } 770 } 771 772 void NodeController::BroadcastMessage(ports::ScopedMessage message) { 773 CHECK_EQ(message->num_ports(), 0u); 774 Channel::MessagePtr channel_message = 775 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 776 CHECK(!channel_message->has_handles()); 777 778 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 779 if (broker) 780 broker->Broadcast(std::move(channel_message)); 781 else 782 OnBroadcast(name_, std::move(channel_message)); 783 } 784 785 void NodeController::PortStatusChanged(const ports::PortRef& port) { 786 scoped_refptr<ports::UserData> user_data; 787 node_->GetUserData(port, &user_data); 788 789 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); 790 if (observer) { 791 observer->OnPortStatusChanged(); 792 } else { 793 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " 794 << "doesn't have an observer."; 795 } 796 } 797 798 void NodeController::OnAcceptChild(const ports::NodeName& from_node, 799 const ports::NodeName& parent_name, 800 const ports::NodeName& token) { 801 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 802 803 scoped_refptr<NodeChannel> parent; 804 { 805 base::AutoLock lock(parent_lock_); 806 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { 807 parent_name_ = parent_name; 808 parent = bootstrap_parent_channel_; 809 } 810 } 811 812 if (!parent) { 813 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; 814 DropPeer(from_node, nullptr); 815 return; 816 } 817 818 parent->SetRemoteNodeName(parent_name); 819 parent->AcceptParent(token, name_); 820 821 // NOTE: The child does not actually add its parent as a peer until 822 // receiving an AcceptBrokerClient message from the broker. The parent 823 // will request that said message be sent upon receiving AcceptParent. 824 825 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; 826 } 827 828 void NodeController::OnAcceptParent(const ports::NodeName& from_node, 829 const ports::NodeName& token, 830 const ports::NodeName& child_name) { 831 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 832 833 auto it = pending_children_.find(from_node); 834 if (it == pending_children_.end() || token != from_node) { 835 DLOG(ERROR) << "Received unexpected AcceptParent message from " 836 << from_node; 837 DropPeer(from_node, nullptr); 838 return; 839 } 840 841 scoped_refptr<NodeChannel> channel = it->second; 842 pending_children_.erase(it); 843 844 DCHECK(channel); 845 846 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name; 847 848 AddPeer(child_name, channel, false /* start_channel */); 849 850 // TODO(rockot/amistry): We could simplify child initialization if we could 851 // synchronously get a new async broker channel from the broker. For now we do 852 // it asynchronously since it's only used to facilitate handle passing, not 853 // handle creation. 854 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 855 if (broker) { 856 // Inform the broker of this new child. 857 broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle()); 858 } else { 859 // If we have no broker, either we need to wait for one, or we *are* the 860 // broker. 861 scoped_refptr<NodeChannel> parent = GetParentChannel(); 862 if (!parent) { 863 base::AutoLock lock(parent_lock_); 864 parent = bootstrap_parent_channel_; 865 } 866 867 if (!parent) { 868 // Yes, we're the broker. We can initialize the child directly. 869 channel->AcceptBrokerClient(name_, ScopedPlatformHandle()); 870 } else { 871 // We aren't the broker, so wait for a broker connection. 872 base::AutoLock lock(broker_lock_); 873 pending_broker_clients_.push(child_name); 874 } 875 } 876 } 877 878 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node, 879 const ports::NodeName& client_name, 880 base::ProcessHandle process_handle) { 881 #if defined(OS_WIN) 882 // Scoped handle to avoid leaks on error. 883 ScopedPlatformHandle scoped_process_handle = 884 ScopedPlatformHandle(PlatformHandle(process_handle)); 885 #endif 886 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node); 887 if (!sender) { 888 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender."; 889 return; 890 } 891 892 if (GetPeerChannel(client_name)) { 893 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; 894 DropPeer(from_node, nullptr); 895 return; 896 } 897 898 PlatformChannelPair broker_channel; 899 scoped_refptr<NodeChannel> client = NodeChannel::Create( 900 this, broker_channel.PassServerHandle(), io_task_runner_, 901 ProcessErrorCallback()); 902 903 #if defined(OS_WIN) 904 // The broker must have a working handle to the client process in order to 905 // properly copy other handles to and from the client. 906 if (!scoped_process_handle.is_valid()) { 907 DLOG(ERROR) << "Broker rejecting client with invalid process handle."; 908 return; 909 } 910 client->SetRemoteProcessHandle(scoped_process_handle.release().handle); 911 #else 912 client->SetRemoteProcessHandle(process_handle); 913 #endif 914 915 AddPeer(client_name, client, true /* start_channel */); 916 917 DVLOG(1) << "Broker " << name_ << " accepting client " << client_name 918 << " from peer " << from_node; 919 920 sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle()); 921 } 922 923 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node, 924 const ports::NodeName& client_name, 925 ScopedPlatformHandle broker_channel) { 926 scoped_refptr<NodeChannel> client = GetPeerChannel(client_name); 927 if (!client) { 928 DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name; 929 return; 930 } 931 932 // This should have come from our own broker. 933 if (GetBrokerChannel() != GetPeerChannel(from_node)) { 934 DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node; 935 return; 936 } 937 938 DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node; 939 940 client->AcceptBrokerClient(from_node, std::move(broker_channel)); 941 } 942 943 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, 944 const ports::NodeName& broker_name, 945 ScopedPlatformHandle broker_channel) { 946 // This node should already have a parent in bootstrap mode. 947 ports::NodeName parent_name; 948 scoped_refptr<NodeChannel> parent; 949 { 950 base::AutoLock lock(parent_lock_); 951 parent_name = parent_name_; 952 parent = bootstrap_parent_channel_; 953 bootstrap_parent_channel_ = nullptr; 954 } 955 DCHECK(parent_name == from_node); 956 DCHECK(parent); 957 958 std::queue<ports::NodeName> pending_broker_clients; 959 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 960 pending_relay_messages; 961 { 962 base::AutoLock lock(broker_lock_); 963 broker_name_ = broker_name; 964 std::swap(pending_broker_clients, pending_broker_clients_); 965 std::swap(pending_relay_messages, pending_relay_messages_); 966 } 967 DCHECK(broker_name != ports::kInvalidNodeName); 968 969 // It's now possible to add both the broker and the parent as peers. 970 // Note that the broker and parent may be the same node. 971 scoped_refptr<NodeChannel> broker; 972 if (broker_name == parent_name) { 973 DCHECK(!broker_channel.is_valid()); 974 broker = parent; 975 } else { 976 DCHECK(broker_channel.is_valid()); 977 broker = NodeChannel::Create(this, std::move(broker_channel), 978 io_task_runner_, ProcessErrorCallback()); 979 AddPeer(broker_name, broker, true /* start_channel */); 980 } 981 982 AddPeer(parent_name, parent, false /* start_channel */); 983 984 { 985 // Complete any port merge requests we have waiting for the parent. 986 base::AutoLock lock(pending_port_merges_lock_); 987 for (const auto& request : pending_port_merges_) 988 parent->RequestPortMerge(request.second.name(), request.first); 989 pending_port_merges_.clear(); 990 } 991 992 // Feed the broker any pending children of our own. 993 while (!pending_broker_clients.empty()) { 994 const ports::NodeName& child_name = pending_broker_clients.front(); 995 auto it = pending_children_.find(child_name); 996 DCHECK(it != pending_children_.end()); 997 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); 998 pending_broker_clients.pop(); 999 } 1000 1001 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1002 // Have the broker relay any messages we have waiting. 1003 for (auto& entry : pending_relay_messages) { 1004 const ports::NodeName& destination = entry.first; 1005 auto& message_queue = entry.second; 1006 while (!message_queue.empty()) { 1007 broker->RelayPortsMessage(destination, std::move(message_queue.front())); 1008 message_queue.pop(); 1009 } 1010 } 1011 #endif 1012 1013 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; 1014 } 1015 1016 void NodeController::OnPortsMessage(const ports::NodeName& from_node, 1017 Channel::MessagePtr channel_message) { 1018 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1019 1020 void* data; 1021 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1022 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, 1023 &num_header_bytes, &num_payload_bytes, 1024 &num_ports_bytes)) { 1025 DropPeer(from_node, nullptr); 1026 return; 1027 } 1028 1029 CHECK(channel_message); 1030 std::unique_ptr<PortsMessage> ports_message( 1031 new PortsMessage(num_header_bytes, 1032 num_payload_bytes, 1033 num_ports_bytes, 1034 std::move(channel_message))); 1035 ports_message->set_source_node(from_node); 1036 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); 1037 AcceptIncomingMessages(); 1038 } 1039 1040 void NodeController::OnRequestPortMerge( 1041 const ports::NodeName& from_node, 1042 const ports::PortName& connector_port_name, 1043 const std::string& token) { 1044 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1045 1046 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " 1047 << token << " and port " << connector_port_name << "@" << from_node; 1048 1049 ports::PortRef local_port; 1050 { 1051 base::AutoLock lock(reserved_ports_lock_); 1052 auto it = reserved_ports_.find(token); 1053 if (it == reserved_ports_.end()) { 1054 DVLOG(1) << "Ignoring request to connect to port for unknown token " 1055 << token; 1056 return; 1057 } 1058 local_port = it->second.port; 1059 } 1060 1061 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 1062 if (rv != ports::OK) 1063 DLOG(ERROR) << "MergePorts failed: " << rv; 1064 1065 AcceptIncomingMessages(); 1066 } 1067 1068 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 1069 const ports::NodeName& name) { 1070 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1071 1072 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 1073 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 1074 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 1075 << from_node; 1076 DropPeer(from_node, nullptr); 1077 return; 1078 } 1079 1080 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); 1081 if (!new_friend) { 1082 // We don't know who they're talking about! 1083 requestor->Introduce(name, ScopedPlatformHandle()); 1084 } else { 1085 PlatformChannelPair new_channel; 1086 requestor->Introduce(name, new_channel.PassServerHandle()); 1087 new_friend->Introduce(from_node, new_channel.PassClientHandle()); 1088 } 1089 } 1090 1091 void NodeController::OnIntroduce(const ports::NodeName& from_node, 1092 const ports::NodeName& name, 1093 ScopedPlatformHandle channel_handle) { 1094 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1095 1096 if (!channel_handle.is_valid()) { 1097 node_->LostConnectionToNode(name); 1098 1099 DLOG(ERROR) << "Could not be introduced to peer " << name; 1100 base::AutoLock lock(peers_lock_); 1101 pending_peer_messages_.erase(name); 1102 return; 1103 } 1104 1105 scoped_refptr<NodeChannel> channel = 1106 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, 1107 ProcessErrorCallback()); 1108 1109 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; 1110 AddPeer(name, channel, true /* start_channel */); 1111 } 1112 1113 void NodeController::OnBroadcast(const ports::NodeName& from_node, 1114 Channel::MessagePtr message) { 1115 DCHECK(!message->has_handles()); 1116 1117 void* data; 1118 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1119 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, 1120 &num_header_bytes, &num_payload_bytes, 1121 &num_ports_bytes)) { 1122 DropPeer(from_node, nullptr); 1123 return; 1124 } 1125 1126 // Broadcast messages must not contain ports. 1127 if (num_ports_bytes > 0) { 1128 DropPeer(from_node, nullptr); 1129 return; 1130 } 1131 1132 base::AutoLock lock(peers_lock_); 1133 for (auto& iter : peers_) { 1134 // Copy and send the message to each known peer. 1135 Channel::MessagePtr peer_message( 1136 new Channel::Message(message->payload_size(), 0)); 1137 memcpy(peer_message->mutable_payload(), message->payload(), 1138 message->payload_size()); 1139 iter.second->PortsMessage(std::move(peer_message)); 1140 } 1141 } 1142 1143 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1144 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 1145 base::ProcessHandle from_process, 1146 const ports::NodeName& destination, 1147 Channel::MessagePtr message) { 1148 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1149 1150 if (GetBrokerChannel()) { 1151 // Only the broker should be asked to relay a message. 1152 LOG(ERROR) << "Non-broker refusing to relay message."; 1153 DropPeer(from_node, nullptr); 1154 return; 1155 } 1156 1157 // The parent should always know which process this came from. 1158 DCHECK(from_process != base::kNullProcessHandle); 1159 1160 #if defined(OS_WIN) 1161 // Rewrite the handles to this (the parent) process. If the message is 1162 // destined for another child process, the handles will be rewritten to that 1163 // process before going out (see NodeChannel::WriteChannelMessage). 1164 // 1165 // TODO: We could avoid double-duplication. 1166 // 1167 // Note that we explicitly mark the handles as being owned by the sending 1168 // process before rewriting them, in order to accommodate RewriteHandles' 1169 // internal sanity checks. 1170 ScopedPlatformHandleVectorPtr handles = message->TakeHandles(); 1171 for (size_t i = 0; i < handles->size(); ++i) 1172 (*handles)[i].owning_process = from_process; 1173 if (!Channel::Message::RewriteHandles(from_process, 1174 base::GetCurrentProcessHandle(), 1175 handles.get())) { 1176 DLOG(ERROR) << "Failed to relay one or more handles."; 1177 } 1178 message->SetHandles(std::move(handles)); 1179 #else 1180 MachPortRelay* relay = GetMachPortRelay(); 1181 if (!relay) { 1182 LOG(ERROR) << "Receiving Mach ports without a port relay from " 1183 << from_node << ". Dropping message."; 1184 return; 1185 } 1186 if (!relay->ExtractPortRights(message.get(), from_process)) { 1187 // NodeChannel should ensure that MachPortRelay is ready for the remote 1188 // process. At this point, if the port extraction failed, either something 1189 // went wrong in the mach stuff, or the remote process died. 1190 LOG(ERROR) << "Error on receiving Mach ports " << from_node 1191 << ". Dropping message."; 1192 return; 1193 } 1194 #endif // defined(OS_WIN) 1195 1196 if (destination == name_) { 1197 // Great, we can deliver this message locally. 1198 OnPortsMessage(from_node, std::move(message)); 1199 return; 1200 } 1201 1202 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); 1203 if (peer) 1204 peer->PortsMessageFromRelay(from_node, std::move(message)); 1205 else 1206 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1207 } 1208 1209 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node, 1210 const ports::NodeName& source_node, 1211 Channel::MessagePtr message) { 1212 if (GetPeerChannel(from_node) != GetBrokerChannel()) { 1213 LOG(ERROR) << "Refusing relayed message from non-broker node."; 1214 DropPeer(from_node, nullptr); 1215 return; 1216 } 1217 1218 OnPortsMessage(source_node, std::move(message)); 1219 } 1220 #endif 1221 1222 void NodeController::OnChannelError(const ports::NodeName& from_node, 1223 NodeChannel* channel) { 1224 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1225 DropPeer(from_node, channel); 1226 // DropPeer may have caused local port closures, so be sure to process any 1227 // pending local messages. 1228 AcceptIncomingMessages(); 1229 } else { 1230 io_task_runner_->PostTask( 1231 FROM_HERE, 1232 base::Bind(&NodeController::OnChannelError, base::Unretained(this), 1233 from_node, channel)); 1234 } 1235 } 1236 1237 #if defined(OS_MACOSX) && !defined(OS_IOS) 1238 MachPortRelay* NodeController::GetMachPortRelay() { 1239 { 1240 base::AutoLock lock(parent_lock_); 1241 // Return null if we're not the root. 1242 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) 1243 return nullptr; 1244 } 1245 1246 base::AutoLock lock(mach_port_relay_lock_); 1247 return mach_port_relay_.get(); 1248 } 1249 #endif 1250 1251 void NodeController::DestroyOnIOThreadShutdown() { 1252 destroy_on_io_thread_shutdown_ = true; 1253 } 1254 1255 void NodeController::AttemptShutdownIfRequested() { 1256 if (!shutdown_callback_flag_) 1257 return; 1258 1259 base::Closure callback; 1260 { 1261 base::AutoLock lock(shutdown_lock_); 1262 if (shutdown_callback_.is_null()) 1263 return; 1264 if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) { 1265 DVLOG(2) << "Unable to cleanly shut down node " << name_; 1266 return; 1267 } 1268 1269 callback = shutdown_callback_; 1270 shutdown_callback_.Reset(); 1271 shutdown_callback_flag_.Set(false); 1272 } 1273 1274 DCHECK(!callback.is_null()); 1275 1276 callback.Run(); 1277 } 1278 1279 } // namespace edk 1280 } // namespace mojo 1281