1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/core/node_controller.h" 6 7 #include <algorithm> 8 #include <limits> 9 #include <vector> 10 11 #include "base/bind.h" 12 #include "base/containers/queue.h" 13 #include "base/location.h" 14 #include "base/logging.h" 15 #include "base/macros.h" 16 #include "base/message_loop/message_loop_current.h" 17 #include "base/metrics/histogram_macros.h" 18 #include "base/process/process_handle.h" 19 #include "base/rand_util.h" 20 #include "base/time/time.h" 21 #include "base/timer/elapsed_timer.h" 22 #include "mojo/core/broker.h" 23 #include "mojo/core/broker_host.h" 24 #include "mojo/core/configuration.h" 25 #include "mojo/core/core.h" 26 #include "mojo/core/request_context.h" 27 #include "mojo/core/user_message_impl.h" 28 #include "mojo/public/cpp/platform/named_platform_channel.h" 29 #include "mojo/public/cpp/platform/platform_channel.h" 30 31 #if defined(OS_WIN) 32 #include <windows.h> 33 #endif 34 35 #if defined(OS_MACOSX) && !defined(OS_IOS) 36 #include "mojo/core/mach_port_relay.h" 37 #endif 38 39 #if !defined(OS_NACL) 40 #include "crypto/random.h" 41 #endif 42 43 namespace mojo { 44 namespace core { 45 46 namespace { 47 48 #if defined(OS_NACL) 49 template <typename T> 50 void GenerateRandomName(T* out) { 51 base::RandBytes(out, sizeof(T)); 52 } 53 #else 54 template <typename T> 55 void GenerateRandomName(T* out) { 56 crypto::RandBytes(out, sizeof(T)); 57 } 58 #endif 59 60 ports::NodeName GetRandomNodeName() { 61 ports::NodeName name; 62 GenerateRandomName(&name); 63 return name; 64 } 65 66 Channel::MessagePtr SerializeEventMessage(ports::ScopedEvent event) { 67 if (event->type() == ports::Event::Type::kUserMessage) { 68 // User message events must already be partially serialized. 69 return UserMessageImpl::FinalizeEventMessage( 70 ports::Event::Cast<ports::UserMessageEvent>(&event)); 71 } 72 73 void* data; 74 size_t size = event->GetSerializedSize(); 75 auto message = NodeChannel::CreateEventMessage(size, size, &data, 0); 76 event->Serialize(data); 77 return message; 78 } 79 80 ports::ScopedEvent DeserializeEventMessage( 81 const ports::NodeName& from_node, 82 Channel::MessagePtr channel_message) { 83 void* data; 84 size_t size; 85 NodeChannel::GetEventMessageData(channel_message.get(), &data, &size); 86 auto event = ports::Event::Deserialize(data, size); 87 if (!event) 88 return nullptr; 89 90 if (event->type() != ports::Event::Type::kUserMessage) 91 return event; 92 93 // User messages require extra parsing. 94 const size_t event_size = event->GetSerializedSize(); 95 96 // Note that if this weren't true, the event couldn't have been deserialized 97 // in the first place. 98 DCHECK_LE(event_size, size); 99 100 auto message_event = ports::Event::Cast<ports::UserMessageEvent>(&event); 101 auto message = UserMessageImpl::CreateFromChannelMessage( 102 message_event.get(), std::move(channel_message), 103 static_cast<uint8_t*>(data) + event_size, size - event_size); 104 message->set_source_node(from_node); 105 106 message_event->AttachMessage(std::move(message)); 107 return std::move(message_event); 108 } 109 110 // Used by NodeController to watch for shutdown. Since no IO can happen once 111 // the IO thread is killed, the NodeController can cleanly drop all its peers 112 // at that time. 113 class ThreadDestructionObserver 114 : public base::MessageLoopCurrent::DestructionObserver { 115 public: 116 static void Create(scoped_refptr<base::TaskRunner> task_runner, 117 const base::Closure& callback) { 118 if (task_runner->RunsTasksInCurrentSequence()) { 119 // Owns itself. 120 new ThreadDestructionObserver(callback); 121 } else { 122 task_runner->PostTask(FROM_HERE, 123 base::Bind(&Create, task_runner, callback)); 124 } 125 } 126 127 private: 128 explicit ThreadDestructionObserver(const base::Closure& callback) 129 : callback_(callback) { 130 base::MessageLoopCurrent::Get()->AddDestructionObserver(this); 131 } 132 133 ~ThreadDestructionObserver() override { 134 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); 135 } 136 137 // base::MessageLoopCurrent::DestructionObserver: 138 void WillDestroyCurrentMessageLoop() override { 139 callback_.Run(); 140 delete this; 141 } 142 143 const base::Closure callback_; 144 145 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); 146 }; 147 148 } // namespace 149 150 NodeController::~NodeController() {} 151 152 NodeController::NodeController(Core* core) 153 : core_(core), 154 name_(GetRandomNodeName()), 155 node_(new ports::Node(name_, this)) { 156 DVLOG(1) << "Initializing node " << name_; 157 } 158 159 #if defined(OS_MACOSX) && !defined(OS_IOS) 160 void NodeController::CreateMachPortRelay(base::PortProvider* port_provider) { 161 base::AutoLock lock(mach_port_relay_lock_); 162 DCHECK(!mach_port_relay_); 163 mach_port_relay_.reset(new MachPortRelay(port_provider)); 164 } 165 #endif 166 167 void NodeController::SetIOTaskRunner( 168 scoped_refptr<base::TaskRunner> task_runner) { 169 io_task_runner_ = task_runner; 170 ThreadDestructionObserver::Create( 171 io_task_runner_, 172 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); 173 } 174 175 void NodeController::SendBrokerClientInvitation( 176 base::ProcessHandle target_process, 177 ConnectionParams connection_params, 178 const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports, 179 const ProcessErrorCallback& process_error_callback) { 180 // Generate the temporary remote node name here so that it can be associated 181 // with the ports "attached" to this invitation. 182 ports::NodeName temporary_node_name; 183 GenerateRandomName(&temporary_node_name); 184 185 { 186 base::AutoLock lock(reserved_ports_lock_); 187 PortMap& port_map = reserved_ports_[temporary_node_name]; 188 for (auto& entry : attached_ports) { 189 auto result = port_map.emplace(entry.first, entry.second); 190 DCHECK(result.second) << "Duplicate attachment: " << entry.first; 191 } 192 } 193 194 ScopedProcessHandle scoped_target_process = 195 ScopedProcessHandle::CloneFrom(target_process); 196 io_task_runner_->PostTask( 197 FROM_HERE, 198 base::BindOnce(&NodeController::SendBrokerClientInvitationOnIOThread, 199 base::Unretained(this), std::move(scoped_target_process), 200 std::move(connection_params), temporary_node_name, 201 process_error_callback)); 202 } 203 204 void NodeController::AcceptBrokerClientInvitation( 205 ConnectionParams connection_params) { 206 DCHECK(!GetConfiguration().is_broker_process); 207 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA) 208 // Use the bootstrap channel for the broker and receive the node's channel 209 // synchronously as the first message from the broker. 210 DCHECK(connection_params.endpoint().is_valid()); 211 base::ElapsedTimer timer; 212 broker_ = std::make_unique<Broker>( 213 connection_params.TakeEndpoint().TakePlatformHandle()); 214 PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint(); 215 216 if (!endpoint.is_valid()) { 217 // Most likely the inviter's side of the channel has already been closed and 218 // the broker was unable to negotiate a NodeChannel pipe. In this case we 219 // can cancel our connection to our inviter. 220 DVLOG(1) << "Cannot connect to invalid inviter channel."; 221 CancelPendingPortMerges(); 222 return; 223 } 224 connection_params = ConnectionParams(std::move(endpoint)); 225 #endif 226 227 io_task_runner_->PostTask( 228 FROM_HERE, 229 base::BindOnce(&NodeController::AcceptBrokerClientInvitationOnIOThread, 230 base::Unretained(this), std::move(connection_params))); 231 } 232 233 void NodeController::ConnectIsolated(ConnectionParams connection_params, 234 const ports::PortRef& port, 235 base::StringPiece connection_name) { 236 io_task_runner_->PostTask( 237 FROM_HERE, 238 base::BindOnce(&NodeController::ConnectIsolatedOnIOThread, 239 base::Unretained(this), base::Passed(&connection_params), 240 port, connection_name.as_string())); 241 } 242 243 void NodeController::SetPortObserver(const ports::PortRef& port, 244 scoped_refptr<PortObserver> observer) { 245 node_->SetUserData(port, std::move(observer)); 246 } 247 248 void NodeController::ClosePort(const ports::PortRef& port) { 249 SetPortObserver(port, nullptr); 250 int rv = node_->ClosePort(port); 251 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); 252 } 253 254 int NodeController::SendUserMessage( 255 const ports::PortRef& port, 256 std::unique_ptr<ports::UserMessageEvent> message) { 257 return node_->SendUserMessage(port, std::move(message)); 258 } 259 260 void NodeController::MergePortIntoInviter(const std::string& name, 261 const ports::PortRef& port) { 262 scoped_refptr<NodeChannel> inviter; 263 bool reject_merge = false; 264 { 265 // Hold |pending_port_merges_lock_| while getting |inviter|. Otherwise, 266 // there is a race where the inviter can be set, and |pending_port_merges_| 267 // be processed between retrieving |inviter| and adding the merge to 268 // |pending_port_merges_|. 269 base::AutoLock lock(pending_port_merges_lock_); 270 inviter = GetInviterChannel(); 271 if (reject_pending_merges_) { 272 reject_merge = true; 273 } else if (!inviter) { 274 pending_port_merges_.push_back(std::make_pair(name, port)); 275 return; 276 } 277 } 278 if (reject_merge) { 279 node_->ClosePort(port); 280 DVLOG(2) << "Rejecting port merge for name " << name 281 << " due to closed inviter channel."; 282 return; 283 } 284 285 inviter->RequestPortMerge(port.name(), name); 286 } 287 288 int NodeController::MergeLocalPorts(const ports::PortRef& port0, 289 const ports::PortRef& port1) { 290 return node_->MergeLocalPorts(port0, port1); 291 } 292 293 base::WritableSharedMemoryRegion NodeController::CreateSharedBuffer( 294 size_t num_bytes) { 295 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA) 296 // Shared buffer creation failure is fatal, so always use the broker when we 297 // have one; unless of course the embedder forces us not to. 298 if (!GetConfiguration().force_direct_shared_memory_allocation && broker_) 299 return broker_->GetWritableSharedMemoryRegion(num_bytes); 300 #endif 301 return base::WritableSharedMemoryRegion::Create(num_bytes); 302 } 303 304 void NodeController::RequestShutdown(const base::Closure& callback) { 305 { 306 base::AutoLock lock(shutdown_lock_); 307 shutdown_callback_ = callback; 308 shutdown_callback_flag_.Set(true); 309 } 310 311 AttemptShutdownIfRequested(); 312 } 313 314 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node, 315 const std::string& error) { 316 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); 317 if (peer) 318 peer->NotifyBadMessage(error); 319 } 320 321 void NodeController::SendBrokerClientInvitationOnIOThread( 322 ScopedProcessHandle target_process, 323 ConnectionParams connection_params, 324 ports::NodeName temporary_node_name, 325 const ProcessErrorCallback& process_error_callback) { 326 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 327 328 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA) 329 PlatformChannel node_channel; 330 ConnectionParams node_connection_params(node_channel.TakeLocalEndpoint()); 331 // BrokerHost owns itself. 332 BrokerHost* broker_host = 333 new BrokerHost(target_process.get(), std::move(connection_params), 334 process_error_callback); 335 bool channel_ok = broker_host->SendChannel( 336 node_channel.TakeRemoteEndpoint().TakePlatformHandle()); 337 338 #if defined(OS_WIN) 339 if (!channel_ok) { 340 // On Windows the above operation may fail if the channel is crossing a 341 // session boundary. In that case we fall back to a named pipe. 342 NamedPlatformChannel::Options options; 343 NamedPlatformChannel named_channel(options); 344 node_connection_params = 345 ConnectionParams(named_channel.TakeServerEndpoint()); 346 broker_host->SendNamedChannel(named_channel.GetServerName()); 347 } 348 #else 349 CHECK(channel_ok); 350 #endif // defined(OS_WIN) 351 352 scoped_refptr<NodeChannel> channel = 353 NodeChannel::Create(this, std::move(node_connection_params), 354 io_task_runner_, process_error_callback); 355 356 #else // !defined(OS_MACOSX) && !defined(OS_NACL) 357 scoped_refptr<NodeChannel> channel = 358 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, 359 process_error_callback); 360 #endif // !defined(OS_MACOSX) && !defined(OS_NACL) 361 362 // We set up the invitee channel with a temporary name so it can be identified 363 // as a pending invitee if it writes any messages to the channel. We may start 364 // receiving messages from it (though we shouldn't) as soon as Start() is 365 // called below. 366 367 pending_invitations_.insert(std::make_pair(temporary_node_name, channel)); 368 369 channel->SetRemoteNodeName(temporary_node_name); 370 channel->SetRemoteProcessHandle(std::move(target_process)); 371 channel->Start(); 372 373 channel->AcceptInvitee(name_, temporary_node_name); 374 } 375 376 void NodeController::AcceptBrokerClientInvitationOnIOThread( 377 ConnectionParams connection_params) { 378 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 379 380 { 381 base::AutoLock lock(inviter_lock_); 382 DCHECK(inviter_name_ == ports::kInvalidNodeName); 383 384 // At this point we don't know the inviter's name, so we can't yet insert it 385 // into our |peers_| map. That will happen as soon as we receive an 386 // AcceptInvitee message from them. 387 bootstrap_inviter_channel_ = 388 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, 389 ProcessErrorCallback()); 390 // Prevent the inviter pipe handle from being closed on shutdown. Pipe 391 // closure may be used by the inviter to detect the invitee process has 392 // exited. 393 bootstrap_inviter_channel_->LeakHandleOnShutdown(); 394 } 395 bootstrap_inviter_channel_->Start(); 396 } 397 398 void NodeController::ConnectIsolatedOnIOThread( 399 ConnectionParams connection_params, 400 ports::PortRef port, 401 const std::string& connection_name) { 402 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 403 404 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 405 this, std::move(connection_params), io_task_runner_, {}); 406 407 RequestContext request_context; 408 ports::NodeName token; 409 GenerateRandomName(&token); 410 pending_isolated_connections_.emplace( 411 token, IsolatedConnection{channel, port, connection_name}); 412 if (!connection_name.empty()) { 413 // If a connection already exists with this name, drop it. 414 auto it = named_isolated_connections_.find(connection_name); 415 if (it != named_isolated_connections_.end()) { 416 ports::NodeName connection_node = it->second; 417 if (connection_node != name_) { 418 DropPeer(connection_node, nullptr); 419 } else { 420 auto pending_it = pending_isolated_connections_.find(connection_node); 421 if (pending_it != pending_isolated_connections_.end()) { 422 node_->ClosePort(pending_it->second.local_port); 423 pending_isolated_connections_.erase(pending_it); 424 } 425 named_isolated_connections_.erase(it); 426 } 427 } 428 named_isolated_connections_.emplace(connection_name, token); 429 } 430 431 channel->SetRemoteNodeName(token); 432 channel->Start(); 433 434 channel->AcceptPeer(name_, token, port.name()); 435 } 436 437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 438 const ports::NodeName& name) { 439 base::AutoLock lock(peers_lock_); 440 auto it = peers_.find(name); 441 if (it == peers_.end()) 442 return nullptr; 443 return it->second; 444 } 445 446 scoped_refptr<NodeChannel> NodeController::GetInviterChannel() { 447 ports::NodeName inviter_name; 448 { 449 base::AutoLock lock(inviter_lock_); 450 inviter_name = inviter_name_; 451 } 452 return GetPeerChannel(inviter_name); 453 } 454 455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() { 456 if (GetConfiguration().is_broker_process) 457 return nullptr; 458 459 ports::NodeName broker_name; 460 { 461 base::AutoLock lock(broker_lock_); 462 broker_name = broker_name_; 463 } 464 return GetPeerChannel(broker_name); 465 } 466 467 void NodeController::AddPeer(const ports::NodeName& name, 468 scoped_refptr<NodeChannel> channel, 469 bool start_channel) { 470 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 471 472 DCHECK(name != ports::kInvalidNodeName); 473 DCHECK(channel); 474 475 channel->SetRemoteNodeName(name); 476 477 OutgoingMessageQueue pending_messages; 478 { 479 base::AutoLock lock(peers_lock_); 480 if (peers_.find(name) != peers_.end()) { 481 // This can happen normally if two nodes race to be introduced to each 482 // other. The losing pipe will be silently closed and introduction should 483 // not be affected. 484 DVLOG(1) << "Ignoring duplicate peer name " << name; 485 return; 486 } 487 488 auto result = peers_.insert(std::make_pair(name, channel)); 489 DCHECK(result.second); 490 491 DVLOG(2) << "Accepting new peer " << name << " on node " << name_; 492 493 auto it = pending_peer_messages_.find(name); 494 if (it != pending_peer_messages_.end()) { 495 std::swap(pending_messages, it->second); 496 pending_peer_messages_.erase(it); 497 } 498 } 499 500 if (start_channel) 501 channel->Start(); 502 503 // Flush any queued message we need to deliver to this node. 504 while (!pending_messages.empty()) { 505 channel->SendChannelMessage(std::move(pending_messages.front())); 506 pending_messages.pop(); 507 } 508 } 509 510 void NodeController::DropPeer(const ports::NodeName& name, 511 NodeChannel* channel) { 512 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 513 514 { 515 base::AutoLock lock(peers_lock_); 516 auto it = peers_.find(name); 517 518 if (it != peers_.end()) { 519 ports::NodeName peer = it->first; 520 peers_.erase(it); 521 DVLOG(1) << "Dropped peer " << peer; 522 } 523 524 pending_peer_messages_.erase(name); 525 pending_invitations_.erase(name); 526 } 527 528 std::vector<ports::PortRef> ports_to_close; 529 { 530 // Clean up any reserved ports. 531 base::AutoLock lock(reserved_ports_lock_); 532 auto it = reserved_ports_.find(name); 533 if (it != reserved_ports_.end()) { 534 for (auto& entry : it->second) 535 ports_to_close.emplace_back(entry.second); 536 reserved_ports_.erase(it); 537 } 538 } 539 540 bool is_inviter; 541 { 542 base::AutoLock lock(inviter_lock_); 543 is_inviter = (name == inviter_name_ || 544 (channel && channel == bootstrap_inviter_channel_)); 545 } 546 547 // If the error comes from the inviter channel, we also need to cancel any 548 // port merge requests, so that errors can be propagated to the message 549 // pipes. 550 if (is_inviter) 551 CancelPendingPortMerges(); 552 553 auto connection_it = pending_isolated_connections_.find(name); 554 if (connection_it != pending_isolated_connections_.end()) { 555 IsolatedConnection& connection = connection_it->second; 556 ports_to_close.push_back(connection.local_port); 557 if (!connection.name.empty()) 558 named_isolated_connections_.erase(connection.name); 559 pending_isolated_connections_.erase(connection_it); 560 } 561 562 for (const auto& port : ports_to_close) 563 node_->ClosePort(port); 564 565 node_->LostConnectionToNode(name); 566 AttemptShutdownIfRequested(); 567 } 568 569 void NodeController::SendPeerEvent(const ports::NodeName& name, 570 ports::ScopedEvent event) { 571 Channel::MessagePtr event_message = SerializeEventMessage(std::move(event)); 572 if (!event_message) 573 return; 574 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); 575 #if defined(OS_WIN) 576 if (event_message->has_handles()) { 577 // If we're sending a message with handles we aren't the destination 578 // node's inviter or broker (i.e. we don't know its process handle), ask 579 // the broker to relay for us. 580 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 581 if (!peer || !peer->HasRemoteProcessHandle()) { 582 if (!GetConfiguration().is_broker_process && broker) { 583 broker->RelayEventMessage(name, std::move(event_message)); 584 } else { 585 base::AutoLock lock(broker_lock_); 586 pending_relay_messages_[name].emplace(std::move(event_message)); 587 } 588 return; 589 } 590 } 591 #elif defined(OS_MACOSX) && !defined(OS_IOS) 592 if (event_message->has_mach_ports()) { 593 // Messages containing Mach ports are always routed through the broker, even 594 // if the broker process is the intended recipient. 595 bool use_broker = false; 596 if (!GetConfiguration().is_broker_process) { 597 base::AutoLock lock(inviter_lock_); 598 use_broker = (bootstrap_inviter_channel_ || 599 inviter_name_ != ports::kInvalidNodeName); 600 } 601 602 if (use_broker) { 603 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 604 if (broker) { 605 broker->RelayEventMessage(name, std::move(event_message)); 606 } else { 607 base::AutoLock lock(broker_lock_); 608 pending_relay_messages_[name].emplace(std::move(event_message)); 609 } 610 return; 611 } 612 } 613 #endif // defined(OS_WIN) 614 615 if (peer) { 616 peer->SendChannelMessage(std::move(event_message)); 617 return; 618 } 619 620 // If we don't know who the peer is and we are the broker, we can only assume 621 // the peer is invalid, i.e., it's either a junk name or has already been 622 // disconnected. 623 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 624 if (!broker) { 625 DVLOG(1) << "Dropping message for unknown peer: " << name; 626 return; 627 } 628 629 // If we aren't the broker, assume we just need to be introduced and queue 630 // until that can be either confirmed or denied by the broker. 631 bool needs_introduction = false; 632 { 633 base::AutoLock lock(peers_lock_); 634 // We may have been introduced on another thread by the time we get here. 635 // Double-check to be safe. 636 auto it = peers_.find(name); 637 if (it == peers_.end()) { 638 auto& queue = pending_peer_messages_[name]; 639 needs_introduction = queue.empty(); 640 queue.emplace(std::move(event_message)); 641 } else { 642 peer = it->second; 643 } 644 } 645 if (needs_introduction) 646 broker->RequestIntroduction(name); 647 else if (peer) 648 peer->SendChannelMessage(std::move(event_message)); 649 } 650 651 void NodeController::DropAllPeers() { 652 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 653 654 std::vector<scoped_refptr<NodeChannel>> all_peers; 655 { 656 base::AutoLock lock(inviter_lock_); 657 if (bootstrap_inviter_channel_) { 658 // |bootstrap_inviter_channel_| isn't null'd here becuase we rely on its 659 // existence to determine whether or not this is the root node. Once 660 // bootstrap_inviter_channel_->ShutDown() has been called, 661 // |bootstrap_inviter_channel_| is essentially a dead object and it 662 // doesn't matter if it's deleted now or when |this| is deleted. Note: 663 // |bootstrap_inviter_channel_| is only modified on the IO thread. 664 all_peers.push_back(bootstrap_inviter_channel_); 665 } 666 } 667 668 { 669 base::AutoLock lock(peers_lock_); 670 for (const auto& peer : peers_) 671 all_peers.push_back(peer.second); 672 for (const auto& peer : pending_invitations_) 673 all_peers.push_back(peer.second); 674 peers_.clear(); 675 pending_invitations_.clear(); 676 pending_peer_messages_.clear(); 677 pending_isolated_connections_.clear(); 678 named_isolated_connections_.clear(); 679 } 680 681 for (const auto& peer : all_peers) 682 peer->ShutDown(); 683 684 if (destroy_on_io_thread_shutdown_) 685 delete this; 686 } 687 688 void NodeController::ForwardEvent(const ports::NodeName& node, 689 ports::ScopedEvent event) { 690 DCHECK(event); 691 if (node == name_) 692 node_->AcceptEvent(std::move(event)); 693 else 694 SendPeerEvent(node, std::move(event)); 695 696 AttemptShutdownIfRequested(); 697 } 698 699 void NodeController::BroadcastEvent(ports::ScopedEvent event) { 700 Channel::MessagePtr channel_message = SerializeEventMessage(std::move(event)); 701 DCHECK(channel_message && !channel_message->has_handles()); 702 703 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 704 if (broker) 705 broker->Broadcast(std::move(channel_message)); 706 else 707 OnBroadcast(name_, std::move(channel_message)); 708 } 709 710 void NodeController::PortStatusChanged(const ports::PortRef& port) { 711 scoped_refptr<ports::UserData> user_data; 712 node_->GetUserData(port, &user_data); 713 714 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); 715 if (observer) { 716 observer->OnPortStatusChanged(); 717 } else { 718 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " 719 << "doesn't have an observer."; 720 } 721 } 722 723 void NodeController::OnAcceptInvitee(const ports::NodeName& from_node, 724 const ports::NodeName& inviter_name, 725 const ports::NodeName& token) { 726 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 727 728 scoped_refptr<NodeChannel> inviter; 729 { 730 base::AutoLock lock(inviter_lock_); 731 if (bootstrap_inviter_channel_ && 732 inviter_name_ == ports::kInvalidNodeName) { 733 inviter_name_ = inviter_name; 734 inviter = bootstrap_inviter_channel_; 735 } 736 } 737 738 if (!inviter) { 739 DLOG(ERROR) << "Unexpected AcceptInvitee message from " << from_node; 740 DropPeer(from_node, nullptr); 741 return; 742 } 743 744 inviter->SetRemoteNodeName(inviter_name); 745 inviter->AcceptInvitation(token, name_); 746 747 // NOTE: The invitee does not actually add its inviter as a peer until 748 // receiving an AcceptBrokerClient message from the broker. The inviter will 749 // request that said message be sent upon receiving AcceptInvitation. 750 751 DVLOG(1) << "Broker client " << name_ << " accepting invitation from " 752 << inviter_name; 753 } 754 755 void NodeController::OnAcceptInvitation(const ports::NodeName& from_node, 756 const ports::NodeName& token, 757 const ports::NodeName& invitee_name) { 758 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 759 760 auto it = pending_invitations_.find(from_node); 761 if (it == pending_invitations_.end() || token != from_node) { 762 DLOG(ERROR) << "Received unexpected AcceptInvitation message from " 763 << from_node; 764 DropPeer(from_node, nullptr); 765 return; 766 } 767 768 { 769 base::AutoLock lock(reserved_ports_lock_); 770 auto it = reserved_ports_.find(from_node); 771 if (it != reserved_ports_.end()) { 772 // Swap the temporary node name's reserved ports into an entry keyed by 773 // the real node name. 774 auto result = 775 reserved_ports_.emplace(invitee_name, std::move(it->second)); 776 DCHECK(result.second); 777 reserved_ports_.erase(it); 778 } 779 } 780 781 scoped_refptr<NodeChannel> channel = it->second; 782 pending_invitations_.erase(it); 783 784 DCHECK(channel); 785 786 DVLOG(1) << "Node " << name_ << " accepted invitee " << invitee_name; 787 788 AddPeer(invitee_name, channel, false /* start_channel */); 789 790 // TODO(rockot): We could simplify invitee initialization if we could 791 // synchronously get a new async broker channel from the broker. For now we do 792 // it asynchronously since it's only used to facilitate handle passing, not 793 // handle creation. 794 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); 795 if (broker) { 796 // Inform the broker of this new client. 797 broker->AddBrokerClient(invitee_name, channel->CloneRemoteProcessHandle()); 798 } else { 799 // If we have no broker, either we need to wait for one, or we *are* the 800 // broker. 801 scoped_refptr<NodeChannel> inviter = GetInviterChannel(); 802 if (!inviter) { 803 base::AutoLock lock(inviter_lock_); 804 inviter = bootstrap_inviter_channel_; 805 } 806 807 if (!inviter) { 808 // Yes, we're the broker. We can initialize the client directly. 809 channel->AcceptBrokerClient(name_, PlatformHandle()); 810 } else { 811 // We aren't the broker, so wait for a broker connection. 812 base::AutoLock lock(broker_lock_); 813 pending_broker_clients_.push(invitee_name); 814 } 815 } 816 } 817 818 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node, 819 const ports::NodeName& client_name, 820 base::ProcessHandle process_handle) { 821 ScopedProcessHandle scoped_process_handle(process_handle); 822 823 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node); 824 if (!sender) { 825 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender."; 826 return; 827 } 828 829 if (GetPeerChannel(client_name)) { 830 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; 831 DropPeer(from_node, nullptr); 832 return; 833 } 834 835 PlatformChannel broker_channel; 836 ConnectionParams connection_params(broker_channel.TakeLocalEndpoint()); 837 scoped_refptr<NodeChannel> client = 838 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, 839 ProcessErrorCallback()); 840 841 #if defined(OS_WIN) 842 // The broker must have a working handle to the client process in order to 843 // properly copy other handles to and from the client. 844 if (!scoped_process_handle.is_valid()) { 845 DLOG(ERROR) << "Broker rejecting client with invalid process handle."; 846 return; 847 } 848 #endif 849 client->SetRemoteProcessHandle(std::move(scoped_process_handle)); 850 851 AddPeer(client_name, client, true /* start_channel */); 852 853 DVLOG(1) << "Broker " << name_ << " accepting client " << client_name 854 << " from peer " << from_node; 855 856 sender->BrokerClientAdded( 857 client_name, broker_channel.TakeRemoteEndpoint().TakePlatformHandle()); 858 } 859 860 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node, 861 const ports::NodeName& client_name, 862 PlatformHandle broker_channel) { 863 scoped_refptr<NodeChannel> client = GetPeerChannel(client_name); 864 if (!client) { 865 DLOG(ERROR) << "BrokerClientAdded for unknown client " << client_name; 866 return; 867 } 868 869 // This should have come from our own broker. 870 if (GetBrokerChannel() != GetPeerChannel(from_node)) { 871 DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node; 872 return; 873 } 874 875 DVLOG(1) << "Client " << client_name << " accepted by broker " << from_node; 876 877 client->AcceptBrokerClient(from_node, std::move(broker_channel)); 878 } 879 880 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, 881 const ports::NodeName& broker_name, 882 PlatformHandle broker_channel) { 883 DCHECK(!GetConfiguration().is_broker_process); 884 885 // This node should already have an inviter in bootstrap mode. 886 ports::NodeName inviter_name; 887 scoped_refptr<NodeChannel> inviter; 888 { 889 base::AutoLock lock(inviter_lock_); 890 inviter_name = inviter_name_; 891 inviter = bootstrap_inviter_channel_; 892 bootstrap_inviter_channel_ = nullptr; 893 } 894 DCHECK(inviter_name == from_node); 895 DCHECK(inviter); 896 897 base::queue<ports::NodeName> pending_broker_clients; 898 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 899 pending_relay_messages; 900 { 901 base::AutoLock lock(broker_lock_); 902 broker_name_ = broker_name; 903 std::swap(pending_broker_clients, pending_broker_clients_); 904 std::swap(pending_relay_messages, pending_relay_messages_); 905 } 906 DCHECK(broker_name != ports::kInvalidNodeName); 907 908 // It's now possible to add both the broker and the inviter as peers. 909 // Note that the broker and inviter may be the same node. 910 scoped_refptr<NodeChannel> broker; 911 if (broker_name == inviter_name) { 912 DCHECK(!broker_channel.is_valid()); 913 broker = inviter; 914 } else { 915 DCHECK(broker_channel.is_valid()); 916 broker = NodeChannel::Create( 917 this, 918 ConnectionParams(PlatformChannelEndpoint(std::move(broker_channel))), 919 io_task_runner_, ProcessErrorCallback()); 920 AddPeer(broker_name, broker, true /* start_channel */); 921 } 922 923 AddPeer(inviter_name, inviter, false /* start_channel */); 924 925 { 926 // Complete any port merge requests we have waiting for the inviter. 927 base::AutoLock lock(pending_port_merges_lock_); 928 for (const auto& request : pending_port_merges_) 929 inviter->RequestPortMerge(request.second.name(), request.first); 930 pending_port_merges_.clear(); 931 } 932 933 // Feed the broker any pending invitees of our own. 934 while (!pending_broker_clients.empty()) { 935 const ports::NodeName& invitee_name = pending_broker_clients.front(); 936 auto it = pending_invitations_.find(invitee_name); 937 // If for any reason we don't have a pending invitation for the invitee, 938 // there's nothing left to do: we've already swapped the relevant state into 939 // the stack. 940 if (it != pending_invitations_.end()) { 941 broker->AddBrokerClient(invitee_name, 942 it->second->CloneRemoteProcessHandle()); 943 } 944 pending_broker_clients.pop(); 945 } 946 947 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 948 // Have the broker relay any messages we have waiting. 949 for (auto& entry : pending_relay_messages) { 950 const ports::NodeName& destination = entry.first; 951 auto& message_queue = entry.second; 952 while (!message_queue.empty()) { 953 broker->RelayEventMessage(destination, std::move(message_queue.front())); 954 message_queue.pop(); 955 } 956 } 957 #endif 958 959 DVLOG(1) << "Client " << name_ << " accepted by broker " << broker_name; 960 } 961 962 void NodeController::OnEventMessage(const ports::NodeName& from_node, 963 Channel::MessagePtr channel_message) { 964 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 965 966 auto event = DeserializeEventMessage(from_node, std::move(channel_message)); 967 if (!event) { 968 // We silently ignore unparseable events, as they may come from a process 969 // running a newer version of Mojo. 970 DVLOG(1) << "Ignoring invalid or unknown event from " << from_node; 971 return; 972 } 973 974 node_->AcceptEvent(std::move(event)); 975 976 AttemptShutdownIfRequested(); 977 } 978 979 void NodeController::OnRequestPortMerge( 980 const ports::NodeName& from_node, 981 const ports::PortName& connector_port_name, 982 const std::string& name) { 983 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 984 985 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name 986 << " and port " << connector_port_name << "@" << from_node; 987 988 ports::PortRef local_port; 989 { 990 base::AutoLock lock(reserved_ports_lock_); 991 auto it = reserved_ports_.find(from_node); 992 // TODO(https://crbug.com/822034): We should send a notification back to the 993 // requestor so they can clean up their dangling port in this failure case. 994 // This requires changes to the internal protocol, which can't be made yet. 995 // Until this is done, pipes from |MojoExtractMessagePipeFromInvitation()| 996 // will never break if the given name was invalid. 997 if (it == reserved_ports_.end()) { 998 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " 999 << "No ports reserved for that node."; 1000 return; 1001 } 1002 1003 PortMap& port_map = it->second; 1004 auto port_it = port_map.find(name); 1005 if (port_it == port_map.end()) { 1006 DVLOG(1) << "Ignoring request to connect to port for unknown name " 1007 << name << " from node " << from_node; 1008 return; 1009 } 1010 local_port = port_it->second; 1011 port_map.erase(port_it); 1012 if (port_map.empty()) 1013 reserved_ports_.erase(it); 1014 } 1015 1016 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 1017 if (rv != ports::OK) 1018 DLOG(ERROR) << "MergePorts failed: " << rv; 1019 } 1020 1021 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 1022 const ports::NodeName& name) { 1023 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 1024 1025 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 1026 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 1027 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 1028 << from_node; 1029 DropPeer(from_node, nullptr); 1030 return; 1031 } 1032 1033 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); 1034 if (!new_friend) { 1035 // We don't know who they're talking about! 1036 requestor->Introduce(name, PlatformHandle()); 1037 } else { 1038 PlatformChannel new_channel; 1039 requestor->Introduce(name, 1040 new_channel.TakeLocalEndpoint().TakePlatformHandle()); 1041 new_friend->Introduce( 1042 from_node, new_channel.TakeRemoteEndpoint().TakePlatformHandle()); 1043 } 1044 } 1045 1046 void NodeController::OnIntroduce(const ports::NodeName& from_node, 1047 const ports::NodeName& name, 1048 PlatformHandle channel_handle) { 1049 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 1050 1051 if (!channel_handle.is_valid()) { 1052 node_->LostConnectionToNode(name); 1053 1054 DVLOG(1) << "Could not be introduced to peer " << name; 1055 base::AutoLock lock(peers_lock_); 1056 pending_peer_messages_.erase(name); 1057 return; 1058 } 1059 1060 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 1061 this, 1062 ConnectionParams(PlatformChannelEndpoint(std::move(channel_handle))), 1063 io_task_runner_, ProcessErrorCallback()); 1064 1065 DVLOG(1) << "Adding new peer " << name << " via broker introduction."; 1066 AddPeer(name, channel, true /* start_channel */); 1067 } 1068 1069 void NodeController::OnBroadcast(const ports::NodeName& from_node, 1070 Channel::MessagePtr message) { 1071 DCHECK(!message->has_handles()); 1072 1073 auto event = DeserializeEventMessage(from_node, std::move(message)); 1074 if (!event) { 1075 // We silently ignore unparseable events, as they may come from a process 1076 // running a newer version of Mojo. 1077 DVLOG(1) << "Ignoring request to broadcast invalid or unknown event from " 1078 << from_node; 1079 return; 1080 } 1081 1082 base::AutoLock lock(peers_lock_); 1083 for (auto& iter : peers_) { 1084 // Clone and send the event to each known peer. Events which cannot be 1085 // cloned cannot be broadcast. 1086 ports::ScopedEvent clone = event->Clone(); 1087 if (!clone) { 1088 DVLOG(1) << "Ignoring request to broadcast invalid event from " 1089 << from_node << " [type=" << static_cast<uint32_t>(event->type()) 1090 << "]"; 1091 return; 1092 } 1093 1094 iter.second->SendChannelMessage(SerializeEventMessage(std::move(clone))); 1095 } 1096 } 1097 1098 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1099 void NodeController::OnRelayEventMessage(const ports::NodeName& from_node, 1100 base::ProcessHandle from_process, 1101 const ports::NodeName& destination, 1102 Channel::MessagePtr message) { 1103 // The broker should always know which process this came from. 1104 DCHECK(from_process != base::kNullProcessHandle); 1105 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 1106 1107 if (GetBrokerChannel()) { 1108 // Only the broker should be asked to relay a message. 1109 LOG(ERROR) << "Non-broker refusing to relay message."; 1110 DropPeer(from_node, nullptr); 1111 return; 1112 } 1113 1114 if (destination == name_) { 1115 // Great, we can deliver this message locally. 1116 OnEventMessage(from_node, std::move(message)); 1117 return; 1118 } 1119 1120 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); 1121 if (peer) 1122 peer->EventMessageFromRelay(from_node, std::move(message)); 1123 else 1124 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1125 } 1126 1127 void NodeController::OnEventMessageFromRelay(const ports::NodeName& from_node, 1128 const ports::NodeName& source_node, 1129 Channel::MessagePtr message) { 1130 if (GetPeerChannel(from_node) != GetBrokerChannel()) { 1131 LOG(ERROR) << "Refusing relayed message from non-broker node."; 1132 DropPeer(from_node, nullptr); 1133 return; 1134 } 1135 1136 OnEventMessage(source_node, std::move(message)); 1137 } 1138 #endif 1139 1140 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, 1141 const ports::NodeName& token, 1142 const ports::NodeName& peer_name, 1143 const ports::PortName& port_name) { 1144 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 1145 1146 auto it = pending_isolated_connections_.find(from_node); 1147 if (it == pending_isolated_connections_.end()) { 1148 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; 1149 DropPeer(from_node, nullptr); 1150 return; 1151 } 1152 1153 IsolatedConnection& connection = it->second; 1154 scoped_refptr<NodeChannel> channel = std::move(connection.channel); 1155 ports::PortRef local_port = connection.local_port; 1156 if (!connection.name.empty()) 1157 named_isolated_connections_[connection.name] = peer_name; 1158 pending_isolated_connections_.erase(it); 1159 DCHECK(channel); 1160 1161 if (name_ != peer_name) { 1162 // It's possible (e.g. in tests) that we may "connect" to ourself, in which 1163 // case we skip this |AddPeer()| call and go straight to merging ports. 1164 // Note that we explicitly drop any prior connection to the same peer so 1165 // that new isolated connections can replace old ones. 1166 DropPeer(peer_name, nullptr); 1167 AddPeer(peer_name, channel, false /* start_channel */); 1168 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; 1169 } 1170 1171 // We need to choose one side to initiate the port merge. It doesn't matter 1172 // who does it as long as they don't both try. Simple solution: pick the one 1173 // with the "smaller" port name. 1174 if (local_port.name() < port_name) 1175 node()->MergePorts(local_port, peer_name, port_name); 1176 } 1177 1178 void NodeController::OnChannelError(const ports::NodeName& from_node, 1179 NodeChannel* channel) { 1180 if (io_task_runner_->RunsTasksInCurrentSequence()) { 1181 RequestContext request_context(RequestContext::Source::SYSTEM); 1182 DropPeer(from_node, channel); 1183 } else { 1184 io_task_runner_->PostTask( 1185 FROM_HERE, 1186 base::Bind(&NodeController::OnChannelError, base::Unretained(this), 1187 from_node, base::RetainedRef(channel))); 1188 } 1189 } 1190 1191 #if defined(OS_MACOSX) && !defined(OS_IOS) 1192 MachPortRelay* NodeController::GetMachPortRelay() { 1193 { 1194 base::AutoLock lock(inviter_lock_); 1195 // Return null if we're not the root. 1196 if (bootstrap_inviter_channel_ || inviter_name_ != ports::kInvalidNodeName) 1197 return nullptr; 1198 } 1199 1200 base::AutoLock lock(mach_port_relay_lock_); 1201 return mach_port_relay_.get(); 1202 } 1203 #endif 1204 1205 void NodeController::CancelPendingPortMerges() { 1206 std::vector<ports::PortRef> ports_to_close; 1207 1208 { 1209 base::AutoLock lock(pending_port_merges_lock_); 1210 reject_pending_merges_ = true; 1211 for (const auto& port : pending_port_merges_) 1212 ports_to_close.push_back(port.second); 1213 pending_port_merges_.clear(); 1214 } 1215 1216 for (const auto& port : ports_to_close) 1217 node_->ClosePort(port); 1218 } 1219 1220 void NodeController::DestroyOnIOThreadShutdown() { 1221 destroy_on_io_thread_shutdown_ = true; 1222 } 1223 1224 void NodeController::AttemptShutdownIfRequested() { 1225 if (!shutdown_callback_flag_) 1226 return; 1227 1228 base::Closure callback; 1229 { 1230 base::AutoLock lock(shutdown_lock_); 1231 if (shutdown_callback_.is_null()) 1232 return; 1233 if (!node_->CanShutdownCleanly( 1234 ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) { 1235 DVLOG(2) << "Unable to cleanly shut down node " << name_; 1236 return; 1237 } 1238 1239 callback = shutdown_callback_; 1240 shutdown_callback_.Reset(); 1241 shutdown_callback_flag_.Set(false); 1242 } 1243 1244 DCHECK(!callback.is_null()); 1245 1246 callback.Run(); 1247 } 1248 1249 NodeController::IsolatedConnection::IsolatedConnection() = default; 1250 1251 NodeController::IsolatedConnection::IsolatedConnection( 1252 const IsolatedConnection& other) = default; 1253 1254 NodeController::IsolatedConnection::IsolatedConnection( 1255 IsolatedConnection&& other) = default; 1256 1257 NodeController::IsolatedConnection::IsolatedConnection( 1258 scoped_refptr<NodeChannel> channel, 1259 const ports::PortRef& local_port, 1260 base::StringPiece name) 1261 : channel(std::move(channel)), local_port(local_port), name(name) {} 1262 1263 NodeController::IsolatedConnection::~IsolatedConnection() = default; 1264 1265 NodeController::IsolatedConnection& NodeController::IsolatedConnection:: 1266 operator=(const IsolatedConnection& other) = default; 1267 1268 NodeController::IsolatedConnection& NodeController::IsolatedConnection:: 1269 operator=(IsolatedConnection&& other) = default; 1270 1271 } // namespace core 1272 } // namespace mojo 1273