1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_mojo_bootstrap.h" 6 7 #include <inttypes.h> 8 #include <stdint.h> 9 10 #include <map> 11 #include <memory> 12 #include <set> 13 #include <utility> 14 #include <vector> 15 16 #include "base/callback.h" 17 #include "base/containers/queue.h" 18 #include "base/logging.h" 19 #include "base/macros.h" 20 #include "base/memory/ptr_util.h" 21 #include "base/no_destructor.h" 22 #include "base/sequenced_task_runner.h" 23 #include "base/single_thread_task_runner.h" 24 #include "base/strings/stringprintf.h" 25 #include "base/synchronization/lock.h" 26 #include "base/threading/thread_checker.h" 27 #include "base/threading/thread_task_runner_handle.h" 28 #include "base/trace_event/memory_allocator_dump.h" 29 #include "base/trace_event/memory_dump_manager.h" 30 #include "base/trace_event/memory_dump_provider.h" 31 #include "ipc/ipc_channel.h" 32 #include "mojo/public/cpp/bindings/associated_group.h" 33 #include "mojo/public/cpp/bindings/associated_group_controller.h" 34 #include "mojo/public/cpp/bindings/connector.h" 35 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 36 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 37 #include "mojo/public/cpp/bindings/interface_id.h" 38 #include "mojo/public/cpp/bindings/message.h" 39 #include "mojo/public/cpp/bindings/message_header_validator.h" 40 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" 41 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" 42 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" 43 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h" 44 45 namespace IPC { 46 47 namespace { 48 49 class ChannelAssociatedGroupController; 50 51 // Used to track some internal Channel state in pursuit of message leaks. 52 // 53 // TODO(https://crbug.com/813045): Remove this. 54 class ControllerMemoryDumpProvider 55 : public base::trace_event::MemoryDumpProvider { 56 public: 57 ControllerMemoryDumpProvider() { 58 base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider( 59 this, "IPCChannel", nullptr); 60 } 61 62 ~ControllerMemoryDumpProvider() override { 63 base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider( 64 this); 65 } 66 67 void AddController(ChannelAssociatedGroupController* controller) { 68 base::AutoLock lock(lock_); 69 controllers_.insert(controller); 70 } 71 72 void RemoveController(ChannelAssociatedGroupController* controller) { 73 base::AutoLock lock(lock_); 74 controllers_.erase(controller); 75 } 76 77 // base::trace_event::MemoryDumpProvider: 78 bool OnMemoryDump(const base::trace_event::MemoryDumpArgs& args, 79 base::trace_event::ProcessMemoryDump* pmd) override; 80 81 private: 82 base::Lock lock_; 83 std::set<ChannelAssociatedGroupController*> controllers_; 84 85 DISALLOW_COPY_AND_ASSIGN(ControllerMemoryDumpProvider); 86 }; 87 88 ControllerMemoryDumpProvider& GetMemoryDumpProvider() { 89 static base::NoDestructor<ControllerMemoryDumpProvider> provider; 90 return *provider; 91 } 92 93 class ChannelAssociatedGroupController 94 : public mojo::AssociatedGroupController, 95 public mojo::MessageReceiver, 96 public mojo::PipeControlMessageHandlerDelegate { 97 public: 98 ChannelAssociatedGroupController( 99 bool set_interface_id_namespace_bit, 100 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 101 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) 102 : task_runner_(ipc_task_runner), 103 proxy_task_runner_(proxy_task_runner), 104 set_interface_id_namespace_bit_(set_interface_id_namespace_bit), 105 filters_(this), 106 control_message_handler_(this), 107 control_message_proxy_thunk_(this), 108 control_message_proxy_(&control_message_proxy_thunk_) { 109 thread_checker_.DetachFromThread(); 110 control_message_handler_.SetDescription( 111 "IPC::mojom::Bootstrap [master] PipeControlMessageHandler"); 112 filters_.Append<mojo::MessageHeaderValidator>( 113 "IPC::mojom::Bootstrap [master] MessageHeaderValidator"); 114 115 GetMemoryDumpProvider().AddController(this); 116 } 117 118 size_t GetQueuedMessageCount() { 119 base::AutoLock lock(outgoing_messages_lock_); 120 return outgoing_messages_.size(); 121 } 122 123 void Bind(mojo::ScopedMessagePipeHandle handle) { 124 DCHECK(thread_checker_.CalledOnValidThread()); 125 DCHECK(task_runner_->BelongsToCurrentThread()); 126 127 connector_.reset(new mojo::Connector( 128 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, 129 task_runner_)); 130 connector_->set_incoming_receiver(&filters_); 131 connector_->set_connection_error_handler( 132 base::Bind(&ChannelAssociatedGroupController::OnPipeError, 133 base::Unretained(this))); 134 connector_->set_enforce_errors_from_incoming_receiver(false); 135 connector_->SetWatcherHeapProfilerTag("IPC Channel"); 136 } 137 138 void Pause() { 139 DCHECK(!paused_); 140 paused_ = true; 141 } 142 143 void Unpause() { 144 DCHECK(paused_); 145 paused_ = false; 146 } 147 148 void FlushOutgoingMessages() { 149 std::vector<mojo::Message> outgoing_messages; 150 { 151 base::AutoLock lock(outgoing_messages_lock_); 152 std::swap(outgoing_messages, outgoing_messages_); 153 } 154 for (auto& message : outgoing_messages) 155 SendMessage(&message); 156 } 157 158 void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender, 159 mojom::ChannelAssociatedRequest* receiver) { 160 mojo::InterfaceId sender_id, receiver_id; 161 if (set_interface_id_namespace_bit_) { 162 sender_id = 1 | mojo::kInterfaceIdNamespaceMask; 163 receiver_id = 1; 164 } else { 165 sender_id = 1; 166 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask; 167 } 168 169 { 170 base::AutoLock locker(lock_); 171 Endpoint* sender_endpoint = new Endpoint(this, sender_id); 172 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id); 173 endpoints_.insert({ sender_id, sender_endpoint }); 174 endpoints_.insert({ receiver_id, receiver_endpoint }); 175 sender_endpoint->set_handle_created(); 176 receiver_endpoint->set_handle_created(); 177 } 178 179 mojo::ScopedInterfaceEndpointHandle sender_handle = 180 CreateScopedInterfaceEndpointHandle(sender_id); 181 mojo::ScopedInterfaceEndpointHandle receiver_handle = 182 CreateScopedInterfaceEndpointHandle(receiver_id); 183 184 sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0)); 185 *receiver = mojom::ChannelAssociatedRequest(std::move(receiver_handle)); 186 } 187 188 void ShutDown() { 189 DCHECK(thread_checker_.CalledOnValidThread()); 190 shut_down_ = true; 191 connector_->CloseMessagePipe(); 192 OnPipeError(); 193 connector_.reset(); 194 195 base::AutoLock lock(outgoing_messages_lock_); 196 outgoing_messages_.clear(); 197 } 198 199 // mojo::AssociatedGroupController: 200 mojo::InterfaceId AssociateInterface( 201 mojo::ScopedInterfaceEndpointHandle handle_to_send) override { 202 if (!handle_to_send.pending_association()) 203 return mojo::kInvalidInterfaceId; 204 205 uint32_t id = 0; 206 { 207 base::AutoLock locker(lock_); 208 do { 209 if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask) 210 next_interface_id_ = 2; 211 id = next_interface_id_++; 212 if (set_interface_id_namespace_bit_) 213 id |= mojo::kInterfaceIdNamespaceMask; 214 } while (ContainsKey(endpoints_, id)); 215 216 Endpoint* endpoint = new Endpoint(this, id); 217 if (encountered_error_) 218 endpoint->set_peer_closed(); 219 endpoint->set_handle_created(); 220 endpoints_.insert({id, endpoint}); 221 } 222 223 if (!NotifyAssociation(&handle_to_send, id)) { 224 // The peer handle of |handle_to_send|, which is supposed to join this 225 // associated group, has been closed. 226 { 227 base::AutoLock locker(lock_); 228 Endpoint* endpoint = FindEndpoint(id); 229 if (endpoint) 230 MarkClosedAndMaybeRemove(endpoint); 231 } 232 233 control_message_proxy_.NotifyPeerEndpointClosed( 234 id, handle_to_send.disconnect_reason()); 235 } 236 return id; 237 } 238 239 mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( 240 mojo::InterfaceId id) override { 241 if (!mojo::IsValidInterfaceId(id)) 242 return mojo::ScopedInterfaceEndpointHandle(); 243 244 // Unless it is the master ID, |id| is from the remote side and therefore 245 // its namespace bit is supposed to be different than the value that this 246 // router would use. 247 if (!mojo::IsMasterInterfaceId(id) && 248 set_interface_id_namespace_bit_ == 249 mojo::HasInterfaceIdNamespaceBitSet(id)) { 250 return mojo::ScopedInterfaceEndpointHandle(); 251 } 252 253 base::AutoLock locker(lock_); 254 bool inserted = false; 255 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 256 if (inserted) { 257 DCHECK(!endpoint->handle_created()); 258 if (encountered_error_) 259 endpoint->set_peer_closed(); 260 } else { 261 if (endpoint->handle_created()) 262 return mojo::ScopedInterfaceEndpointHandle(); 263 } 264 265 endpoint->set_handle_created(); 266 return CreateScopedInterfaceEndpointHandle(id); 267 } 268 269 void CloseEndpointHandle( 270 mojo::InterfaceId id, 271 const base::Optional<mojo::DisconnectReason>& reason) override { 272 if (!mojo::IsValidInterfaceId(id)) 273 return; 274 { 275 base::AutoLock locker(lock_); 276 DCHECK(ContainsKey(endpoints_, id)); 277 Endpoint* endpoint = endpoints_[id].get(); 278 DCHECK(!endpoint->client()); 279 DCHECK(!endpoint->closed()); 280 MarkClosedAndMaybeRemove(endpoint); 281 } 282 283 if (!mojo::IsMasterInterfaceId(id) || reason) 284 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); 285 } 286 287 mojo::InterfaceEndpointController* AttachEndpointClient( 288 const mojo::ScopedInterfaceEndpointHandle& handle, 289 mojo::InterfaceEndpointClient* client, 290 scoped_refptr<base::SequencedTaskRunner> runner) override { 291 const mojo::InterfaceId id = handle.id(); 292 293 DCHECK(mojo::IsValidInterfaceId(id)); 294 DCHECK(client); 295 296 base::AutoLock locker(lock_); 297 DCHECK(ContainsKey(endpoints_, id)); 298 299 Endpoint* endpoint = endpoints_[id].get(); 300 endpoint->AttachClient(client, std::move(runner)); 301 302 if (endpoint->peer_closed()) 303 NotifyEndpointOfError(endpoint, true /* force_async */); 304 305 return endpoint; 306 } 307 308 void DetachEndpointClient( 309 const mojo::ScopedInterfaceEndpointHandle& handle) override { 310 const mojo::InterfaceId id = handle.id(); 311 312 DCHECK(mojo::IsValidInterfaceId(id)); 313 314 base::AutoLock locker(lock_); 315 DCHECK(ContainsKey(endpoints_, id)); 316 317 Endpoint* endpoint = endpoints_[id].get(); 318 endpoint->DetachClient(); 319 } 320 321 void RaiseError() override { 322 // We ignore errors on channel endpoints, leaving the pipe open. There are 323 // good reasons for this: 324 // 325 // * We should never close a channel endpoint in either process as long as 326 // the child process is still alive. The child's endpoint should only be 327 // closed implicitly by process death, and the browser's endpoint should 328 // only be closed after the child process is confirmed to be dead. Crash 329 // reporting logic in Chrome relies on this behavior in order to do the 330 // right thing. 331 // 332 // * There are two interesting conditions under which RaiseError() can be 333 // implicitly reached: an incoming message fails validation, or the 334 // local endpoint drops a response callback without calling it. 335 // 336 // * In the validation case, we also report the message as bad, and this 337 // will imminently trigger the common bad-IPC path in the browser, 338 // causing the browser to kill the offending renderer. 339 // 340 // * In the dropped response callback case, the net result of ignoring the 341 // issue is generally innocuous. While indicative of programmer error, 342 // it's not a severe failure and is already covered by separate DCHECKs. 343 // 344 // See https://crbug.com/861607 for additional discussion. 345 } 346 347 bool PrefersSerializedMessages() override { return true; } 348 349 private: 350 class Endpoint; 351 class ControlMessageProxyThunk; 352 friend class Endpoint; 353 friend class ControlMessageProxyThunk; 354 355 // MessageWrapper objects are always destroyed under the controller's lock. On 356 // destruction, if the message it wrappers contains 357 // ScopedInterfaceEndpointHandles (which cannot be destructed under the 358 // controller's lock), the wrapper unlocks to clean them up. 359 class MessageWrapper { 360 public: 361 MessageWrapper() = default; 362 363 MessageWrapper(ChannelAssociatedGroupController* controller, 364 mojo::Message message) 365 : controller_(controller), value_(std::move(message)) {} 366 367 MessageWrapper(MessageWrapper&& other) 368 : controller_(other.controller_), value_(std::move(other.value_)) {} 369 370 ~MessageWrapper() { 371 if (value_.associated_endpoint_handles()->empty()) 372 return; 373 374 controller_->lock_.AssertAcquired(); 375 { 376 base::AutoUnlock unlocker(controller_->lock_); 377 value_.mutable_associated_endpoint_handles()->clear(); 378 } 379 } 380 381 MessageWrapper& operator=(MessageWrapper&& other) { 382 controller_ = other.controller_; 383 value_ = std::move(other.value_); 384 return *this; 385 } 386 387 mojo::Message& value() { return value_; } 388 389 private: 390 ChannelAssociatedGroupController* controller_ = nullptr; 391 mojo::Message value_; 392 393 DISALLOW_COPY_AND_ASSIGN(MessageWrapper); 394 }; 395 396 class Endpoint : public base::RefCountedThreadSafe<Endpoint>, 397 public mojo::InterfaceEndpointController { 398 public: 399 Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id) 400 : controller_(controller), id_(id) {} 401 402 mojo::InterfaceId id() const { return id_; } 403 404 bool closed() const { 405 controller_->lock_.AssertAcquired(); 406 return closed_; 407 } 408 409 void set_closed() { 410 controller_->lock_.AssertAcquired(); 411 closed_ = true; 412 } 413 414 bool peer_closed() const { 415 controller_->lock_.AssertAcquired(); 416 return peer_closed_; 417 } 418 419 void set_peer_closed() { 420 controller_->lock_.AssertAcquired(); 421 peer_closed_ = true; 422 } 423 424 bool handle_created() const { 425 controller_->lock_.AssertAcquired(); 426 return handle_created_; 427 } 428 429 void set_handle_created() { 430 controller_->lock_.AssertAcquired(); 431 handle_created_ = true; 432 } 433 434 const base::Optional<mojo::DisconnectReason>& disconnect_reason() const { 435 return disconnect_reason_; 436 } 437 438 void set_disconnect_reason( 439 const base::Optional<mojo::DisconnectReason>& disconnect_reason) { 440 disconnect_reason_ = disconnect_reason; 441 } 442 443 base::SequencedTaskRunner* task_runner() const { 444 return task_runner_.get(); 445 } 446 447 mojo::InterfaceEndpointClient* client() const { 448 controller_->lock_.AssertAcquired(); 449 return client_; 450 } 451 452 void AttachClient(mojo::InterfaceEndpointClient* client, 453 scoped_refptr<base::SequencedTaskRunner> runner) { 454 controller_->lock_.AssertAcquired(); 455 DCHECK(!client_); 456 DCHECK(!closed_); 457 DCHECK(runner->RunsTasksInCurrentSequence()); 458 459 task_runner_ = std::move(runner); 460 client_ = client; 461 } 462 463 void DetachClient() { 464 controller_->lock_.AssertAcquired(); 465 DCHECK(client_); 466 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 467 DCHECK(!closed_); 468 469 task_runner_ = nullptr; 470 client_ = nullptr; 471 sync_watcher_.reset(); 472 } 473 474 uint32_t EnqueueSyncMessage(MessageWrapper message) { 475 controller_->lock_.AssertAcquired(); 476 uint32_t id = GenerateSyncMessageId(); 477 sync_messages_.emplace(id, std::move(message)); 478 SignalSyncMessageEvent(); 479 return id; 480 } 481 482 void SignalSyncMessageEvent() { 483 controller_->lock_.AssertAcquired(); 484 485 if (sync_watcher_) 486 sync_watcher_->SignalEvent(); 487 } 488 489 MessageWrapper PopSyncMessage(uint32_t id) { 490 controller_->lock_.AssertAcquired(); 491 if (sync_messages_.empty() || sync_messages_.front().first != id) 492 return MessageWrapper(); 493 MessageWrapper message = std::move(sync_messages_.front().second); 494 sync_messages_.pop(); 495 return message; 496 } 497 498 // mojo::InterfaceEndpointController: 499 bool SendMessage(mojo::Message* message) override { 500 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 501 message->set_interface_id(id_); 502 return controller_->SendMessage(message); 503 } 504 505 void AllowWokenUpBySyncWatchOnSameThread() override { 506 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 507 508 EnsureSyncWatcherExists(); 509 sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence(); 510 } 511 512 bool SyncWatch(const bool* should_stop) override { 513 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 514 515 // It's not legal to make sync calls from the master endpoint's thread, 516 // and in fact they must only happen from the proxy task runner. 517 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); 518 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); 519 520 EnsureSyncWatcherExists(); 521 return sync_watcher_->SyncWatch(should_stop); 522 } 523 524 private: 525 friend class base::RefCountedThreadSafe<Endpoint>; 526 527 ~Endpoint() override { 528 controller_->lock_.AssertAcquired(); 529 DCHECK(!client_); 530 DCHECK(closed_); 531 DCHECK(peer_closed_); 532 DCHECK(!sync_watcher_); 533 } 534 535 void OnSyncMessageEventReady() { 536 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 537 538 scoped_refptr<Endpoint> keepalive(this); 539 scoped_refptr<AssociatedGroupController> controller_keepalive( 540 controller_); 541 base::AutoLock locker(controller_->lock_); 542 bool more_to_process = false; 543 if (!sync_messages_.empty()) { 544 MessageWrapper message_wrapper = 545 std::move(sync_messages_.front().second); 546 sync_messages_.pop(); 547 548 bool dispatch_succeeded; 549 mojo::InterfaceEndpointClient* client = client_; 550 { 551 base::AutoUnlock unlocker(controller_->lock_); 552 dispatch_succeeded = 553 client->HandleIncomingMessage(&message_wrapper.value()); 554 } 555 556 if (!sync_messages_.empty()) 557 more_to_process = true; 558 559 if (!dispatch_succeeded) 560 controller_->RaiseError(); 561 } 562 563 if (!more_to_process) 564 sync_watcher_->ResetEvent(); 565 566 // If there are no queued sync messages and the peer has closed, there 567 // there won't be incoming sync messages in the future. If any 568 // SyncWatch() calls are on the stack for this endpoint, resetting the 569 // watcher will allow them to exit as the stack undwinds. 570 if (!more_to_process && peer_closed_) 571 sync_watcher_.reset(); 572 } 573 574 void EnsureSyncWatcherExists() { 575 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 576 if (sync_watcher_) 577 return; 578 579 base::AutoLock locker(controller_->lock_); 580 sync_watcher_ = std::make_unique<mojo::SequenceLocalSyncEventWatcher>( 581 base::BindRepeating(&Endpoint::OnSyncMessageEventReady, 582 base::Unretained(this))); 583 if (peer_closed_ || !sync_messages_.empty()) 584 SignalSyncMessageEvent(); 585 } 586 587 uint32_t GenerateSyncMessageId() { 588 // Overflow is fine. 589 uint32_t id = next_sync_message_id_++; 590 DCHECK(sync_messages_.empty() || sync_messages_.front().first != id); 591 return id; 592 } 593 594 ChannelAssociatedGroupController* const controller_; 595 const mojo::InterfaceId id_; 596 597 bool closed_ = false; 598 bool peer_closed_ = false; 599 bool handle_created_ = false; 600 base::Optional<mojo::DisconnectReason> disconnect_reason_; 601 mojo::InterfaceEndpointClient* client_ = nullptr; 602 scoped_refptr<base::SequencedTaskRunner> task_runner_; 603 std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_; 604 base::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_; 605 uint32_t next_sync_message_id_ = 0; 606 607 DISALLOW_COPY_AND_ASSIGN(Endpoint); 608 }; 609 610 class ControlMessageProxyThunk : public MessageReceiver { 611 public: 612 explicit ControlMessageProxyThunk( 613 ChannelAssociatedGroupController* controller) 614 : controller_(controller) {} 615 616 private: 617 // MessageReceiver: 618 bool Accept(mojo::Message* message) override { 619 return controller_->SendMessage(message); 620 } 621 622 ChannelAssociatedGroupController* controller_; 623 624 DISALLOW_COPY_AND_ASSIGN(ControlMessageProxyThunk); 625 }; 626 627 ~ChannelAssociatedGroupController() override { 628 DCHECK(!connector_); 629 630 base::AutoLock locker(lock_); 631 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 632 Endpoint* endpoint = iter->second.get(); 633 ++iter; 634 635 if (!endpoint->closed()) { 636 // This happens when a NotifyPeerEndpointClosed message been received, 637 // but the interface ID hasn't been used to create local endpoint 638 // handle. 639 DCHECK(!endpoint->client()); 640 DCHECK(endpoint->peer_closed()); 641 MarkClosedAndMaybeRemove(endpoint); 642 } else { 643 MarkPeerClosedAndMaybeRemove(endpoint); 644 } 645 } 646 647 DCHECK(endpoints_.empty()); 648 649 GetMemoryDumpProvider().RemoveController(this); 650 } 651 652 bool SendMessage(mojo::Message* message) { 653 if (task_runner_->BelongsToCurrentThread()) { 654 DCHECK(thread_checker_.CalledOnValidThread()); 655 if (!connector_ || paused_) { 656 if (!shut_down_) { 657 base::AutoLock lock(outgoing_messages_lock_); 658 outgoing_messages_.emplace_back(std::move(*message)); 659 } 660 return true; 661 } 662 return connector_->Accept(message); 663 } else { 664 // Do a message size check here so we don't lose valuable stack 665 // information to the task scheduler. 666 CHECK_LE(message->data_num_bytes(), Channel::kMaximumMessageSize); 667 668 // We always post tasks to the master endpoint thread when called from 669 // other threads in order to simulate IPC::ChannelProxy::Send behavior. 670 task_runner_->PostTask( 671 FROM_HERE, 672 base::Bind( 673 &ChannelAssociatedGroupController::SendMessageOnMasterThread, 674 this, base::Passed(message))); 675 return true; 676 } 677 } 678 679 void SendMessageOnMasterThread(mojo::Message message) { 680 DCHECK(thread_checker_.CalledOnValidThread()); 681 if (!SendMessage(&message)) 682 RaiseError(); 683 } 684 685 void OnPipeError() { 686 DCHECK(thread_checker_.CalledOnValidThread()); 687 688 // We keep |this| alive here because it's possible for the notifications 689 // below to release all other references. 690 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); 691 692 base::AutoLock locker(lock_); 693 encountered_error_ = true; 694 695 std::vector<scoped_refptr<Endpoint>> endpoints_to_notify; 696 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 697 Endpoint* endpoint = iter->second.get(); 698 ++iter; 699 700 if (endpoint->client()) 701 endpoints_to_notify.push_back(endpoint); 702 703 MarkPeerClosedAndMaybeRemove(endpoint); 704 } 705 706 for (auto& endpoint : endpoints_to_notify) { 707 // Because a notification may in turn detach any endpoint, we have to 708 // check each client again here. 709 if (endpoint->client()) 710 NotifyEndpointOfError(endpoint.get(), false /* force_async */); 711 } 712 } 713 714 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { 715 lock_.AssertAcquired(); 716 DCHECK(endpoint->task_runner() && endpoint->client()); 717 if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) { 718 mojo::InterfaceEndpointClient* client = endpoint->client(); 719 base::Optional<mojo::DisconnectReason> reason( 720 endpoint->disconnect_reason()); 721 722 base::AutoUnlock unlocker(lock_); 723 client->NotifyError(reason); 724 } else { 725 endpoint->task_runner()->PostTask( 726 FROM_HERE, 727 base::Bind(&ChannelAssociatedGroupController:: 728 NotifyEndpointOfErrorOnEndpointThread, 729 this, endpoint->id(), base::Unretained(endpoint))); 730 } 731 } 732 733 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, 734 Endpoint* endpoint) { 735 base::AutoLock locker(lock_); 736 auto iter = endpoints_.find(id); 737 if (iter == endpoints_.end() || iter->second.get() != endpoint) 738 return; 739 if (!endpoint->client()) 740 return; 741 742 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); 743 NotifyEndpointOfError(endpoint, false /* force_async */); 744 } 745 746 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { 747 lock_.AssertAcquired(); 748 endpoint->set_closed(); 749 if (endpoint->closed() && endpoint->peer_closed()) 750 endpoints_.erase(endpoint->id()); 751 } 752 753 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { 754 lock_.AssertAcquired(); 755 endpoint->set_peer_closed(); 756 endpoint->SignalSyncMessageEvent(); 757 if (endpoint->closed() && endpoint->peer_closed()) 758 endpoints_.erase(endpoint->id()); 759 } 760 761 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { 762 lock_.AssertAcquired(); 763 DCHECK(!inserted || !*inserted); 764 765 Endpoint* endpoint = FindEndpoint(id); 766 if (!endpoint) { 767 endpoint = new Endpoint(this, id); 768 endpoints_.insert({id, endpoint}); 769 if (inserted) 770 *inserted = true; 771 } 772 return endpoint; 773 } 774 775 Endpoint* FindEndpoint(mojo::InterfaceId id) { 776 lock_.AssertAcquired(); 777 auto iter = endpoints_.find(id); 778 return iter != endpoints_.end() ? iter->second.get() : nullptr; 779 } 780 781 // mojo::MessageReceiver: 782 bool Accept(mojo::Message* message) override { 783 DCHECK(thread_checker_.CalledOnValidThread()); 784 785 if (!message->DeserializeAssociatedEndpointHandles(this)) 786 return false; 787 788 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) 789 return control_message_handler_.Accept(message); 790 791 mojo::InterfaceId id = message->interface_id(); 792 DCHECK(mojo::IsValidInterfaceId(id)); 793 794 base::AutoLock locker(lock_); 795 Endpoint* endpoint = FindEndpoint(id); 796 if (!endpoint) 797 return true; 798 799 mojo::InterfaceEndpointClient* client = endpoint->client(); 800 if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) { 801 // No client has been bound yet or the client runs tasks on another 802 // thread. We assume the other thread must always be the one on which 803 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 804 // 805 // If the client is not yet bound, it must be bound by the time this task 806 // runs or else it's programmer error. 807 DCHECK(proxy_task_runner_); 808 809 if (message->has_flag(mojo::Message::kFlagIsSync)) { 810 MessageWrapper message_wrapper(this, std::move(*message)); 811 // Sync messages may need to be handled by the endpoint if it's blocking 812 // on a sync reply. We pass ownership of the message to the endpoint's 813 // sync message queue. If the endpoint was blocking, it will dequeue the 814 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| 815 // call will dequeue the message and dispatch it. 816 uint32_t message_id = 817 endpoint->EnqueueSyncMessage(std::move(message_wrapper)); 818 proxy_task_runner_->PostTask( 819 FROM_HERE, 820 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, 821 this, id, message_id)); 822 return true; 823 } 824 825 proxy_task_runner_->PostTask( 826 FROM_HERE, 827 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, 828 this, base::Passed(message))); 829 return true; 830 } 831 832 // We do not expect to receive sync responses on the master endpoint thread. 833 // If it's happening, it's a bug. 834 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || 835 !message->has_flag(mojo::Message::kFlagIsResponse)); 836 837 base::AutoUnlock unlocker(lock_); 838 return client->HandleIncomingMessage(message); 839 } 840 841 void AcceptOnProxyThread(mojo::Message message) { 842 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 843 844 mojo::InterfaceId id = message.interface_id(); 845 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); 846 847 base::AutoLock locker(lock_); 848 Endpoint* endpoint = FindEndpoint(id); 849 if (!endpoint) 850 return; 851 852 mojo::InterfaceEndpointClient* client = endpoint->client(); 853 if (!client) 854 return; 855 856 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); 857 858 // Sync messages should never make their way to this method. 859 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); 860 861 bool result = false; 862 { 863 base::AutoUnlock unlocker(lock_); 864 result = client->HandleIncomingMessage(&message); 865 } 866 867 if (!result) 868 RaiseError(); 869 } 870 871 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { 872 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 873 874 base::AutoLock locker(lock_); 875 Endpoint* endpoint = FindEndpoint(interface_id); 876 if (!endpoint) 877 return; 878 879 // Careful, if the endpoint is detached its members are cleared. Check for 880 // that before dereferencing. 881 mojo::InterfaceEndpointClient* client = endpoint->client(); 882 if (!client) 883 return; 884 885 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); 886 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); 887 888 // The message must have already been dequeued by the endpoint waking up 889 // from a sync wait. Nothing to do. 890 if (message_wrapper.value().IsNull()) 891 return; 892 893 bool result = false; 894 { 895 base::AutoUnlock unlocker(lock_); 896 result = client->HandleIncomingMessage(&message_wrapper.value()); 897 } 898 899 if (!result) 900 RaiseError(); 901 } 902 903 // mojo::PipeControlMessageHandlerDelegate: 904 bool OnPeerAssociatedEndpointClosed( 905 mojo::InterfaceId id, 906 const base::Optional<mojo::DisconnectReason>& reason) override { 907 DCHECK(thread_checker_.CalledOnValidThread()); 908 909 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); 910 base::AutoLock locker(lock_); 911 scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr); 912 if (reason) 913 endpoint->set_disconnect_reason(reason); 914 if (!endpoint->peer_closed()) { 915 if (endpoint->client()) 916 NotifyEndpointOfError(endpoint.get(), false /* force_async */); 917 MarkPeerClosedAndMaybeRemove(endpoint.get()); 918 } 919 920 return true; 921 } 922 923 // Checked in places which must be run on the master endpoint's thread. 924 base::ThreadChecker thread_checker_; 925 926 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 927 928 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; 929 const bool set_interface_id_namespace_bit_; 930 bool paused_ = false; 931 std::unique_ptr<mojo::Connector> connector_; 932 mojo::FilterChain filters_; 933 mojo::PipeControlMessageHandler control_message_handler_; 934 ControlMessageProxyThunk control_message_proxy_thunk_; 935 936 // NOTE: It is unsafe to call into this object while holding |lock_|. 937 mojo::PipeControlMessageProxy control_message_proxy_; 938 939 // Guards access to |outgoing_messages_| only. Used to support memory dumps 940 // which may be triggered from any thread. 941 base::Lock outgoing_messages_lock_; 942 943 // Outgoing messages that were sent before this controller was bound to a 944 // real message pipe. 945 std::vector<mojo::Message> outgoing_messages_; 946 947 // Guards the fields below for thread-safe access. 948 base::Lock lock_; 949 950 bool encountered_error_ = false; 951 bool shut_down_ = false; 952 953 // ID #1 is reserved for the mojom::Channel interface. 954 uint32_t next_interface_id_ = 2; 955 956 std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_; 957 958 DISALLOW_COPY_AND_ASSIGN(ChannelAssociatedGroupController); 959 }; 960 961 bool ControllerMemoryDumpProvider::OnMemoryDump( 962 const base::trace_event::MemoryDumpArgs& args, 963 base::trace_event::ProcessMemoryDump* pmd) { 964 base::AutoLock lock(lock_); 965 for (auto* controller : controllers_) { 966 base::trace_event::MemoryAllocatorDump* dump = pmd->CreateAllocatorDump( 967 base::StringPrintf("mojo/queued_ipc_channel_message/0x%" PRIxPTR, 968 reinterpret_cast<uintptr_t>(controller))); 969 dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount, 970 base::trace_event::MemoryAllocatorDump::kUnitsObjects, 971 controller->GetQueuedMessageCount()); 972 } 973 974 return true; 975 } 976 977 class MojoBootstrapImpl : public MojoBootstrap { 978 public: 979 MojoBootstrapImpl( 980 mojo::ScopedMessagePipeHandle handle, 981 const scoped_refptr<ChannelAssociatedGroupController> controller) 982 : controller_(controller), 983 associated_group_(controller), 984 handle_(std::move(handle)) {} 985 986 ~MojoBootstrapImpl() override { 987 controller_->ShutDown(); 988 } 989 990 private: 991 void Connect(mojom::ChannelAssociatedPtr* sender, 992 mojom::ChannelAssociatedRequest* receiver) override { 993 controller_->Bind(std::move(handle_)); 994 controller_->CreateChannelEndpoints(sender, receiver); 995 } 996 997 void Pause() override { 998 controller_->Pause(); 999 } 1000 1001 void Unpause() override { 1002 controller_->Unpause(); 1003 } 1004 1005 void Flush() override { 1006 controller_->FlushOutgoingMessages(); 1007 } 1008 1009 mojo::AssociatedGroup* GetAssociatedGroup() override { 1010 return &associated_group_; 1011 } 1012 1013 scoped_refptr<ChannelAssociatedGroupController> controller_; 1014 mojo::AssociatedGroup associated_group_; 1015 1016 mojo::ScopedMessagePipeHandle handle_; 1017 1018 DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); 1019 }; 1020 1021 } // namespace 1022 1023 // static 1024 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( 1025 mojo::ScopedMessagePipeHandle handle, 1026 Channel::Mode mode, 1027 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 1028 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) { 1029 return std::make_unique<MojoBootstrapImpl>( 1030 std::move(handle), 1031 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, 1032 ipc_task_runner, proxy_task_runner)); 1033 } 1034 1035 } // namespace IPC 1036