1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 6 7 #include <stdint.h> 8 9 #include <utility> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/macros.h" 14 #include "base/memory/ptr_util.h" 15 #include "base/single_thread_task_runner.h" 16 #include "base/stl_util.h" 17 #include "base/threading/thread_task_runner_handle.h" 18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" 21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 22 23 namespace mojo { 24 namespace internal { 25 26 // InterfaceEndpoint stores the information of an interface endpoint registered 27 // with the router. 28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 29 // this object. 30 class MultiplexRouter::InterfaceEndpoint 31 : public base::RefCounted<InterfaceEndpoint>, 32 public InterfaceEndpointController { 33 public: 34 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 35 : router_(router), 36 id_(id), 37 closed_(false), 38 peer_closed_(false), 39 handle_created_(false), 40 client_(nullptr), 41 event_signalled_(false) {} 42 43 // --------------------------------------------------------------------------- 44 // The following public methods are safe to call from any threads without 45 // locking. 46 47 InterfaceId id() const { return id_; } 48 49 // --------------------------------------------------------------------------- 50 // The following public methods are called under the router's lock. 51 52 bool closed() const { return closed_; } 53 void set_closed() { 54 router_->AssertLockAcquired(); 55 closed_ = true; 56 } 57 58 bool peer_closed() const { return peer_closed_; } 59 void set_peer_closed() { 60 router_->AssertLockAcquired(); 61 peer_closed_ = true; 62 } 63 64 bool handle_created() const { return handle_created_; } 65 void set_handle_created() { 66 router_->AssertLockAcquired(); 67 handle_created_ = true; 68 } 69 70 const base::Optional<DisconnectReason>& disconnect_reason() const { 71 return disconnect_reason_; 72 } 73 void set_disconnect_reason( 74 const base::Optional<DisconnectReason>& disconnect_reason) { 75 router_->AssertLockAcquired(); 76 disconnect_reason_ = disconnect_reason; 77 } 78 79 base::SingleThreadTaskRunner* task_runner() const { 80 return task_runner_.get(); 81 } 82 83 InterfaceEndpointClient* client() const { return client_; } 84 85 void AttachClient(InterfaceEndpointClient* client, 86 scoped_refptr<base::SingleThreadTaskRunner> runner) { 87 router_->AssertLockAcquired(); 88 DCHECK(!client_); 89 DCHECK(!closed_); 90 DCHECK(runner->BelongsToCurrentThread()); 91 92 task_runner_ = std::move(runner); 93 client_ = client; 94 } 95 96 // This method must be called on the same thread as the corresponding 97 // AttachClient() call. 98 void DetachClient() { 99 router_->AssertLockAcquired(); 100 DCHECK(client_); 101 DCHECK(task_runner_->BelongsToCurrentThread()); 102 DCHECK(!closed_); 103 104 task_runner_ = nullptr; 105 client_ = nullptr; 106 sync_watcher_.reset(); 107 } 108 109 void SignalSyncMessageEvent() { 110 router_->AssertLockAcquired(); 111 if (event_signalled_) 112 return; 113 114 event_signalled_ = true; 115 if (!sync_message_event_sender_.is_valid()) 116 return; 117 118 MojoResult result = 119 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, 120 0, MOJO_WRITE_MESSAGE_FLAG_NONE); 121 DCHECK_EQ(MOJO_RESULT_OK, result); 122 } 123 124 void ResetSyncMessageSignal() { 125 router_->AssertLockAcquired(); 126 127 if (!event_signalled_) 128 return; 129 130 event_signalled_ = false; 131 if (!sync_message_event_receiver_.is_valid()) 132 return; 133 134 MojoResult result = 135 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, 136 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 137 DCHECK_EQ(MOJO_RESULT_OK, result); 138 } 139 140 // --------------------------------------------------------------------------- 141 // The following public methods (i.e., InterfaceEndpointController 142 // implementation) are called by the client on the same thread as the 143 // AttachClient() call. They are called outside of the router's lock. 144 145 bool SendMessage(Message* message) override { 146 DCHECK(task_runner_->BelongsToCurrentThread()); 147 message->set_interface_id(id_); 148 return router_->connector_.Accept(message); 149 } 150 151 void AllowWokenUpBySyncWatchOnSameThread() override { 152 DCHECK(task_runner_->BelongsToCurrentThread()); 153 154 EnsureSyncWatcherExists(); 155 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 156 } 157 158 bool SyncWatch(const bool* should_stop) override { 159 DCHECK(task_runner_->BelongsToCurrentThread()); 160 161 EnsureSyncWatcherExists(); 162 return sync_watcher_->SyncWatch(should_stop); 163 } 164 165 private: 166 friend class base::RefCounted<InterfaceEndpoint>; 167 168 ~InterfaceEndpoint() override { 169 router_->AssertLockAcquired(); 170 171 DCHECK(!client_); 172 DCHECK(closed_); 173 DCHECK(peer_closed_); 174 DCHECK(!sync_watcher_); 175 } 176 177 void OnHandleReady(MojoResult result) { 178 DCHECK(task_runner_->BelongsToCurrentThread()); 179 scoped_refptr<MultiplexRouter> router_protector(router_); 180 181 // Because we never close |sync_message_event_{sender,receiver}_| before 182 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. 183 DCHECK_EQ(MOJO_RESULT_OK, result); 184 185 MayAutoLock locker(&router_->lock_); 186 scoped_refptr<InterfaceEndpoint> self_protector(this); 187 188 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 189 190 if (!more_to_process) 191 ResetSyncMessageSignal(); 192 193 // Currently there are no queued sync messages and the peer has closed so 194 // there won't be incoming sync messages in the future. 195 if (!more_to_process && peer_closed_) { 196 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 197 // on the call stack, resetting the sync watcher will allow it to exit 198 // when the call stack unwinds to that frame. 199 sync_watcher_.reset(); 200 } 201 } 202 203 void EnsureSyncWatcherExists() { 204 DCHECK(task_runner_->BelongsToCurrentThread()); 205 if (sync_watcher_) 206 return; 207 208 { 209 MayAutoLock locker(&router_->lock_); 210 211 if (!sync_message_event_sender_.is_valid()) { 212 MojoResult result = 213 CreateMessagePipe(nullptr, &sync_message_event_sender_, 214 &sync_message_event_receiver_); 215 DCHECK_EQ(MOJO_RESULT_OK, result); 216 217 if (event_signalled_) { 218 // Reset the flag so that SignalSyncMessageEvent() will actually 219 // signal using the newly-created message pipe. 220 event_signalled_ = false; 221 SignalSyncMessageEvent(); 222 } 223 } 224 } 225 226 sync_watcher_.reset(new SyncHandleWatcher( 227 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, 228 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); 229 } 230 231 // --------------------------------------------------------------------------- 232 // The following members are safe to access from any threads. 233 234 MultiplexRouter* const router_; 235 const InterfaceId id_; 236 237 // --------------------------------------------------------------------------- 238 // The following members are accessed under the router's lock. 239 240 // Whether the endpoint has been closed. 241 bool closed_; 242 // Whether the peer endpoint has been closed. 243 bool peer_closed_; 244 245 // Whether there is already a ScopedInterfaceEndpointHandle created for this 246 // endpoint. 247 bool handle_created_; 248 249 base::Optional<DisconnectReason> disconnect_reason_; 250 251 // The task runner on which |client_|'s methods can be called. 252 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 253 // Not owned. It is null if no client is attached to this endpoint. 254 InterfaceEndpointClient* client_; 255 256 // A message pipe used as an event to signal that sync messages are available. 257 // The message pipe handles are initialized under the router's lock and remain 258 // unchanged afterwards. They may be accessed outside of the router's lock 259 // later. 260 ScopedMessagePipeHandle sync_message_event_sender_; 261 ScopedMessagePipeHandle sync_message_event_receiver_; 262 bool event_signalled_; 263 264 // --------------------------------------------------------------------------- 265 // The following members are only valid while a client is attached. They are 266 // used exclusively on the client's thread. They may be accessed outside of 267 // the router's lock. 268 269 std::unique_ptr<SyncHandleWatcher> sync_watcher_; 270 271 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 272 }; 273 274 // MessageWrapper objects are always destroyed under the router's lock. On 275 // destruction, if the message it wrappers contains 276 // ScopedInterfaceEndpointHandles (which cannot be destructed under the 277 // router's lock), the wrapper unlocks to clean them up. 278 class MultiplexRouter::MessageWrapper { 279 public: 280 MessageWrapper() = default; 281 282 MessageWrapper(MultiplexRouter* router, Message message) 283 : router_(router), value_(std::move(message)) {} 284 285 MessageWrapper(MessageWrapper&& other) 286 : router_(other.router_), value_(std::move(other.value_)) {} 287 288 ~MessageWrapper() { 289 if (value_.associated_endpoint_handles()->empty()) 290 return; 291 292 router_->AssertLockAcquired(); 293 { 294 MayAutoUnlock unlocker(&router_->lock_); 295 value_.mutable_associated_endpoint_handles()->clear(); 296 } 297 } 298 299 MessageWrapper& operator=(MessageWrapper&& other) { 300 router_ = other.router_; 301 value_ = std::move(other.value_); 302 return *this; 303 } 304 305 Message& value() { return value_; } 306 307 private: 308 MultiplexRouter* router_ = nullptr; 309 Message value_; 310 311 DISALLOW_COPY_AND_ASSIGN(MessageWrapper); 312 }; 313 314 struct MultiplexRouter::Task { 315 public: 316 // Doesn't take ownership of |message| but takes its contents. 317 static std::unique_ptr<Task> CreateMessageTask( 318 MessageWrapper message_wrapper) { 319 Task* task = new Task(MESSAGE); 320 task->message_wrapper = std::move(message_wrapper); 321 return base::WrapUnique(task); 322 } 323 static std::unique_ptr<Task> CreateNotifyErrorTask( 324 InterfaceEndpoint* endpoint) { 325 Task* task = new Task(NOTIFY_ERROR); 326 task->endpoint_to_notify = endpoint; 327 return base::WrapUnique(task); 328 } 329 330 ~Task() {} 331 332 bool IsMessageTask() const { return type == MESSAGE; } 333 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } 334 335 MessageWrapper message_wrapper; 336 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 337 338 enum Type { MESSAGE, NOTIFY_ERROR }; 339 Type type; 340 341 private: 342 explicit Task(Type in_type) : type(in_type) {} 343 344 DISALLOW_COPY_AND_ASSIGN(Task); 345 }; 346 347 MultiplexRouter::MultiplexRouter( 348 ScopedMessagePipeHandle message_pipe, 349 Config config, 350 bool set_interface_id_namesapce_bit, 351 scoped_refptr<base::SingleThreadTaskRunner> runner) 352 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 353 task_runner_(runner), 354 header_validator_(nullptr), 355 filters_(this), 356 connector_(std::move(message_pipe), 357 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND 358 : Connector::SINGLE_THREADED_SEND, 359 std::move(runner)), 360 control_message_handler_(this), 361 control_message_proxy_(&connector_), 362 next_interface_id_value_(1), 363 posted_to_process_tasks_(false), 364 encountered_error_(false), 365 paused_(false), 366 testing_mode_(false) { 367 DCHECK(task_runner_->BelongsToCurrentThread()); 368 369 if (config == MULTI_INTERFACE) 370 lock_.emplace(); 371 372 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || 373 config == MULTI_INTERFACE) { 374 // Always participate in sync handle watching in multi-interface mode, 375 // because even if it doesn't expect sync requests during sync handle 376 // watching, it may still need to dispatch messages to associated endpoints 377 // on a different thread. 378 connector_.AllowWokenUpBySyncWatchOnSameThread(); 379 } 380 connector_.set_incoming_receiver(&filters_); 381 connector_.set_connection_error_handler( 382 base::Bind(&MultiplexRouter::OnPipeConnectionError, 383 base::Unretained(this))); 384 385 std::unique_ptr<MessageHeaderValidator> header_validator = 386 base::MakeUnique<MessageHeaderValidator>(); 387 header_validator_ = header_validator.get(); 388 filters_.Append(std::move(header_validator)); 389 } 390 391 MultiplexRouter::~MultiplexRouter() { 392 MayAutoLock locker(&lock_); 393 394 sync_message_tasks_.clear(); 395 tasks_.clear(); 396 397 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 398 InterfaceEndpoint* endpoint = iter->second.get(); 399 // Increment the iterator before calling UpdateEndpointStateMayRemove() 400 // because it may remove the corresponding value from the map. 401 ++iter; 402 403 if (!endpoint->closed()) { 404 // This happens when a NotifyPeerEndpointClosed message been received, but 405 // the interface ID hasn't been used to create local endpoint handle. 406 DCHECK(!endpoint->client()); 407 DCHECK(endpoint->peer_closed()); 408 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 409 } else { 410 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 411 } 412 } 413 414 DCHECK(endpoints_.empty()); 415 } 416 417 void MultiplexRouter::SetMasterInterfaceName(const char* name) { 418 DCHECK(thread_checker_.CalledOnValidThread()); 419 header_validator_->SetDescription( 420 std::string(name) + " [master] MessageHeaderValidator"); 421 control_message_handler_.SetDescription( 422 std::string(name) + " [master] PipeControlMessageHandler"); 423 connector_.SetWatcherHeapProfilerTag(name); 424 } 425 426 InterfaceId MultiplexRouter::AssociateInterface( 427 ScopedInterfaceEndpointHandle handle_to_send) { 428 if (!handle_to_send.pending_association()) 429 return kInvalidInterfaceId; 430 431 uint32_t id = 0; 432 { 433 MayAutoLock locker(&lock_); 434 do { 435 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) 436 next_interface_id_value_ = 1; 437 id = next_interface_id_value_++; 438 if (set_interface_id_namespace_bit_) 439 id |= kInterfaceIdNamespaceMask; 440 } while (base::ContainsKey(endpoints_, id)); 441 442 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); 443 endpoints_[id] = endpoint; 444 if (encountered_error_) 445 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 446 endpoint->set_handle_created(); 447 } 448 449 if (!NotifyAssociation(&handle_to_send, id)) { 450 // The peer handle of |handle_to_send|, which is supposed to join this 451 // associated group, has been closed. 452 { 453 MayAutoLock locker(&lock_); 454 InterfaceEndpoint* endpoint = FindEndpoint(id); 455 if (endpoint) 456 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 457 } 458 459 control_message_proxy_.NotifyPeerEndpointClosed( 460 id, handle_to_send.disconnect_reason()); 461 } 462 return id; 463 } 464 465 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 466 InterfaceId id) { 467 if (!IsValidInterfaceId(id)) 468 return ScopedInterfaceEndpointHandle(); 469 470 MayAutoLock locker(&lock_); 471 bool inserted = false; 472 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 473 if (inserted) { 474 DCHECK(!endpoint->handle_created()); 475 476 if (encountered_error_) 477 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 478 } else { 479 // If the endpoint already exist, it is because we have received a 480 // notification that the peer endpoint has closed. 481 CHECK(!endpoint->closed()); 482 CHECK(endpoint->peer_closed()); 483 484 if (endpoint->handle_created()) 485 return ScopedInterfaceEndpointHandle(); 486 } 487 488 endpoint->set_handle_created(); 489 return CreateScopedInterfaceEndpointHandle(id); 490 } 491 492 void MultiplexRouter::CloseEndpointHandle( 493 InterfaceId id, 494 const base::Optional<DisconnectReason>& reason) { 495 if (!IsValidInterfaceId(id)) 496 return; 497 498 MayAutoLock locker(&lock_); 499 DCHECK(base::ContainsKey(endpoints_, id)); 500 InterfaceEndpoint* endpoint = endpoints_[id].get(); 501 DCHECK(!endpoint->client()); 502 DCHECK(!endpoint->closed()); 503 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 504 505 if (!IsMasterInterfaceId(id) || reason) { 506 MayAutoUnlock unlocker(&lock_); 507 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); 508 } 509 510 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 511 } 512 513 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( 514 const ScopedInterfaceEndpointHandle& handle, 515 InterfaceEndpointClient* client, 516 scoped_refptr<base::SingleThreadTaskRunner> runner) { 517 const InterfaceId id = handle.id(); 518 519 DCHECK(IsValidInterfaceId(id)); 520 DCHECK(client); 521 522 MayAutoLock locker(&lock_); 523 DCHECK(base::ContainsKey(endpoints_, id)); 524 525 InterfaceEndpoint* endpoint = endpoints_[id].get(); 526 endpoint->AttachClient(client, std::move(runner)); 527 528 if (endpoint->peer_closed()) 529 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 530 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 531 532 return endpoint; 533 } 534 535 void MultiplexRouter::DetachEndpointClient( 536 const ScopedInterfaceEndpointHandle& handle) { 537 const InterfaceId id = handle.id(); 538 539 DCHECK(IsValidInterfaceId(id)); 540 541 MayAutoLock locker(&lock_); 542 DCHECK(base::ContainsKey(endpoints_, id)); 543 544 InterfaceEndpoint* endpoint = endpoints_[id].get(); 545 endpoint->DetachClient(); 546 } 547 548 void MultiplexRouter::RaiseError() { 549 if (task_runner_->BelongsToCurrentThread()) { 550 connector_.RaiseError(); 551 } else { 552 task_runner_->PostTask(FROM_HERE, 553 base::Bind(&MultiplexRouter::RaiseError, this)); 554 } 555 } 556 557 void MultiplexRouter::CloseMessagePipe() { 558 DCHECK(thread_checker_.CalledOnValidThread()); 559 connector_.CloseMessagePipe(); 560 // CloseMessagePipe() above won't trigger connection error handler. 561 // Explicitly call OnPipeConnectionError() so that associated endpoints will 562 // get notified. 563 OnPipeConnectionError(); 564 } 565 566 void MultiplexRouter::PauseIncomingMethodCallProcessing() { 567 DCHECK(thread_checker_.CalledOnValidThread()); 568 connector_.PauseIncomingMethodCallProcessing(); 569 570 MayAutoLock locker(&lock_); 571 paused_ = true; 572 573 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) 574 iter->second->ResetSyncMessageSignal(); 575 } 576 577 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { 578 DCHECK(thread_checker_.CalledOnValidThread()); 579 connector_.ResumeIncomingMethodCallProcessing(); 580 581 MayAutoLock locker(&lock_); 582 paused_ = false; 583 584 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { 585 auto sync_iter = sync_message_tasks_.find(iter->first); 586 if (iter->second->peer_closed() || 587 (sync_iter != sync_message_tasks_.end() && 588 !sync_iter->second.empty())) { 589 iter->second->SignalSyncMessageEvent(); 590 } 591 } 592 593 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 594 } 595 596 bool MultiplexRouter::HasAssociatedEndpoints() const { 597 DCHECK(thread_checker_.CalledOnValidThread()); 598 MayAutoLock locker(&lock_); 599 600 if (endpoints_.size() > 1) 601 return true; 602 if (endpoints_.size() == 0) 603 return false; 604 605 return !base::ContainsKey(endpoints_, kMasterInterfaceId); 606 } 607 608 void MultiplexRouter::EnableTestingMode() { 609 DCHECK(thread_checker_.CalledOnValidThread()); 610 MayAutoLock locker(&lock_); 611 612 testing_mode_ = true; 613 connector_.set_enforce_errors_from_incoming_receiver(false); 614 } 615 616 bool MultiplexRouter::Accept(Message* message) { 617 DCHECK(thread_checker_.CalledOnValidThread()); 618 619 if (!message->DeserializeAssociatedEndpointHandles(this)) 620 return false; 621 622 scoped_refptr<MultiplexRouter> protector(this); 623 MayAutoLock locker(&lock_); 624 625 DCHECK(!paused_); 626 627 ClientCallBehavior client_call_behavior = 628 connector_.during_sync_handle_watcher_callback() 629 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 630 : ALLOW_DIRECT_CLIENT_CALLS; 631 632 bool processed = 633 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, 634 connector_.task_runner()); 635 636 if (!processed) { 637 // Either the task queue is not empty or we cannot process the message 638 // directly. In both cases, there is no need to call ProcessTasks(). 639 tasks_.push_back( 640 Task::CreateMessageTask(MessageWrapper(this, std::move(*message)))); 641 Task* task = tasks_.back().get(); 642 643 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) { 644 InterfaceId id = task->message_wrapper.value().interface_id(); 645 sync_message_tasks_[id].push_back(task); 646 InterfaceEndpoint* endpoint = FindEndpoint(id); 647 if (endpoint) 648 endpoint->SignalSyncMessageEvent(); 649 } 650 } else if (!tasks_.empty()) { 651 // Processing the message may result in new tasks (for error notification) 652 // being added to the queue. In this case, we have to attempt to process the 653 // tasks. 654 ProcessTasks(client_call_behavior, connector_.task_runner()); 655 } 656 657 // Always return true. If we see errors during message processing, we will 658 // explicitly call Connector::RaiseError() to disconnect the message pipe. 659 return true; 660 } 661 662 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( 663 InterfaceId id, 664 const base::Optional<DisconnectReason>& reason) { 665 DCHECK(!IsMasterInterfaceId(id) || reason); 666 667 MayAutoLock locker(&lock_); 668 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 669 670 if (reason) 671 endpoint->set_disconnect_reason(reason); 672 673 // It is possible that this endpoint has been set as peer closed. That is 674 // because when the message pipe is closed, all the endpoints are updated with 675 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, 676 // as long as there are refs keeping the router alive. If there is a 677 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get 678 // here and see that the endpoint has been marked as peer closed. 679 if (!endpoint->peer_closed()) { 680 if (endpoint->client()) 681 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 682 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 683 } 684 685 // No need to trigger a ProcessTasks() because it is already on the stack. 686 687 return true; 688 } 689 690 void MultiplexRouter::OnPipeConnectionError() { 691 DCHECK(thread_checker_.CalledOnValidThread()); 692 693 scoped_refptr<MultiplexRouter> protector(this); 694 MayAutoLock locker(&lock_); 695 696 encountered_error_ = true; 697 698 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 699 InterfaceEndpoint* endpoint = iter->second.get(); 700 // Increment the iterator before calling UpdateEndpointStateMayRemove() 701 // because it may remove the corresponding value from the map. 702 ++iter; 703 704 if (endpoint->client()) 705 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 706 707 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 708 } 709 710 ProcessTasks(connector_.during_sync_handle_watcher_callback() 711 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 712 : ALLOW_DIRECT_CLIENT_CALLS, 713 connector_.task_runner()); 714 } 715 716 void MultiplexRouter::ProcessTasks( 717 ClientCallBehavior client_call_behavior, 718 base::SingleThreadTaskRunner* current_task_runner) { 719 AssertLockAcquired(); 720 721 if (posted_to_process_tasks_) 722 return; 723 724 while (!tasks_.empty() && !paused_) { 725 std::unique_ptr<Task> task(std::move(tasks_.front())); 726 tasks_.pop_front(); 727 728 InterfaceId id = kInvalidInterfaceId; 729 bool sync_message = 730 task->IsMessageTask() && !task->message_wrapper.value().IsNull() && 731 task->message_wrapper.value().has_flag(Message::kFlagIsSync); 732 if (sync_message) { 733 id = task->message_wrapper.value().interface_id(); 734 auto& sync_message_queue = sync_message_tasks_[id]; 735 DCHECK_EQ(task.get(), sync_message_queue.front()); 736 sync_message_queue.pop_front(); 737 } 738 739 bool processed = 740 task->IsNotifyErrorTask() 741 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, 742 current_task_runner) 743 : ProcessIncomingMessage(&task->message_wrapper.value(), 744 client_call_behavior, current_task_runner); 745 746 if (!processed) { 747 if (sync_message) { 748 auto& sync_message_queue = sync_message_tasks_[id]; 749 sync_message_queue.push_front(task.get()); 750 } 751 tasks_.push_front(std::move(task)); 752 break; 753 } else { 754 if (sync_message) { 755 auto iter = sync_message_tasks_.find(id); 756 if (iter != sync_message_tasks_.end() && iter->second.empty()) 757 sync_message_tasks_.erase(iter); 758 } 759 } 760 } 761 } 762 763 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { 764 AssertLockAcquired(); 765 766 auto iter = sync_message_tasks_.find(id); 767 if (iter == sync_message_tasks_.end()) 768 return false; 769 770 if (paused_) 771 return true; 772 773 MultiplexRouter::Task* task = iter->second.front(); 774 iter->second.pop_front(); 775 776 DCHECK(task->IsMessageTask()); 777 MessageWrapper message_wrapper = std::move(task->message_wrapper); 778 779 // Note: after this call, |task| and |iter| may be invalidated. 780 bool processed = ProcessIncomingMessage( 781 &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, 782 nullptr); 783 DCHECK(processed); 784 785 iter = sync_message_tasks_.find(id); 786 if (iter == sync_message_tasks_.end()) 787 return false; 788 789 if (iter->second.empty()) { 790 sync_message_tasks_.erase(iter); 791 return false; 792 } 793 794 return true; 795 } 796 797 bool MultiplexRouter::ProcessNotifyErrorTask( 798 Task* task, 799 ClientCallBehavior client_call_behavior, 800 base::SingleThreadTaskRunner* current_task_runner) { 801 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 802 DCHECK(!paused_); 803 804 AssertLockAcquired(); 805 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 806 if (!endpoint->client()) 807 return true; 808 809 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || 810 endpoint->task_runner() != current_task_runner) { 811 MaybePostToProcessTasks(endpoint->task_runner()); 812 return false; 813 } 814 815 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 816 817 InterfaceEndpointClient* client = endpoint->client(); 818 base::Optional<DisconnectReason> disconnect_reason( 819 endpoint->disconnect_reason()); 820 821 { 822 // We must unlock before calling into |client| because it may call this 823 // object within NotifyError(). Holding the lock will lead to deadlock. 824 // 825 // It is safe to call into |client| without the lock. Because |client| is 826 // always accessed on the same thread, including DetachEndpointClient(). 827 MayAutoUnlock unlocker(&lock_); 828 client->NotifyError(disconnect_reason); 829 } 830 return true; 831 } 832 833 bool MultiplexRouter::ProcessIncomingMessage( 834 Message* message, 835 ClientCallBehavior client_call_behavior, 836 base::SingleThreadTaskRunner* current_task_runner) { 837 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 838 DCHECK(!paused_); 839 DCHECK(message); 840 AssertLockAcquired(); 841 842 if (message->IsNull()) { 843 // This is a sync message and has been processed during sync handle 844 // watching. 845 return true; 846 } 847 848 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 849 bool result = false; 850 851 { 852 MayAutoUnlock unlocker(&lock_); 853 result = control_message_handler_.Accept(message); 854 } 855 856 if (!result) 857 RaiseErrorInNonTestingMode(); 858 859 return true; 860 } 861 862 InterfaceId id = message->interface_id(); 863 DCHECK(IsValidInterfaceId(id)); 864 865 InterfaceEndpoint* endpoint = FindEndpoint(id); 866 if (!endpoint || endpoint->closed()) 867 return true; 868 869 if (!endpoint->client()) { 870 // We need to wait until a client is attached in order to dispatch further 871 // messages. 872 return false; 873 } 874 875 bool can_direct_call; 876 if (message->has_flag(Message::kFlagIsSync)) { 877 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && 878 endpoint->task_runner()->BelongsToCurrentThread(); 879 } else { 880 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && 881 endpoint->task_runner() == current_task_runner; 882 } 883 884 if (!can_direct_call) { 885 MaybePostToProcessTasks(endpoint->task_runner()); 886 return false; 887 } 888 889 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 890 891 InterfaceEndpointClient* client = endpoint->client(); 892 bool result = false; 893 { 894 // We must unlock before calling into |client| because it may call this 895 // object within HandleIncomingMessage(). Holding the lock will lead to 896 // deadlock. 897 // 898 // It is safe to call into |client| without the lock. Because |client| is 899 // always accessed on the same thread, including DetachEndpointClient(). 900 MayAutoUnlock unlocker(&lock_); 901 result = client->HandleIncomingMessage(message); 902 } 903 if (!result) 904 RaiseErrorInNonTestingMode(); 905 906 return true; 907 } 908 909 void MultiplexRouter::MaybePostToProcessTasks( 910 base::SingleThreadTaskRunner* task_runner) { 911 AssertLockAcquired(); 912 if (posted_to_process_tasks_) 913 return; 914 915 posted_to_process_tasks_ = true; 916 posted_to_task_runner_ = task_runner; 917 task_runner->PostTask( 918 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 919 } 920 921 void MultiplexRouter::LockAndCallProcessTasks() { 922 // There is no need to hold a ref to this class in this case because this is 923 // always called using base::Bind(), which holds a ref. 924 MayAutoLock locker(&lock_); 925 posted_to_process_tasks_ = false; 926 scoped_refptr<base::SingleThreadTaskRunner> runner( 927 std::move(posted_to_task_runner_)); 928 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); 929 } 930 931 void MultiplexRouter::UpdateEndpointStateMayRemove( 932 InterfaceEndpoint* endpoint, 933 EndpointStateUpdateType type) { 934 if (type == ENDPOINT_CLOSED) { 935 endpoint->set_closed(); 936 } else { 937 endpoint->set_peer_closed(); 938 // If the interface endpoint is performing a sync watch, this makes sure 939 // it is notified and eventually exits the sync watch. 940 endpoint->SignalSyncMessageEvent(); 941 } 942 if (endpoint->closed() && endpoint->peer_closed()) 943 endpoints_.erase(endpoint->id()); 944 } 945 946 void MultiplexRouter::RaiseErrorInNonTestingMode() { 947 AssertLockAcquired(); 948 if (!testing_mode_) 949 RaiseError(); 950 } 951 952 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( 953 InterfaceId id, 954 bool* inserted) { 955 AssertLockAcquired(); 956 // Either |inserted| is nullptr or it points to a boolean initialized as 957 // false. 958 DCHECK(!inserted || !*inserted); 959 960 InterfaceEndpoint* endpoint = FindEndpoint(id); 961 if (!endpoint) { 962 endpoint = new InterfaceEndpoint(this, id); 963 endpoints_[id] = endpoint; 964 if (inserted) 965 *inserted = true; 966 } 967 968 return endpoint; 969 } 970 971 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint( 972 InterfaceId id) { 973 AssertLockAcquired(); 974 auto iter = endpoints_.find(id); 975 return iter != endpoints_.end() ? iter->second.get() : nullptr; 976 } 977 978 void MultiplexRouter::AssertLockAcquired() { 979 #if DCHECK_IS_ON() 980 if (lock_) 981 lock_->AssertAcquired(); 982 #endif 983 } 984 985 } // namespace internal 986 } // namespace mojo 987