1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 6 7 #include <stdint.h> 8 9 #include <utility> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/macros.h" 14 #include "base/memory/ptr_util.h" 15 #include "base/sequenced_task_runner.h" 16 #include "base/stl_util.h" 17 #include "base/synchronization/waitable_event.h" 18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" 21 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h" 22 23 namespace mojo { 24 namespace internal { 25 26 // InterfaceEndpoint stores the information of an interface endpoint registered 27 // with the router. 28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 29 // this object. 30 class MultiplexRouter::InterfaceEndpoint 31 : public base::RefCountedThreadSafe<InterfaceEndpoint>, 32 public InterfaceEndpointController { 33 public: 34 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 35 : router_(router), 36 id_(id), 37 closed_(false), 38 peer_closed_(false), 39 handle_created_(false), 40 client_(nullptr) {} 41 42 // --------------------------------------------------------------------------- 43 // The following public methods are safe to call from any sequence without 44 // locking. 45 46 InterfaceId id() const { return id_; } 47 48 // --------------------------------------------------------------------------- 49 // The following public methods are called under the router's lock. 50 51 bool closed() const { return closed_; } 52 void set_closed() { 53 router_->AssertLockAcquired(); 54 closed_ = true; 55 } 56 57 bool peer_closed() const { return peer_closed_; } 58 void set_peer_closed() { 59 router_->AssertLockAcquired(); 60 peer_closed_ = true; 61 } 62 63 bool handle_created() const { return handle_created_; } 64 void set_handle_created() { 65 router_->AssertLockAcquired(); 66 handle_created_ = true; 67 } 68 69 const base::Optional<DisconnectReason>& disconnect_reason() const { 70 return disconnect_reason_; 71 } 72 void set_disconnect_reason( 73 const base::Optional<DisconnectReason>& disconnect_reason) { 74 router_->AssertLockAcquired(); 75 disconnect_reason_ = disconnect_reason; 76 } 77 78 base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); } 79 80 InterfaceEndpointClient* client() const { return client_; } 81 82 void AttachClient(InterfaceEndpointClient* client, 83 scoped_refptr<base::SequencedTaskRunner> runner) { 84 router_->AssertLockAcquired(); 85 DCHECK(!client_); 86 DCHECK(!closed_); 87 DCHECK(runner->RunsTasksInCurrentSequence()); 88 89 task_runner_ = std::move(runner); 90 client_ = client; 91 } 92 93 // This method must be called on the same sequence as the corresponding 94 // AttachClient() call. 95 void DetachClient() { 96 router_->AssertLockAcquired(); 97 DCHECK(client_); 98 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 99 DCHECK(!closed_); 100 101 task_runner_ = nullptr; 102 client_ = nullptr; 103 sync_watcher_.reset(); 104 } 105 106 void SignalSyncMessageEvent() { 107 router_->AssertLockAcquired(); 108 if (sync_message_event_signaled_) 109 return; 110 sync_message_event_signaled_ = true; 111 if (sync_watcher_) 112 sync_watcher_->SignalEvent(); 113 } 114 115 void ResetSyncMessageSignal() { 116 router_->AssertLockAcquired(); 117 if (!sync_message_event_signaled_) 118 return; 119 sync_message_event_signaled_ = false; 120 if (sync_watcher_) 121 sync_watcher_->ResetEvent(); 122 } 123 124 // --------------------------------------------------------------------------- 125 // The following public methods (i.e., InterfaceEndpointController 126 // implementation) are called by the client on the same sequence as the 127 // AttachClient() call. They are called outside of the router's lock. 128 129 bool SendMessage(Message* message) override { 130 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 131 message->set_interface_id(id_); 132 return router_->connector_.Accept(message); 133 } 134 135 void AllowWokenUpBySyncWatchOnSameThread() override { 136 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 137 138 EnsureSyncWatcherExists(); 139 sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence(); 140 } 141 142 bool SyncWatch(const bool* should_stop) override { 143 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 144 145 EnsureSyncWatcherExists(); 146 return sync_watcher_->SyncWatch(should_stop); 147 } 148 149 private: 150 friend class base::RefCountedThreadSafe<InterfaceEndpoint>; 151 152 ~InterfaceEndpoint() override { 153 router_->AssertLockAcquired(); 154 155 DCHECK(!client_); 156 } 157 158 void OnSyncEventSignaled() { 159 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 160 scoped_refptr<MultiplexRouter> router_protector(router_); 161 162 MayAutoLock locker(&router_->lock_); 163 scoped_refptr<InterfaceEndpoint> self_protector(this); 164 165 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 166 167 if (!more_to_process) 168 ResetSyncMessageSignal(); 169 170 // Currently there are no queued sync messages and the peer has closed so 171 // there won't be incoming sync messages in the future. 172 if (!more_to_process && peer_closed_) { 173 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 174 // on the call stack, resetting the sync watcher will allow it to exit 175 // when the call stack unwinds to that frame. 176 sync_watcher_.reset(); 177 } 178 } 179 180 void EnsureSyncWatcherExists() { 181 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 182 if (sync_watcher_) 183 return; 184 185 MayAutoLock locker(&router_->lock_); 186 sync_watcher_ = 187 std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating( 188 &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this))); 189 if (sync_message_event_signaled_) 190 sync_watcher_->SignalEvent(); 191 } 192 193 // --------------------------------------------------------------------------- 194 // The following members are safe to access from any sequence. 195 196 MultiplexRouter* const router_; 197 const InterfaceId id_; 198 199 // --------------------------------------------------------------------------- 200 // The following members are accessed under the router's lock. 201 202 // Whether the endpoint has been closed. 203 bool closed_; 204 // Whether the peer endpoint has been closed. 205 bool peer_closed_; 206 207 // Whether there is already a ScopedInterfaceEndpointHandle created for this 208 // endpoint. 209 bool handle_created_; 210 211 base::Optional<DisconnectReason> disconnect_reason_; 212 213 // The task runner on which |client_|'s methods can be called. 214 scoped_refptr<base::SequencedTaskRunner> task_runner_; 215 // Not owned. It is null if no client is attached to this endpoint. 216 InterfaceEndpointClient* client_; 217 218 // Indicates whether the sync watcher should be signaled for this endpoint. 219 bool sync_message_event_signaled_ = false; 220 221 // Guarded by the router's lock. Used to synchronously wait on replies. 222 std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_; 223 224 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 225 }; 226 227 // MessageWrapper objects are always destroyed under the router's lock. On 228 // destruction, if the message it wrappers contains interface IDs, the wrapper 229 // closes the corresponding endpoints. 230 class MultiplexRouter::MessageWrapper { 231 public: 232 MessageWrapper() = default; 233 234 MessageWrapper(MultiplexRouter* router, Message message) 235 : router_(router), value_(std::move(message)) {} 236 237 MessageWrapper(MessageWrapper&& other) 238 : router_(other.router_), value_(std::move(other.value_)) {} 239 240 ~MessageWrapper() { 241 if (!router_ || value_.IsNull()) 242 return; 243 244 router_->AssertLockAcquired(); 245 // Don't try to close the endpoints if at this point the router is already 246 // half-destructed. 247 if (!router_->being_destructed_) 248 router_->CloseEndpointsForMessage(value_); 249 } 250 251 MessageWrapper& operator=(MessageWrapper&& other) { 252 router_ = other.router_; 253 value_ = std::move(other.value_); 254 return *this; 255 } 256 257 const Message& value() const { return value_; } 258 259 // Must be called outside of the router's lock. 260 // Returns a null message if it fails to deseralize the associated endpoint 261 // handles. 262 Message DeserializeEndpointHandlesAndTake() { 263 if (!value_.DeserializeAssociatedEndpointHandles(router_)) { 264 // The previous call may have deserialized part of the associated 265 // interface endpoint handles. They must be destroyed outside of the 266 // router's lock, so we cannot wait until destruction of MessageWrapper. 267 value_.Reset(); 268 return Message(); 269 } 270 return std::move(value_); 271 } 272 273 private: 274 MultiplexRouter* router_ = nullptr; 275 Message value_; 276 277 DISALLOW_COPY_AND_ASSIGN(MessageWrapper); 278 }; 279 280 struct MultiplexRouter::Task { 281 public: 282 // Doesn't take ownership of |message| but takes its contents. 283 static std::unique_ptr<Task> CreateMessageTask( 284 MessageWrapper message_wrapper) { 285 Task* task = new Task(MESSAGE); 286 task->message_wrapper = std::move(message_wrapper); 287 return base::WrapUnique(task); 288 } 289 static std::unique_ptr<Task> CreateNotifyErrorTask( 290 InterfaceEndpoint* endpoint) { 291 Task* task = new Task(NOTIFY_ERROR); 292 task->endpoint_to_notify = endpoint; 293 return base::WrapUnique(task); 294 } 295 296 ~Task() {} 297 298 bool IsMessageTask() const { return type == MESSAGE; } 299 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } 300 301 MessageWrapper message_wrapper; 302 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 303 304 enum Type { MESSAGE, NOTIFY_ERROR }; 305 Type type; 306 307 private: 308 explicit Task(Type in_type) : type(in_type) {} 309 310 DISALLOW_COPY_AND_ASSIGN(Task); 311 }; 312 313 MultiplexRouter::MultiplexRouter( 314 ScopedMessagePipeHandle message_pipe, 315 Config config, 316 bool set_interface_id_namesapce_bit, 317 scoped_refptr<base::SequencedTaskRunner> runner) 318 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 319 task_runner_(runner), 320 filters_(this), 321 connector_(std::move(message_pipe), 322 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND 323 : Connector::SINGLE_THREADED_SEND, 324 std::move(runner)), 325 control_message_handler_(this), 326 control_message_proxy_(&connector_) { 327 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 328 329 if (config == MULTI_INTERFACE) 330 lock_.emplace(); 331 332 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || 333 config == MULTI_INTERFACE) { 334 // Always participate in sync handle watching in multi-interface mode, 335 // because even if it doesn't expect sync requests during sync handle 336 // watching, it may still need to dispatch messages to associated endpoints 337 // on a different sequence. 338 connector_.AllowWokenUpBySyncWatchOnSameThread(); 339 } 340 connector_.set_incoming_receiver(&filters_); 341 connector_.set_connection_error_handler(base::Bind( 342 &MultiplexRouter::OnPipeConnectionError, base::Unretained(this))); 343 344 std::unique_ptr<MessageHeaderValidator> header_validator = 345 std::make_unique<MessageHeaderValidator>(); 346 header_validator_ = header_validator.get(); 347 filters_.Append(std::move(header_validator)); 348 } 349 350 MultiplexRouter::~MultiplexRouter() { 351 MayAutoLock locker(&lock_); 352 353 being_destructed_ = true; 354 355 sync_message_tasks_.clear(); 356 tasks_.clear(); 357 endpoints_.clear(); 358 } 359 360 void MultiplexRouter::AddIncomingMessageFilter( 361 std::unique_ptr<MessageReceiver> filter) { 362 filters_.Append(std::move(filter)); 363 } 364 365 void MultiplexRouter::SetMasterInterfaceName(const char* name) { 366 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 367 header_validator_->SetDescription(std::string(name) + 368 " [master] MessageHeaderValidator"); 369 control_message_handler_.SetDescription( 370 std::string(name) + " [master] PipeControlMessageHandler"); 371 connector_.SetWatcherHeapProfilerTag(name); 372 } 373 374 InterfaceId MultiplexRouter::AssociateInterface( 375 ScopedInterfaceEndpointHandle handle_to_send) { 376 if (!handle_to_send.pending_association()) 377 return kInvalidInterfaceId; 378 379 uint32_t id = 0; 380 { 381 MayAutoLock locker(&lock_); 382 do { 383 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) 384 next_interface_id_value_ = 1; 385 id = next_interface_id_value_++; 386 if (set_interface_id_namespace_bit_) 387 id |= kInterfaceIdNamespaceMask; 388 } while (base::ContainsKey(endpoints_, id)); 389 390 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); 391 endpoints_[id] = endpoint; 392 if (encountered_error_) 393 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 394 endpoint->set_handle_created(); 395 } 396 397 if (!NotifyAssociation(&handle_to_send, id)) { 398 // The peer handle of |handle_to_send|, which is supposed to join this 399 // associated group, has been closed. 400 { 401 MayAutoLock locker(&lock_); 402 InterfaceEndpoint* endpoint = FindEndpoint(id); 403 if (endpoint) 404 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 405 } 406 407 control_message_proxy_.NotifyPeerEndpointClosed( 408 id, handle_to_send.disconnect_reason()); 409 } 410 return id; 411 } 412 413 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 414 InterfaceId id) { 415 if (!IsValidInterfaceId(id)) 416 return ScopedInterfaceEndpointHandle(); 417 418 MayAutoLock locker(&lock_); 419 bool inserted = false; 420 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 421 if (inserted) { 422 if (encountered_error_) 423 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 424 } else { 425 if (endpoint->handle_created() || endpoint->closed()) 426 return ScopedInterfaceEndpointHandle(); 427 } 428 429 endpoint->set_handle_created(); 430 return CreateScopedInterfaceEndpointHandle(id); 431 } 432 433 void MultiplexRouter::CloseEndpointHandle( 434 InterfaceId id, 435 const base::Optional<DisconnectReason>& reason) { 436 if (!IsValidInterfaceId(id)) 437 return; 438 439 MayAutoLock locker(&lock_); 440 DCHECK(base::ContainsKey(endpoints_, id)); 441 InterfaceEndpoint* endpoint = endpoints_[id].get(); 442 DCHECK(!endpoint->client()); 443 DCHECK(!endpoint->closed()); 444 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 445 446 if (!IsMasterInterfaceId(id) || reason) { 447 MayAutoUnlock unlocker(&lock_); 448 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); 449 } 450 451 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 452 } 453 454 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( 455 const ScopedInterfaceEndpointHandle& handle, 456 InterfaceEndpointClient* client, 457 scoped_refptr<base::SequencedTaskRunner> runner) { 458 const InterfaceId id = handle.id(); 459 460 DCHECK(IsValidInterfaceId(id)); 461 DCHECK(client); 462 463 MayAutoLock locker(&lock_); 464 DCHECK(base::ContainsKey(endpoints_, id)); 465 466 InterfaceEndpoint* endpoint = endpoints_[id].get(); 467 endpoint->AttachClient(client, std::move(runner)); 468 469 if (endpoint->peer_closed()) 470 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 471 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 472 473 return endpoint; 474 } 475 476 void MultiplexRouter::DetachEndpointClient( 477 const ScopedInterfaceEndpointHandle& handle) { 478 const InterfaceId id = handle.id(); 479 480 DCHECK(IsValidInterfaceId(id)); 481 482 MayAutoLock locker(&lock_); 483 DCHECK(base::ContainsKey(endpoints_, id)); 484 485 InterfaceEndpoint* endpoint = endpoints_[id].get(); 486 endpoint->DetachClient(); 487 } 488 489 void MultiplexRouter::RaiseError() { 490 if (task_runner_->RunsTasksInCurrentSequence()) { 491 connector_.RaiseError(); 492 } else { 493 task_runner_->PostTask(FROM_HERE, 494 base::Bind(&MultiplexRouter::RaiseError, this)); 495 } 496 } 497 498 bool MultiplexRouter::PrefersSerializedMessages() { 499 MayAutoLock locker(&lock_); 500 return connector_.PrefersSerializedMessages(); 501 } 502 503 void MultiplexRouter::CloseMessagePipe() { 504 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 505 connector_.CloseMessagePipe(); 506 // CloseMessagePipe() above won't trigger connection error handler. 507 // Explicitly call OnPipeConnectionError() so that associated endpoints will 508 // get notified. 509 OnPipeConnectionError(); 510 } 511 512 void MultiplexRouter::PauseIncomingMethodCallProcessing() { 513 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 514 connector_.PauseIncomingMethodCallProcessing(); 515 516 MayAutoLock locker(&lock_); 517 paused_ = true; 518 519 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) 520 iter->second->ResetSyncMessageSignal(); 521 } 522 523 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { 524 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 525 connector_.ResumeIncomingMethodCallProcessing(); 526 527 MayAutoLock locker(&lock_); 528 paused_ = false; 529 530 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { 531 auto sync_iter = sync_message_tasks_.find(iter->first); 532 if (iter->second->peer_closed() || 533 (sync_iter != sync_message_tasks_.end() && 534 !sync_iter->second.empty())) { 535 iter->second->SignalSyncMessageEvent(); 536 } 537 } 538 539 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 540 } 541 542 bool MultiplexRouter::HasAssociatedEndpoints() const { 543 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 544 MayAutoLock locker(&lock_); 545 546 if (endpoints_.size() > 1) 547 return true; 548 if (endpoints_.size() == 0) 549 return false; 550 551 return !base::ContainsKey(endpoints_, kMasterInterfaceId); 552 } 553 554 void MultiplexRouter::EnableTestingMode() { 555 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 556 MayAutoLock locker(&lock_); 557 558 testing_mode_ = true; 559 connector_.set_enforce_errors_from_incoming_receiver(false); 560 } 561 562 bool MultiplexRouter::Accept(Message* message) { 563 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 564 565 // Insert endpoints for the payload interface IDs as soon as the message 566 // arrives, instead of waiting till the message is dispatched. Consider the 567 // following sequence: 568 // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not 569 // dispatched because a sync call is blocking the thread. 570 // 2) Sync message msg2 arrives targeting interface ID x. 571 // 572 // If we don't insert endpoint for interface ID x, when trying to dispatch 573 // msg2 we don't know whether it is an unexpected message or it is just 574 // because the message containing x hasn't been dispatched. 575 if (!InsertEndpointsForMessage(*message)) 576 return false; 577 578 scoped_refptr<MultiplexRouter> protector(this); 579 MayAutoLock locker(&lock_); 580 581 DCHECK(!paused_); 582 583 ClientCallBehavior client_call_behavior = 584 connector_.during_sync_handle_watcher_callback() 585 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 586 : ALLOW_DIRECT_CLIENT_CALLS; 587 588 MessageWrapper message_wrapper(this, std::move(*message)); 589 bool processed = tasks_.empty() && ProcessIncomingMessage( 590 &message_wrapper, client_call_behavior, 591 connector_.task_runner()); 592 593 if (!processed) { 594 // Either the task queue is not empty or we cannot process the message 595 // directly. In both cases, there is no need to call ProcessTasks(). 596 tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper))); 597 Task* task = tasks_.back().get(); 598 599 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) { 600 InterfaceId id = task->message_wrapper.value().interface_id(); 601 sync_message_tasks_[id].push_back(task); 602 InterfaceEndpoint* endpoint = FindEndpoint(id); 603 if (endpoint) 604 endpoint->SignalSyncMessageEvent(); 605 } 606 } else if (!tasks_.empty()) { 607 // Processing the message may result in new tasks (for error notification) 608 // being added to the queue. In this case, we have to attempt to process the 609 // tasks. 610 ProcessTasks(client_call_behavior, connector_.task_runner()); 611 } 612 613 // Always return true. If we see errors during message processing, we will 614 // explicitly call Connector::RaiseError() to disconnect the message pipe. 615 return true; 616 } 617 618 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( 619 InterfaceId id, 620 const base::Optional<DisconnectReason>& reason) { 621 MayAutoLock locker(&lock_); 622 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 623 624 if (reason) 625 endpoint->set_disconnect_reason(reason); 626 627 // It is possible that this endpoint has been set as peer closed. That is 628 // because when the message pipe is closed, all the endpoints are updated with 629 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, 630 // as long as there are refs keeping the router alive. If there is a 631 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get 632 // here and see that the endpoint has been marked as peer closed. 633 if (!endpoint->peer_closed()) { 634 if (endpoint->client()) 635 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 636 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 637 } 638 639 // No need to trigger a ProcessTasks() because it is already on the stack. 640 641 return true; 642 } 643 644 void MultiplexRouter::OnPipeConnectionError() { 645 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 646 647 scoped_refptr<MultiplexRouter> protector(this); 648 MayAutoLock locker(&lock_); 649 650 encountered_error_ = true; 651 652 // Calling UpdateEndpointStateMayRemove() may remove the corresponding value 653 // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore, 654 // copy the endpoint pointers to a vector and iterate over it instead. 655 std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector; 656 endpoint_vector.reserve(endpoints_.size()); 657 for (const auto& pair : endpoints_) 658 endpoint_vector.push_back(pair.second); 659 660 for (const auto& endpoint : endpoint_vector) { 661 if (endpoint->client()) 662 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get())); 663 664 UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED); 665 } 666 667 ProcessTasks(connector_.during_sync_handle_watcher_callback() 668 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 669 : ALLOW_DIRECT_CLIENT_CALLS, 670 connector_.task_runner()); 671 } 672 673 void MultiplexRouter::ProcessTasks( 674 ClientCallBehavior client_call_behavior, 675 base::SequencedTaskRunner* current_task_runner) { 676 AssertLockAcquired(); 677 678 if (posted_to_process_tasks_) 679 return; 680 681 while (!tasks_.empty() && !paused_) { 682 std::unique_ptr<Task> task(std::move(tasks_.front())); 683 tasks_.pop_front(); 684 685 InterfaceId id = kInvalidInterfaceId; 686 bool sync_message = 687 task->IsMessageTask() && !task->message_wrapper.value().IsNull() && 688 task->message_wrapper.value().has_flag(Message::kFlagIsSync); 689 if (sync_message) { 690 id = task->message_wrapper.value().interface_id(); 691 auto& sync_message_queue = sync_message_tasks_[id]; 692 DCHECK_EQ(task.get(), sync_message_queue.front()); 693 sync_message_queue.pop_front(); 694 } 695 696 bool processed = 697 task->IsNotifyErrorTask() 698 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, 699 current_task_runner) 700 : ProcessIncomingMessage(&task->message_wrapper, 701 client_call_behavior, current_task_runner); 702 703 if (!processed) { 704 if (sync_message) { 705 auto& sync_message_queue = sync_message_tasks_[id]; 706 sync_message_queue.push_front(task.get()); 707 } 708 tasks_.push_front(std::move(task)); 709 break; 710 } else { 711 if (sync_message) { 712 auto iter = sync_message_tasks_.find(id); 713 if (iter != sync_message_tasks_.end() && iter->second.empty()) 714 sync_message_tasks_.erase(iter); 715 } 716 } 717 } 718 } 719 720 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { 721 AssertLockAcquired(); 722 723 auto iter = sync_message_tasks_.find(id); 724 if (iter == sync_message_tasks_.end()) 725 return false; 726 727 if (paused_) 728 return true; 729 730 MultiplexRouter::Task* task = iter->second.front(); 731 iter->second.pop_front(); 732 733 DCHECK(task->IsMessageTask()); 734 MessageWrapper message_wrapper = std::move(task->message_wrapper); 735 736 // Note: after this call, |task| and |iter| may be invalidated. 737 bool processed = ProcessIncomingMessage( 738 &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); 739 DCHECK(processed); 740 741 iter = sync_message_tasks_.find(id); 742 if (iter == sync_message_tasks_.end()) 743 return false; 744 745 if (iter->second.empty()) { 746 sync_message_tasks_.erase(iter); 747 return false; 748 } 749 750 return true; 751 } 752 753 bool MultiplexRouter::ProcessNotifyErrorTask( 754 Task* task, 755 ClientCallBehavior client_call_behavior, 756 base::SequencedTaskRunner* current_task_runner) { 757 DCHECK(!current_task_runner || 758 current_task_runner->RunsTasksInCurrentSequence()); 759 DCHECK(!paused_); 760 761 AssertLockAcquired(); 762 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 763 if (!endpoint->client()) 764 return true; 765 766 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || 767 endpoint->task_runner() != current_task_runner) { 768 MaybePostToProcessTasks(endpoint->task_runner()); 769 return false; 770 } 771 772 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); 773 774 InterfaceEndpointClient* client = endpoint->client(); 775 base::Optional<DisconnectReason> disconnect_reason( 776 endpoint->disconnect_reason()); 777 778 { 779 // We must unlock before calling into |client| because it may call this 780 // object within NotifyError(). Holding the lock will lead to deadlock. 781 // 782 // It is safe to call into |client| without the lock. Because |client| is 783 // always accessed on the same sequence, including DetachEndpointClient(). 784 MayAutoUnlock unlocker(&lock_); 785 client->NotifyError(disconnect_reason); 786 } 787 return true; 788 } 789 790 bool MultiplexRouter::ProcessIncomingMessage( 791 MessageWrapper* message_wrapper, 792 ClientCallBehavior client_call_behavior, 793 base::SequencedTaskRunner* current_task_runner) { 794 DCHECK(!current_task_runner || 795 current_task_runner->RunsTasksInCurrentSequence()); 796 DCHECK(!paused_); 797 DCHECK(message_wrapper); 798 AssertLockAcquired(); 799 800 const Message* message = &message_wrapper->value(); 801 if (message->IsNull()) { 802 // This is a sync message and has been processed during sync handle 803 // watching. 804 return true; 805 } 806 807 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 808 bool result = false; 809 810 { 811 MayAutoUnlock unlocker(&lock_); 812 Message tmp_message = 813 message_wrapper->DeserializeEndpointHandlesAndTake(); 814 result = !tmp_message.IsNull() && 815 control_message_handler_.Accept(&tmp_message); 816 } 817 818 if (!result) 819 RaiseErrorInNonTestingMode(); 820 821 return true; 822 } 823 824 InterfaceId id = message->interface_id(); 825 DCHECK(IsValidInterfaceId(id)); 826 827 InterfaceEndpoint* endpoint = FindEndpoint(id); 828 if (!endpoint || endpoint->closed()) 829 return true; 830 831 if (!endpoint->client()) { 832 // We need to wait until a client is attached in order to dispatch further 833 // messages. 834 return false; 835 } 836 837 bool can_direct_call; 838 if (message->has_flag(Message::kFlagIsSync)) { 839 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && 840 endpoint->task_runner()->RunsTasksInCurrentSequence(); 841 } else { 842 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && 843 endpoint->task_runner() == current_task_runner; 844 } 845 846 if (!can_direct_call) { 847 MaybePostToProcessTasks(endpoint->task_runner()); 848 return false; 849 } 850 851 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); 852 853 InterfaceEndpointClient* client = endpoint->client(); 854 bool result = false; 855 { 856 // We must unlock before calling into |client| because it may call this 857 // object within HandleIncomingMessage(). Holding the lock will lead to 858 // deadlock. 859 // 860 // It is safe to call into |client| without the lock. Because |client| is 861 // always accessed on the same sequence, including DetachEndpointClient(). 862 MayAutoUnlock unlocker(&lock_); 863 Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake(); 864 result = 865 !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message); 866 } 867 if (!result) 868 RaiseErrorInNonTestingMode(); 869 870 return true; 871 } 872 873 void MultiplexRouter::MaybePostToProcessTasks( 874 base::SequencedTaskRunner* task_runner) { 875 AssertLockAcquired(); 876 if (posted_to_process_tasks_) 877 return; 878 879 posted_to_process_tasks_ = true; 880 posted_to_task_runner_ = task_runner; 881 task_runner->PostTask( 882 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 883 } 884 885 void MultiplexRouter::LockAndCallProcessTasks() { 886 // There is no need to hold a ref to this class in this case because this is 887 // always called using base::Bind(), which holds a ref. 888 MayAutoLock locker(&lock_); 889 posted_to_process_tasks_ = false; 890 scoped_refptr<base::SequencedTaskRunner> runner( 891 std::move(posted_to_task_runner_)); 892 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); 893 } 894 895 void MultiplexRouter::UpdateEndpointStateMayRemove( 896 InterfaceEndpoint* endpoint, 897 EndpointStateUpdateType type) { 898 if (type == ENDPOINT_CLOSED) { 899 endpoint->set_closed(); 900 } else { 901 endpoint->set_peer_closed(); 902 // If the interface endpoint is performing a sync watch, this makes sure 903 // it is notified and eventually exits the sync watch. 904 endpoint->SignalSyncMessageEvent(); 905 } 906 if (endpoint->closed() && endpoint->peer_closed()) 907 endpoints_.erase(endpoint->id()); 908 } 909 910 void MultiplexRouter::RaiseErrorInNonTestingMode() { 911 AssertLockAcquired(); 912 if (!testing_mode_) 913 RaiseError(); 914 } 915 916 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( 917 InterfaceId id, 918 bool* inserted) { 919 AssertLockAcquired(); 920 // Either |inserted| is nullptr or it points to a boolean initialized as 921 // false. 922 DCHECK(!inserted || !*inserted); 923 924 InterfaceEndpoint* endpoint = FindEndpoint(id); 925 if (!endpoint) { 926 endpoint = new InterfaceEndpoint(this, id); 927 endpoints_[id] = endpoint; 928 if (inserted) 929 *inserted = true; 930 } 931 932 return endpoint; 933 } 934 935 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint( 936 InterfaceId id) { 937 AssertLockAcquired(); 938 auto iter = endpoints_.find(id); 939 return iter != endpoints_.end() ? iter->second.get() : nullptr; 940 } 941 942 void MultiplexRouter::AssertLockAcquired() { 943 #if DCHECK_IS_ON() 944 if (lock_) 945 lock_->AssertAcquired(); 946 #endif 947 } 948 949 bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) { 950 if (!message.is_serialized()) 951 return true; 952 953 uint32_t num_ids = message.payload_num_interface_ids(); 954 if (num_ids == 0) 955 return true; 956 957 const uint32_t* ids = message.payload_interface_ids(); 958 959 MayAutoLock locker(&lock_); 960 for (uint32_t i = 0; i < num_ids; ++i) { 961 // Message header validation already ensures that the IDs are valid and not 962 // the master ID. 963 // The IDs are from the remote side and therefore their namespace bit is 964 // supposed to be different than the value that this router would use. 965 if (set_interface_id_namespace_bit_ == 966 HasInterfaceIdNamespaceBitSet(ids[i])) { 967 return false; 968 } 969 970 // It is possible that the endpoint already exists even when the remote side 971 // is well-behaved: it might have notified us that the peer endpoint has 972 // closed. 973 bool inserted = false; 974 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted); 975 if (endpoint->closed() || endpoint->handle_created()) 976 return false; 977 } 978 979 return true; 980 } 981 982 void MultiplexRouter::CloseEndpointsForMessage(const Message& message) { 983 AssertLockAcquired(); 984 985 if (!message.is_serialized()) 986 return; 987 988 uint32_t num_ids = message.payload_num_interface_ids(); 989 if (num_ids == 0) 990 return; 991 992 const uint32_t* ids = message.payload_interface_ids(); 993 for (uint32_t i = 0; i < num_ids; ++i) { 994 InterfaceEndpoint* endpoint = FindEndpoint(ids[i]); 995 // If the remote side maliciously sends the same interface ID in another 996 // message which has been dispatched, we could get here with no endpoint 997 // for the ID, a closed endpoint, or an endpoint with handle created. 998 if (!endpoint || endpoint->closed() || endpoint->handle_created()) { 999 RaiseErrorInNonTestingMode(); 1000 continue; 1001 } 1002 1003 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 1004 MayAutoUnlock unlocker(&lock_); 1005 control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt); 1006 } 1007 1008 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 1009 } 1010 1011 } // namespace internal 1012 } // namespace mojo 1013