1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 6 7 #include <stdint.h> 8 9 #include <utility> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/macros.h" 14 #include "base/memory/ptr_util.h" 15 #include "base/single_thread_task_runner.h" 16 #include "base/stl_util.h" 17 #include "base/threading/thread_task_runner_handle.h" 18 #include "mojo/public/cpp/bindings/associated_group.h" 19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 22 23 namespace mojo { 24 namespace internal { 25 26 // InterfaceEndpoint stores the information of an interface endpoint registered 27 // with the router. 28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to 29 // this object. 30 class MultiplexRouter::InterfaceEndpoint 31 : public base::RefCounted<InterfaceEndpoint>, 32 public InterfaceEndpointController { 33 public: 34 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 35 : router_(router), 36 id_(id), 37 closed_(false), 38 peer_closed_(false), 39 client_(nullptr), 40 event_signalled_(false) {} 41 42 // --------------------------------------------------------------------------- 43 // The following public methods are safe to call from any threads without 44 // locking. 45 46 InterfaceId id() const { return id_; } 47 48 // --------------------------------------------------------------------------- 49 // The following public methods are called under the router's lock. 50 51 bool closed() const { return closed_; } 52 void set_closed() { 53 router_->lock_.AssertAcquired(); 54 closed_ = true; 55 } 56 57 bool peer_closed() const { return peer_closed_; } 58 void set_peer_closed() { 59 router_->lock_.AssertAcquired(); 60 peer_closed_ = true; 61 } 62 63 base::SingleThreadTaskRunner* task_runner() const { 64 return task_runner_.get(); 65 } 66 67 InterfaceEndpointClient* client() const { return client_; } 68 69 void AttachClient(InterfaceEndpointClient* client, 70 scoped_refptr<base::SingleThreadTaskRunner> runner) { 71 router_->lock_.AssertAcquired(); 72 DCHECK(!client_); 73 DCHECK(!closed_); 74 DCHECK(runner->BelongsToCurrentThread()); 75 76 task_runner_ = std::move(runner); 77 client_ = client; 78 } 79 80 // This method must be called on the same thread as the corresponding 81 // AttachClient() call. 82 void DetachClient() { 83 router_->lock_.AssertAcquired(); 84 DCHECK(client_); 85 DCHECK(task_runner_->BelongsToCurrentThread()); 86 DCHECK(!closed_); 87 88 task_runner_ = nullptr; 89 client_ = nullptr; 90 sync_watcher_.reset(); 91 } 92 93 void SignalSyncMessageEvent() { 94 router_->lock_.AssertAcquired(); 95 if (event_signalled_) 96 return; 97 98 EnsureEventMessagePipeExists(); 99 event_signalled_ = true; 100 MojoResult result = 101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, 102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); 103 DCHECK_EQ(MOJO_RESULT_OK, result); 104 } 105 106 // --------------------------------------------------------------------------- 107 // The following public methods (i.e., InterfaceEndpointController 108 // implementation) are called by the client on the same thread as the 109 // AttachClient() call. They are called outside of the router's lock. 110 111 bool SendMessage(Message* message) override { 112 DCHECK(task_runner_->BelongsToCurrentThread()); 113 message->set_interface_id(id_); 114 return router_->connector_.Accept(message); 115 } 116 117 void AllowWokenUpBySyncWatchOnSameThread() override { 118 DCHECK(task_runner_->BelongsToCurrentThread()); 119 120 EnsureSyncWatcherExists(); 121 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 122 } 123 124 bool SyncWatch(const bool* should_stop) override { 125 DCHECK(task_runner_->BelongsToCurrentThread()); 126 127 EnsureSyncWatcherExists(); 128 return sync_watcher_->SyncWatch(should_stop); 129 } 130 131 private: 132 friend class base::RefCounted<InterfaceEndpoint>; 133 134 ~InterfaceEndpoint() override { 135 router_->lock_.AssertAcquired(); 136 137 DCHECK(!client_); 138 DCHECK(closed_); 139 DCHECK(peer_closed_); 140 DCHECK(!sync_watcher_); 141 } 142 143 void OnHandleReady(MojoResult result) { 144 DCHECK(task_runner_->BelongsToCurrentThread()); 145 scoped_refptr<InterfaceEndpoint> self_protector(this); 146 scoped_refptr<MultiplexRouter> router_protector(router_); 147 148 // Because we never close |sync_message_event_{sender,receiver}_| before 149 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. 150 DCHECK_EQ(MOJO_RESULT_OK, result); 151 bool reset_sync_watcher = false; 152 { 153 base::AutoLock locker(router_->lock_); 154 155 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 156 157 if (!more_to_process) 158 ResetSyncMessageSignal(); 159 160 // Currently there are no queued sync messages and the peer has closed so 161 // there won't be incoming sync messages in the future. 162 reset_sync_watcher = !more_to_process && peer_closed_; 163 } 164 if (reset_sync_watcher) { 165 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 166 // on the call stack, resetting the sync watcher will allow it to exit 167 // when the call stack unwinds to that frame. 168 sync_watcher_.reset(); 169 } 170 } 171 172 void EnsureSyncWatcherExists() { 173 DCHECK(task_runner_->BelongsToCurrentThread()); 174 if (sync_watcher_) 175 return; 176 177 { 178 base::AutoLock locker(router_->lock_); 179 EnsureEventMessagePipeExists(); 180 181 auto iter = router_->sync_message_tasks_.find(id_); 182 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) 183 SignalSyncMessageEvent(); 184 } 185 186 sync_watcher_.reset(new SyncHandleWatcher( 187 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, 188 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); 189 } 190 191 void EnsureEventMessagePipeExists() { 192 router_->lock_.AssertAcquired(); 193 194 if (sync_message_event_receiver_.is_valid()) 195 return; 196 197 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, 198 &sync_message_event_receiver_); 199 DCHECK_EQ(MOJO_RESULT_OK, result); 200 } 201 202 void ResetSyncMessageSignal() { 203 router_->lock_.AssertAcquired(); 204 205 if (!event_signalled_) 206 return; 207 208 DCHECK(sync_message_event_receiver_.is_valid()); 209 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), 210 nullptr, nullptr, nullptr, nullptr, 211 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 212 DCHECK_EQ(MOJO_RESULT_OK, result); 213 event_signalled_ = false; 214 } 215 216 // --------------------------------------------------------------------------- 217 // The following members are safe to access from any threads. 218 219 MultiplexRouter* const router_; 220 const InterfaceId id_; 221 222 // --------------------------------------------------------------------------- 223 // The following members are accessed under the router's lock. 224 225 // Whether the endpoint has been closed. 226 bool closed_; 227 // Whether the peer endpoint has been closed. 228 bool peer_closed_; 229 230 // The task runner on which |client_|'s methods can be called. 231 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 232 // Not owned. It is null if no client is attached to this endpoint. 233 InterfaceEndpointClient* client_; 234 235 // A message pipe used as an event to signal that sync messages are available. 236 // The message pipe handles are initialized under the router's lock and remain 237 // unchanged afterwards. They may be accessed outside of the router's lock 238 // later. 239 ScopedMessagePipeHandle sync_message_event_sender_; 240 ScopedMessagePipeHandle sync_message_event_receiver_; 241 bool event_signalled_; 242 243 // --------------------------------------------------------------------------- 244 // The following members are only valid while a client is attached. They are 245 // used exclusively on the client's thread. They may be accessed outside of 246 // the router's lock. 247 248 std::unique_ptr<SyncHandleWatcher> sync_watcher_; 249 250 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 251 }; 252 253 struct MultiplexRouter::Task { 254 public: 255 // Doesn't take ownership of |message| but takes its contents. 256 static std::unique_ptr<Task> CreateMessageTask(Message* message) { 257 Task* task = new Task(MESSAGE); 258 task->message.reset(new Message); 259 message->MoveTo(task->message.get()); 260 return base::WrapUnique(task); 261 } 262 static std::unique_ptr<Task> CreateNotifyErrorTask( 263 InterfaceEndpoint* endpoint) { 264 Task* task = new Task(NOTIFY_ERROR); 265 task->endpoint_to_notify = endpoint; 266 return base::WrapUnique(task); 267 } 268 269 ~Task() {} 270 271 bool IsMessageTask() const { return type == MESSAGE; } 272 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } 273 274 std::unique_ptr<Message> message; 275 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 276 277 enum Type { MESSAGE, NOTIFY_ERROR }; 278 Type type; 279 280 private: 281 explicit Task(Type in_type) : type(in_type) {} 282 }; 283 284 MultiplexRouter::MultiplexRouter( 285 bool set_interface_id_namesapce_bit, 286 ScopedMessagePipeHandle message_pipe, 287 scoped_refptr<base::SingleThreadTaskRunner> runner) 288 : AssociatedGroupController(base::ThreadTaskRunnerHandle::Get()), 289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 290 header_validator_(this), 291 connector_(std::move(message_pipe), 292 Connector::MULTI_THREADED_SEND, 293 std::move(runner)), 294 control_message_handler_(this), 295 control_message_proxy_(&connector_), 296 next_interface_id_value_(1), 297 posted_to_process_tasks_(false), 298 encountered_error_(false), 299 testing_mode_(false) { 300 // Always participate in sync handle watching, because even if it doesn't 301 // expect sync requests during sync handle watching, it may still need to 302 // dispatch messages to associated endpoints on a different thread. 303 connector_.AllowWokenUpBySyncWatchOnSameThread(); 304 connector_.set_incoming_receiver(&header_validator_); 305 connector_.set_connection_error_handler( 306 base::Bind(&MultiplexRouter::OnPipeConnectionError, 307 base::Unretained(this))); 308 } 309 310 MultiplexRouter::~MultiplexRouter() { 311 base::AutoLock locker(lock_); 312 313 sync_message_tasks_.clear(); 314 tasks_.clear(); 315 316 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 317 InterfaceEndpoint* endpoint = iter->second.get(); 318 // Increment the iterator before calling UpdateEndpointStateMayRemove() 319 // because it may remove the corresponding value from the map. 320 ++iter; 321 322 DCHECK(endpoint->closed()); 323 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 324 } 325 326 DCHECK(endpoints_.empty()); 327 } 328 329 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) { 330 DCHECK(thread_checker_.CalledOnValidThread()); 331 header_validator_.SetDescription(name + " [master] MessageHeaderValidator"); 332 control_message_handler_.SetDescription( 333 name + " [master] PipeControlMessageHandler"); 334 } 335 336 void MultiplexRouter::CreateEndpointHandlePair( 337 ScopedInterfaceEndpointHandle* local_endpoint, 338 ScopedInterfaceEndpointHandle* remote_endpoint) { 339 base::AutoLock locker(lock_); 340 uint32_t id = 0; 341 do { 342 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) 343 next_interface_id_value_ = 1; 344 id = next_interface_id_value_++; 345 if (set_interface_id_namespace_bit_) 346 id |= kInterfaceIdNamespaceMask; 347 } while (ContainsKey(endpoints_, id)); 348 349 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); 350 endpoints_[id] = endpoint; 351 if (encountered_error_) 352 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 353 354 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); 355 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); 356 } 357 358 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 359 InterfaceId id) { 360 if (!IsValidInterfaceId(id)) 361 return ScopedInterfaceEndpointHandle(); 362 363 base::AutoLock locker(lock_); 364 bool inserted = false; 365 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 366 if (inserted) { 367 if (encountered_error_) 368 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 369 } else { 370 // If the endpoint already exist, it is because we have received a 371 // notification that the peer endpoint has closed. 372 CHECK(!endpoint->closed()); 373 CHECK(endpoint->peer_closed()); 374 } 375 return CreateScopedInterfaceEndpointHandle(id, true); 376 } 377 378 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { 379 if (!IsValidInterfaceId(id)) 380 return; 381 382 base::AutoLock locker(lock_); 383 384 if (!is_local) { 385 DCHECK(ContainsKey(endpoints_, id)); 386 DCHECK(!IsMasterInterfaceId(id)); 387 388 // We will receive a NotifyPeerEndpointClosed message from the other side. 389 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); 390 391 return; 392 } 393 394 DCHECK(ContainsKey(endpoints_, id)); 395 InterfaceEndpoint* endpoint = endpoints_[id].get(); 396 DCHECK(!endpoint->client()); 397 DCHECK(!endpoint->closed()); 398 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 399 400 if (!IsMasterInterfaceId(id)) 401 control_message_proxy_.NotifyPeerEndpointClosed(id); 402 403 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 404 } 405 406 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( 407 const ScopedInterfaceEndpointHandle& handle, 408 InterfaceEndpointClient* client, 409 scoped_refptr<base::SingleThreadTaskRunner> runner) { 410 const InterfaceId id = handle.id(); 411 412 DCHECK(IsValidInterfaceId(id)); 413 DCHECK(client); 414 415 base::AutoLock locker(lock_); 416 DCHECK(ContainsKey(endpoints_, id)); 417 418 InterfaceEndpoint* endpoint = endpoints_[id].get(); 419 endpoint->AttachClient(client, std::move(runner)); 420 421 if (endpoint->peer_closed()) 422 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 423 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 424 425 return endpoint; 426 } 427 428 void MultiplexRouter::DetachEndpointClient( 429 const ScopedInterfaceEndpointHandle& handle) { 430 const InterfaceId id = handle.id(); 431 432 DCHECK(IsValidInterfaceId(id)); 433 434 base::AutoLock locker(lock_); 435 DCHECK(ContainsKey(endpoints_, id)); 436 437 InterfaceEndpoint* endpoint = endpoints_[id].get(); 438 endpoint->DetachClient(); 439 } 440 441 void MultiplexRouter::RaiseError() { 442 if (task_runner_->BelongsToCurrentThread()) { 443 connector_.RaiseError(); 444 } else { 445 task_runner_->PostTask(FROM_HERE, 446 base::Bind(&MultiplexRouter::RaiseError, this)); 447 } 448 } 449 450 void MultiplexRouter::CloseMessagePipe() { 451 DCHECK(thread_checker_.CalledOnValidThread()); 452 connector_.CloseMessagePipe(); 453 // CloseMessagePipe() above won't trigger connection error handler. 454 // Explicitly call OnPipeConnectionError() so that associated endpoints will 455 // get notified. 456 OnPipeConnectionError(); 457 } 458 459 bool MultiplexRouter::HasAssociatedEndpoints() const { 460 DCHECK(thread_checker_.CalledOnValidThread()); 461 base::AutoLock locker(lock_); 462 463 if (endpoints_.size() > 1) 464 return true; 465 if (endpoints_.size() == 0) 466 return false; 467 468 return !ContainsKey(endpoints_, kMasterInterfaceId); 469 } 470 471 void MultiplexRouter::EnableTestingMode() { 472 DCHECK(thread_checker_.CalledOnValidThread()); 473 base::AutoLock locker(lock_); 474 475 testing_mode_ = true; 476 connector_.set_enforce_errors_from_incoming_receiver(false); 477 } 478 479 bool MultiplexRouter::Accept(Message* message) { 480 DCHECK(thread_checker_.CalledOnValidThread()); 481 482 scoped_refptr<MultiplexRouter> protector(this); 483 base::AutoLock locker(lock_); 484 485 ClientCallBehavior client_call_behavior = 486 connector_.during_sync_handle_watcher_callback() 487 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 488 : ALLOW_DIRECT_CLIENT_CALLS; 489 490 bool processed = 491 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, 492 connector_.task_runner()); 493 494 if (!processed) { 495 // Either the task queue is not empty or we cannot process the message 496 // directly. In both cases, there is no need to call ProcessTasks(). 497 tasks_.push_back(Task::CreateMessageTask(message)); 498 Task* task = tasks_.back().get(); 499 500 if (task->message->has_flag(Message::kFlagIsSync)) { 501 InterfaceId id = task->message->interface_id(); 502 sync_message_tasks_[id].push_back(task); 503 auto iter = endpoints_.find(id); 504 if (iter != endpoints_.end()) 505 iter->second->SignalSyncMessageEvent(); 506 } 507 } else if (!tasks_.empty()) { 508 // Processing the message may result in new tasks (for error notification) 509 // being added to the queue. In this case, we have to attempt to process the 510 // tasks. 511 ProcessTasks(client_call_behavior, connector_.task_runner()); 512 } 513 514 // Always return true. If we see errors during message processing, we will 515 // explicitly call Connector::RaiseError() to disconnect the message pipe. 516 return true; 517 } 518 519 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { 520 lock_.AssertAcquired(); 521 522 if (IsMasterInterfaceId(id)) 523 return false; 524 525 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 526 527 // It is possible that this endpoint has been set as peer closed. That is 528 // because when the message pipe is closed, all the endpoints are updated with 529 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, 530 // as long as there are refs keeping the router alive. If there is a 531 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get 532 // here and see that the endpoint has been marked as peer closed. 533 if (!endpoint->peer_closed()) { 534 if (endpoint->client()) 535 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 536 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 537 } 538 539 // No need to trigger a ProcessTasks() because it is already on the stack. 540 541 return true; 542 } 543 544 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { 545 lock_.AssertAcquired(); 546 547 if (IsMasterInterfaceId(id)) 548 return false; 549 550 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 551 DCHECK(!endpoint->closed()); 552 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 553 554 control_message_proxy_.NotifyPeerEndpointClosed(id); 555 556 return true; 557 } 558 559 void MultiplexRouter::OnPipeConnectionError() { 560 DCHECK(thread_checker_.CalledOnValidThread()); 561 562 scoped_refptr<MultiplexRouter> protector(this); 563 base::AutoLock locker(lock_); 564 565 encountered_error_ = true; 566 567 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 568 InterfaceEndpoint* endpoint = iter->second.get(); 569 // Increment the iterator before calling UpdateEndpointStateMayRemove() 570 // because it may remove the corresponding value from the map. 571 ++iter; 572 573 if (endpoint->client()) 574 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 575 576 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 577 } 578 579 ProcessTasks(connector_.during_sync_handle_watcher_callback() 580 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 581 : ALLOW_DIRECT_CLIENT_CALLS, 582 connector_.task_runner()); 583 } 584 585 void MultiplexRouter::ProcessTasks( 586 ClientCallBehavior client_call_behavior, 587 base::SingleThreadTaskRunner* current_task_runner) { 588 lock_.AssertAcquired(); 589 590 if (posted_to_process_tasks_) 591 return; 592 593 while (!tasks_.empty()) { 594 std::unique_ptr<Task> task(std::move(tasks_.front())); 595 tasks_.pop_front(); 596 597 InterfaceId id = kInvalidInterfaceId; 598 bool sync_message = task->IsMessageTask() && task->message && 599 task->message->has_flag(Message::kFlagIsSync); 600 if (sync_message) { 601 id = task->message->interface_id(); 602 auto& sync_message_queue = sync_message_tasks_[id]; 603 DCHECK_EQ(task.get(), sync_message_queue.front()); 604 sync_message_queue.pop_front(); 605 } 606 607 bool processed = 608 task->IsNotifyErrorTask() 609 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, 610 current_task_runner) 611 : ProcessIncomingMessage(task->message.get(), client_call_behavior, 612 current_task_runner); 613 614 if (!processed) { 615 if (sync_message) { 616 auto& sync_message_queue = sync_message_tasks_[id]; 617 sync_message_queue.push_front(task.get()); 618 } 619 tasks_.push_front(std::move(task)); 620 break; 621 } else { 622 if (sync_message) { 623 auto iter = sync_message_tasks_.find(id); 624 if (iter != sync_message_tasks_.end() && iter->second.empty()) 625 sync_message_tasks_.erase(iter); 626 } 627 } 628 } 629 } 630 631 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { 632 lock_.AssertAcquired(); 633 634 auto iter = sync_message_tasks_.find(id); 635 if (iter == sync_message_tasks_.end()) 636 return false; 637 638 MultiplexRouter::Task* task = iter->second.front(); 639 iter->second.pop_front(); 640 641 DCHECK(task->IsMessageTask()); 642 std::unique_ptr<Message> message(std::move(task->message)); 643 644 // Note: after this call, |task| and |iter| may be invalidated. 645 bool processed = ProcessIncomingMessage( 646 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); 647 DCHECK(processed); 648 649 iter = sync_message_tasks_.find(id); 650 if (iter == sync_message_tasks_.end()) 651 return false; 652 653 if (iter->second.empty()) { 654 sync_message_tasks_.erase(iter); 655 return false; 656 } 657 658 return true; 659 } 660 661 bool MultiplexRouter::ProcessNotifyErrorTask( 662 Task* task, 663 ClientCallBehavior client_call_behavior, 664 base::SingleThreadTaskRunner* current_task_runner) { 665 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 666 lock_.AssertAcquired(); 667 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 668 if (!endpoint->client()) 669 return true; 670 671 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || 672 endpoint->task_runner() != current_task_runner) { 673 MaybePostToProcessTasks(endpoint->task_runner()); 674 return false; 675 } 676 677 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 678 679 InterfaceEndpointClient* client = endpoint->client(); 680 { 681 // We must unlock before calling into |client| because it may call this 682 // object within NotifyError(). Holding the lock will lead to deadlock. 683 // 684 // It is safe to call into |client| without the lock. Because |client| is 685 // always accessed on the same thread, including DetachEndpointClient(). 686 base::AutoUnlock unlocker(lock_); 687 client->NotifyError(); 688 } 689 return true; 690 } 691 692 bool MultiplexRouter::ProcessIncomingMessage( 693 Message* message, 694 ClientCallBehavior client_call_behavior, 695 base::SingleThreadTaskRunner* current_task_runner) { 696 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 697 lock_.AssertAcquired(); 698 699 if (!message) { 700 // This is a sync message and has been processed during sync handle 701 // watching. 702 return true; 703 } 704 705 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 706 if (!control_message_handler_.Accept(message)) 707 RaiseErrorInNonTestingMode(); 708 return true; 709 } 710 711 InterfaceId id = message->interface_id(); 712 DCHECK(IsValidInterfaceId(id)); 713 714 bool inserted = false; 715 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 716 if (inserted) { 717 // Currently, it is legitimate to receive messages for an endpoint 718 // that is not registered. For example, the endpoint is transferred in 719 // a message that is discarded. Once we add support to specify all 720 // enclosing endpoints in message header, we should be able to remove 721 // this. 722 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 723 724 // It is also possible that this newly-inserted endpoint is the master 725 // endpoint. When the master InterfacePtr/Binding goes away, the message 726 // pipe is closed and we explicitly trigger a pipe connection error. The 727 // error updates all the endpoints, including the master endpoint, with 728 // PEER_ENDPOINT_CLOSED and removes the master endpoint from the 729 // registration. We continue to process remaining tasks in the queue, as 730 // long as there are refs keeping the router alive. If there are remaining 731 // messages for the master endpoint, we will get here. 732 if (!IsMasterInterfaceId(id)) 733 control_message_proxy_.NotifyPeerEndpointClosed(id); 734 return true; 735 } 736 737 if (endpoint->closed()) 738 return true; 739 740 if (!endpoint->client()) { 741 // We need to wait until a client is attached in order to dispatch further 742 // messages. 743 return false; 744 } 745 746 bool can_direct_call; 747 if (message->has_flag(Message::kFlagIsSync)) { 748 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && 749 endpoint->task_runner()->BelongsToCurrentThread(); 750 } else { 751 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && 752 endpoint->task_runner() == current_task_runner; 753 } 754 755 if (!can_direct_call) { 756 MaybePostToProcessTasks(endpoint->task_runner()); 757 return false; 758 } 759 760 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 761 762 InterfaceEndpointClient* client = endpoint->client(); 763 bool result = false; 764 { 765 // We must unlock before calling into |client| because it may call this 766 // object within HandleIncomingMessage(). Holding the lock will lead to 767 // deadlock. 768 // 769 // It is safe to call into |client| without the lock. Because |client| is 770 // always accessed on the same thread, including DetachEndpointClient(). 771 base::AutoUnlock unlocker(lock_); 772 result = client->HandleIncomingMessage(message); 773 } 774 if (!result) 775 RaiseErrorInNonTestingMode(); 776 777 return true; 778 } 779 780 void MultiplexRouter::MaybePostToProcessTasks( 781 base::SingleThreadTaskRunner* task_runner) { 782 lock_.AssertAcquired(); 783 if (posted_to_process_tasks_) 784 return; 785 786 posted_to_process_tasks_ = true; 787 posted_to_task_runner_ = task_runner; 788 task_runner->PostTask( 789 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 790 } 791 792 void MultiplexRouter::LockAndCallProcessTasks() { 793 // There is no need to hold a ref to this class in this case because this is 794 // always called using base::Bind(), which holds a ref. 795 base::AutoLock locker(lock_); 796 posted_to_process_tasks_ = false; 797 scoped_refptr<base::SingleThreadTaskRunner> runner( 798 std::move(posted_to_task_runner_)); 799 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); 800 } 801 802 void MultiplexRouter::UpdateEndpointStateMayRemove( 803 InterfaceEndpoint* endpoint, 804 EndpointStateUpdateType type) { 805 switch (type) { 806 case ENDPOINT_CLOSED: 807 endpoint->set_closed(); 808 break; 809 case PEER_ENDPOINT_CLOSED: 810 endpoint->set_peer_closed(); 811 // If the interface endpoint is performing a sync watch, this makes sure 812 // it is notified and eventually exits the sync watch. 813 endpoint->SignalSyncMessageEvent(); 814 break; 815 } 816 if (endpoint->closed() && endpoint->peer_closed()) 817 endpoints_.erase(endpoint->id()); 818 } 819 820 void MultiplexRouter::RaiseErrorInNonTestingMode() { 821 lock_.AssertAcquired(); 822 if (!testing_mode_) 823 RaiseError(); 824 } 825 826 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( 827 InterfaceId id, 828 bool* inserted) { 829 lock_.AssertAcquired(); 830 // Either |inserted| is nullptr or it points to a boolean initialized as 831 // false. 832 DCHECK(!inserted || !*inserted); 833 834 auto iter = endpoints_.find(id); 835 InterfaceEndpoint* endpoint; 836 if (iter == endpoints_.end()) { 837 endpoint = new InterfaceEndpoint(this, id); 838 endpoints_[id] = endpoint; 839 if (inserted) 840 *inserted = true; 841 } else { 842 endpoint = iter->second.get(); 843 } 844 845 return endpoint; 846 } 847 848 } // namespace internal 849 } // namespace mojo 850