Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_sync_channel.h"
      6 
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 
     10 #include <utility>
     11 
     12 #include "base/bind.h"
     13 #include "base/lazy_instance.h"
     14 #include "base/location.h"
     15 #include "base/logging.h"
     16 #include "base/macros.h"
     17 #include "base/memory/ptr_util.h"
     18 #include "base/run_loop.h"
     19 #include "base/sequenced_task_runner.h"
     20 #include "base/synchronization/waitable_event.h"
     21 #include "base/threading/thread_local.h"
     22 #include "base/threading/thread_task_runner_handle.h"
     23 #include "base/trace_event/trace_event.h"
     24 #include "ipc/ipc_channel_factory.h"
     25 #include "ipc/ipc_logging.h"
     26 #include "ipc/ipc_message_macros.h"
     27 #include "ipc/ipc_sync_message.h"
     28 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
     29 
     30 using base::WaitableEvent;
     31 
     32 namespace IPC {
     33 
     34 namespace {
     35 
     36 // A generic callback used when watching handles synchronously. Sets |*signal|
     37 // to true.
     38 void OnEventReady(bool* signal) {
     39   *signal = true;
     40 }
     41 
     42 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
     43     g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
     44 
     45 }  // namespace
     46 
     47 // When we're blocked in a Send(), we need to process incoming synchronous
     48 // messages right away because it could be blocking our reply (either
     49 // directly from the same object we're calling, or indirectly through one or
     50 // more other channels).  That means that in SyncContext's OnMessageReceived,
     51 // we need to process sync message right away if we're blocked.  However a
     52 // simple check isn't sufficient, because the listener thread can be in the
     53 // process of calling Send.
     54 // To work around this, when SyncChannel filters a sync message, it sets
     55 // an event that the listener thread waits on during its Send() call.  This
     56 // allows us to dispatch incoming sync messages when blocked.  The race
     57 // condition is handled because if Send is in the process of being called, it
     58 // will check the event.  In case the listener thread isn't sending a message,
     59 // we queue a task on the listener thread to dispatch the received messages.
     60 // The messages are stored in this queue object that's shared among all
     61 // SyncChannel objects on the same thread (since one object can receive a
     62 // sync message while another one is blocked).
     63 
     64 class SyncChannel::ReceivedSyncMsgQueue :
     65     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
     66  public:
     67   // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we
     68   // may nest waiting message loops arbitrarily deep on the SyncChannel's
     69   // thread. Every such operation has a corresponding WaitableEvent to be
     70   // watched which, when signalled for IPC completion, breaks out of the loop.
     71   // A reference to the innermost (i.e. topmost) watcher is held in
     72   // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|.
     73   //
     74   // NestedSendDoneWatcher provides a simple scoper which is used by
     75   // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done"
     76   // event, preserving the previous topmost state on the local stack until the
     77   // new inner loop is broken. If yet another subsequent nested loop is started
     78   // therein the process is repeated again in the new inner stack frame, and so
     79   // on.
     80   //
     81   // When this object is destroyed on stack unwind, the previous topmost state
     82   // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|,
     83   // and its watch is resumed immediately.
     84   class NestedSendDoneWatcher {
     85    public:
     86     NestedSendDoneWatcher(SyncChannel::SyncContext* context,
     87                           base::RunLoop* run_loop,
     88                           scoped_refptr<base::SequencedTaskRunner> task_runner)
     89         : sync_msg_queue_(context->received_sync_msgs()),
     90           outer_state_(sync_msg_queue_->top_send_done_event_watcher_),
     91           event_(context->GetSendDoneEvent()),
     92           callback_(
     93               base::BindOnce(&SyncChannel::SyncContext::OnSendDoneEventSignaled,
     94                              context,
     95                              run_loop)),
     96           task_runner_(std::move(task_runner)) {
     97       sync_msg_queue_->top_send_done_event_watcher_ = this;
     98       if (outer_state_)
     99         outer_state_->StopWatching();
    100       StartWatching();
    101     }
    102 
    103     ~NestedSendDoneWatcher() {
    104       sync_msg_queue_->top_send_done_event_watcher_ = outer_state_;
    105       if (outer_state_)
    106         outer_state_->StartWatching();
    107     }
    108 
    109    private:
    110     void Run(WaitableEvent* event) {
    111       DCHECK(callback_);
    112       std::move(callback_).Run(event);
    113     }
    114 
    115     void StartWatching() {
    116       watcher_.StartWatching(
    117           event_,
    118           base::BindOnce(&NestedSendDoneWatcher::Run, base::Unretained(this)),
    119           task_runner_);
    120     }
    121 
    122     void StopWatching() { watcher_.StopWatching(); }
    123 
    124     ReceivedSyncMsgQueue* const sync_msg_queue_;
    125     NestedSendDoneWatcher* const outer_state_;
    126 
    127     base::WaitableEvent* const event_;
    128     base::WaitableEventWatcher::EventCallback callback_;
    129     base::WaitableEventWatcher watcher_;
    130     scoped_refptr<base::SequencedTaskRunner> task_runner_;
    131 
    132     DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher);
    133   };
    134 
    135   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
    136   // if necessary.  Call RemoveContext on the same thread when done.
    137   static ReceivedSyncMsgQueue* AddContext() {
    138     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
    139     // SyncChannel objects can block the same thread).
    140     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
    141     if (!rv) {
    142       rv = new ReceivedSyncMsgQueue();
    143       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
    144     }
    145     rv->listener_count_++;
    146     return rv;
    147   }
    148 
    149   // Prevents messages from being dispatched immediately when the dispatch event
    150   // is signaled. Instead, |*dispatch_flag| will be set.
    151   void BlockDispatch(bool* dispatch_flag) { dispatch_flag_ = dispatch_flag; }
    152 
    153   // Allows messages to be dispatched immediately when the dispatch event is
    154   // signaled.
    155   void UnblockDispatch() { dispatch_flag_ = nullptr; }
    156 
    157   // Called on IPC thread when a synchronous message or reply arrives.
    158   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
    159     bool was_task_pending;
    160     {
    161       base::AutoLock auto_lock(message_lock_);
    162 
    163       was_task_pending = task_pending_;
    164       task_pending_ = true;
    165 
    166       // We set the event in case the listener thread is blocked (or is about
    167       // to). In case it's not, the PostTask dispatches the messages.
    168       message_queue_.push_back(QueuedMessage(new Message(msg), context));
    169       message_queue_version_++;
    170     }
    171 
    172     dispatch_event_.Signal();
    173     if (!was_task_pending) {
    174       listener_task_runner_->PostTask(
    175           FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
    176                                 this, base::RetainedRef(context)));
    177     }
    178   }
    179 
    180   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
    181     received_replies_.push_back(QueuedMessage(new Message(msg), context));
    182   }
    183 
    184   // Called on the listener's thread to process any queues synchronous
    185   // messages.
    186   void DispatchMessagesTask(SyncContext* context) {
    187     {
    188       base::AutoLock auto_lock(message_lock_);
    189       task_pending_ = false;
    190     }
    191     context->DispatchMessages();
    192   }
    193 
    194   // Dispatches any queued incoming sync messages. If |dispatching_context| is
    195   // not null, messages which target a restricted dispatch channel will only be
    196   // dispatched if |dispatching_context| belongs to the same restricted dispatch
    197   // group as that channel. If |dispatching_context| is null, all queued
    198   // messages are dispatched.
    199   void DispatchMessages(SyncContext* dispatching_context) {
    200     bool first_time = true;
    201     uint32_t expected_version = 0;
    202     SyncMessageQueue::iterator it;
    203     while (true) {
    204       Message* message = nullptr;
    205       scoped_refptr<SyncChannel::SyncContext> context;
    206       {
    207         base::AutoLock auto_lock(message_lock_);
    208         if (first_time || message_queue_version_ != expected_version) {
    209           it = message_queue_.begin();
    210           first_time = false;
    211         }
    212         for (; it != message_queue_.end(); it++) {
    213           int message_group = it->context->restrict_dispatch_group();
    214           if (message_group == kRestrictDispatchGroup_None ||
    215               (dispatching_context &&
    216                message_group ==
    217                    dispatching_context->restrict_dispatch_group())) {
    218             message = it->message;
    219             context = it->context;
    220             it = message_queue_.erase(it);
    221             message_queue_version_++;
    222             expected_version = message_queue_version_;
    223             break;
    224           }
    225         }
    226       }
    227 
    228       if (message == nullptr)
    229         break;
    230       context->OnDispatchMessage(*message);
    231       delete message;
    232     }
    233   }
    234 
    235   // SyncChannel calls this in its destructor.
    236   void RemoveContext(SyncContext* context) {
    237     base::AutoLock auto_lock(message_lock_);
    238 
    239     SyncMessageQueue::iterator iter = message_queue_.begin();
    240     while (iter != message_queue_.end()) {
    241       if (iter->context.get() == context) {
    242         delete iter->message;
    243         iter = message_queue_.erase(iter);
    244         message_queue_version_++;
    245       } else {
    246         iter++;
    247       }
    248     }
    249 
    250     if (--listener_count_ == 0) {
    251       DCHECK(lazy_tls_ptr_.Pointer()->Get());
    252       lazy_tls_ptr_.Pointer()->Set(nullptr);
    253       sync_dispatch_watcher_.reset();
    254     }
    255   }
    256 
    257   base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
    258   base::SingleThreadTaskRunner* listener_task_runner() {
    259     return listener_task_runner_.get();
    260   }
    261 
    262   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
    263   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>::
    264       DestructorAtExit lazy_tls_ptr_;
    265 
    266   // Called on the ipc thread to check if we can unblock any current Send()
    267   // calls based on a queued reply.
    268   void DispatchReplies() {
    269     for (size_t i = 0; i < received_replies_.size(); ++i) {
    270       Message* message = received_replies_[i].message;
    271       if (received_replies_[i].context->TryToUnblockListener(message)) {
    272         delete message;
    273         received_replies_.erase(received_replies_.begin() + i);
    274         return;
    275       }
    276     }
    277   }
    278 
    279  private:
    280   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
    281 
    282   // See the comment in SyncChannel::SyncChannel for why this event is created
    283   // as manual reset.
    284   ReceivedSyncMsgQueue()
    285       : message_queue_version_(0),
    286         dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
    287                         base::WaitableEvent::InitialState::NOT_SIGNALED),
    288         listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
    289         sync_dispatch_watcher_(std::make_unique<mojo::SyncEventWatcher>(
    290             &dispatch_event_,
    291             base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
    292                        base::Unretained(this)))) {
    293     sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
    294   }
    295 
    296   ~ReceivedSyncMsgQueue() = default;
    297 
    298   void OnDispatchEventReady() {
    299     if (dispatch_flag_) {
    300       *dispatch_flag_ = true;
    301       return;
    302     }
    303 
    304     // We were woken up during a sync wait, but no specific SyncChannel is
    305     // currently waiting. i.e., some other Mojo interface on this thread is
    306     // waiting for a response. Since we don't support anything analogous to
    307     // restricted dispatch on Mojo interfaces, in this case it's safe to
    308     // dispatch sync messages for any context.
    309     DispatchMessages(nullptr);
    310   }
    311 
    312   // Holds information about a queued synchronous message or reply.
    313   struct QueuedMessage {
    314     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
    315     Message* message;
    316     scoped_refptr<SyncChannel::SyncContext> context;
    317   };
    318 
    319   typedef std::list<QueuedMessage> SyncMessageQueue;
    320   SyncMessageQueue message_queue_;
    321 
    322   // Used to signal DispatchMessages to rescan
    323   uint32_t message_queue_version_ = 0;
    324 
    325   std::vector<QueuedMessage> received_replies_;
    326 
    327   // Signaled when we get a synchronous message that we must respond to, as the
    328   // sender needs its reply before it can reply to our original synchronous
    329   // message.
    330   base::WaitableEvent dispatch_event_;
    331   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
    332   base::Lock message_lock_;
    333   bool task_pending_ = false;
    334   int listener_count_ = 0;
    335 
    336   // The current NestedSendDoneWatcher for this thread, if we're currently
    337   // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See
    338   // NestedSendDoneWatcher comments for more details.
    339   NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr;
    340 
    341   // If not null, the address of a flag to set when the dispatch event signals,
    342   // in lieu of actually dispatching messages. This is used by
    343   // SyncChannel::WaitForReply to restrict the scope of queued messages we're
    344   // allowed to process while it's waiting.
    345   bool* dispatch_flag_ = nullptr;
    346 
    347   // Watches |dispatch_event_| during all sync handle watches on this thread.
    348   std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
    349 };
    350 
    351 base::LazyInstance<base::ThreadLocalPointer<
    352     SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit
    353     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
    354         LAZY_INSTANCE_INITIALIZER;
    355 
    356 SyncChannel::SyncContext::SyncContext(
    357     Listener* listener,
    358     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    359     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    360     WaitableEvent* shutdown_event)
    361     : ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
    362       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
    363       shutdown_event_(shutdown_event),
    364       restrict_dispatch_group_(kRestrictDispatchGroup_None) {}
    365 
    366 void SyncChannel::SyncContext::OnSendDoneEventSignaled(
    367     base::RunLoop* nested_loop,
    368     base::WaitableEvent* event) {
    369   DCHECK_EQ(GetSendDoneEvent(), event);
    370   nested_loop->Quit();
    371 }
    372 
    373 SyncChannel::SyncContext::~SyncContext() {
    374   while (!deserializers_.empty())
    375     Pop();
    376 }
    377 
    378 // Adds information about an outgoing sync message to the context so that
    379 // we know how to deserialize the reply. Returns |true| if the message was added
    380 // to the context or |false| if it was rejected (e.g. due to shutdown.)
    381 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
    382   // Create the tracking information for this message. This object is stored
    383   // by value since all members are pointers that are cheap to copy. These
    384   // pointers are cleaned up in the Pop() function.
    385   //
    386   // The event is created as manual reset because in between Signal and
    387   // OnObjectSignalled, another Send can happen which would stop the watcher
    388   // from being called.  The event would get watched later, when the nested
    389   // Send completes, so the event will need to remain set.
    390   base::AutoLock auto_lock(deserializers_lock_);
    391   if (reject_new_deserializers_)
    392     return false;
    393   PendingSyncMsg pending(
    394       SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
    395       new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
    396                               base::WaitableEvent::InitialState::NOT_SIGNALED));
    397   deserializers_.push_back(pending);
    398   return true;
    399 }
    400 
    401 bool SyncChannel::SyncContext::Pop() {
    402   bool result;
    403   {
    404     base::AutoLock auto_lock(deserializers_lock_);
    405     PendingSyncMsg msg = deserializers_.back();
    406     delete msg.deserializer;
    407     delete msg.done_event;
    408     msg.done_event = nullptr;
    409     deserializers_.pop_back();
    410     result = msg.send_result;
    411   }
    412 
    413   // We got a reply to a synchronous Send() call that's blocking the listener
    414   // thread.  However, further down the call stack there could be another
    415   // blocking Send() call, whose reply we received after we made this last
    416   // Send() call.  So check if we have any queued replies available that
    417   // can now unblock the listener thread.
    418   ipc_task_runner()->PostTask(
    419       FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
    420                             received_sync_msgs_));
    421 
    422   return result;
    423 }
    424 
    425 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
    426   base::AutoLock auto_lock(deserializers_lock_);
    427   return deserializers_.back().done_event;
    428 }
    429 
    430 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
    431   return received_sync_msgs_->dispatch_event();
    432 }
    433 
    434 void SyncChannel::SyncContext::DispatchMessages() {
    435   received_sync_msgs_->DispatchMessages(this);
    436 }
    437 
    438 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
    439   base::AutoLock auto_lock(deserializers_lock_);
    440   if (deserializers_.empty() ||
    441       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
    442     return false;
    443   }
    444 
    445   if (!msg->is_reply_error()) {
    446     bool send_result = deserializers_.back().deserializer->
    447         SerializeOutputParameters(*msg);
    448     deserializers_.back().send_result = send_result;
    449     DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
    450   } else {
    451     DVLOG(1) << "Received error reply";
    452   }
    453 
    454   base::WaitableEvent* done_event = deserializers_.back().done_event;
    455   TRACE_EVENT_FLOW_BEGIN0(
    456       TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
    457       "SyncChannel::SyncContext::TryToUnblockListener", done_event);
    458 
    459   done_event->Signal();
    460 
    461   return true;
    462 }
    463 
    464 void SyncChannel::SyncContext::Clear() {
    465   CancelPendingSends();
    466   received_sync_msgs_->RemoveContext(this);
    467   Context::Clear();
    468 }
    469 
    470 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
    471   // Give the filters a chance at processing this message.
    472   if (TryFilters(msg))
    473     return true;
    474 
    475   if (TryToUnblockListener(&msg))
    476     return true;
    477 
    478   if (msg.is_reply()) {
    479     received_sync_msgs_->QueueReply(msg, this);
    480     return true;
    481   }
    482 
    483   if (msg.should_unblock()) {
    484     received_sync_msgs_->QueueMessage(msg, this);
    485     return true;
    486   }
    487 
    488   return Context::OnMessageReceivedNoFilter(msg);
    489 }
    490 
    491 void SyncChannel::SyncContext::OnChannelError() {
    492   CancelPendingSends();
    493   shutdown_watcher_.StopWatching();
    494   Context::OnChannelError();
    495 }
    496 
    497 void SyncChannel::SyncContext::OnChannelOpened() {
    498   shutdown_watcher_.StartWatching(
    499       shutdown_event_,
    500       base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
    501                  base::Unretained(this)),
    502       base::SequencedTaskRunnerHandle::Get());
    503   Context::OnChannelOpened();
    504 }
    505 
    506 void SyncChannel::SyncContext::OnChannelClosed() {
    507   CancelPendingSends();
    508   shutdown_watcher_.StopWatching();
    509   Context::OnChannelClosed();
    510 }
    511 
    512 void SyncChannel::SyncContext::CancelPendingSends() {
    513   base::AutoLock auto_lock(deserializers_lock_);
    514   reject_new_deserializers_ = true;
    515   PendingSyncMessageQueue::iterator iter;
    516   DVLOG(1) << "Canceling pending sends";
    517   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
    518     TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
    519                             "SyncChannel::SyncContext::CancelPendingSends",
    520                             iter->done_event);
    521     iter->done_event->Signal();
    522   }
    523 }
    524 
    525 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
    526   DCHECK_EQ(event, shutdown_event_);
    527 
    528   // Process shut down before we can get a reply to a synchronous message.
    529   // Cancel pending Send calls, which will end up setting the send done event.
    530   CancelPendingSends();
    531 }
    532 
    533 // static
    534 std::unique_ptr<SyncChannel> SyncChannel::Create(
    535     const IPC::ChannelHandle& channel_handle,
    536     Channel::Mode mode,
    537     Listener* listener,
    538     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    539     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    540     bool create_pipe_now,
    541     base::WaitableEvent* shutdown_event) {
    542   std::unique_ptr<SyncChannel> channel =
    543       Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
    544   channel->Init(channel_handle, mode, create_pipe_now);
    545   return channel;
    546 }
    547 
    548 // static
    549 std::unique_ptr<SyncChannel> SyncChannel::Create(
    550     std::unique_ptr<ChannelFactory> factory,
    551     Listener* listener,
    552     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    553     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    554     bool create_pipe_now,
    555     base::WaitableEvent* shutdown_event) {
    556   std::unique_ptr<SyncChannel> channel =
    557       Create(listener, ipc_task_runner, listener_task_runner, shutdown_event);
    558   channel->Init(std::move(factory), create_pipe_now);
    559   return channel;
    560 }
    561 
    562 // static
    563 std::unique_ptr<SyncChannel> SyncChannel::Create(
    564     Listener* listener,
    565     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    566     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    567     WaitableEvent* shutdown_event) {
    568   return base::WrapUnique(new SyncChannel(
    569       listener, ipc_task_runner, listener_task_runner, shutdown_event));
    570 }
    571 
    572 SyncChannel::SyncChannel(
    573     Listener* listener,
    574     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    575     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    576     WaitableEvent* shutdown_event)
    577     : ChannelProxy(new SyncContext(listener,
    578                                    ipc_task_runner,
    579                                    listener_task_runner,
    580                                    shutdown_event)),
    581       sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
    582   // The current (listener) thread must be distinct from the IPC thread, or else
    583   // sending synchronous messages will deadlock.
    584   DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
    585   StartWatching();
    586 }
    587 
    588 SyncChannel::~SyncChannel() = default;
    589 
    590 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
    591   sync_context()->set_restrict_dispatch_group(group);
    592 }
    593 
    594 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() {
    595   scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter(
    596       sync_context()->shutdown_event());
    597   AddFilter(filter.get());
    598   if (!did_init())
    599     pre_init_sync_message_filters_.push_back(filter);
    600   return filter;
    601 }
    602 
    603 bool SyncChannel::Send(Message* message) {
    604 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
    605   std::string name;
    606   Logging::GetInstance()->GetMessageText(
    607       message->type(), &name, message, nullptr);
    608   TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
    609 #else
    610   TRACE_EVENT2("ipc", "SyncChannel::Send",
    611                "class", IPC_MESSAGE_ID_CLASS(message->type()),
    612                "line", IPC_MESSAGE_ID_LINE(message->type()));
    613 #endif
    614   if (!message->is_sync()) {
    615     ChannelProxy::SendInternal(message);
    616     return true;
    617   }
    618 
    619   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
    620   bool pump_messages = sync_msg->ShouldPumpMessages();
    621 
    622   // *this* might get deleted in WaitForReply.
    623   scoped_refptr<SyncContext> context(sync_context());
    624   if (!context->Push(sync_msg)) {
    625     DVLOG(1) << "Channel is shutting down. Dropping sync message.";
    626     delete message;
    627     return false;
    628   }
    629 
    630   ChannelProxy::SendInternal(message);
    631 
    632   // Wait for reply, or for any other incoming synchronous messages.
    633   // |this| might get deleted, so only call static functions at this point.
    634   scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
    635   WaitForReply(registry.get(), context.get(), pump_messages);
    636 
    637   TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
    638                         "SyncChannel::Send", context->GetSendDoneEvent());
    639 
    640   return context->Pop();
    641 }
    642 
    643 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
    644                                SyncContext* context,
    645                                bool pump_messages) {
    646   context->DispatchMessages();
    647 
    648   base::WaitableEvent* pump_messages_event = nullptr;
    649   if (pump_messages) {
    650     if (!g_pump_messages_event.Get()) {
    651       g_pump_messages_event.Get() = std::make_unique<base::WaitableEvent>(
    652           base::WaitableEvent::ResetPolicy::MANUAL,
    653           base::WaitableEvent::InitialState::SIGNALED);
    654     }
    655     pump_messages_event = g_pump_messages_event.Get().get();
    656   }
    657 
    658   while (true) {
    659     bool dispatch = false;
    660     bool send_done = false;
    661     bool should_pump_messages = false;
    662     base::Closure on_send_done_callback = base::Bind(&OnEventReady, &send_done);
    663     registry->RegisterEvent(context->GetSendDoneEvent(), on_send_done_callback);
    664 
    665     base::Closure on_pump_messages_callback;
    666     if (pump_messages_event) {
    667       on_pump_messages_callback =
    668           base::Bind(&OnEventReady, &should_pump_messages);
    669       registry->RegisterEvent(pump_messages_event, on_pump_messages_callback);
    670     }
    671 
    672     const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
    673     context->received_sync_msgs()->BlockDispatch(&dispatch);
    674     registry->Wait(stop_flags, 3);
    675     context->received_sync_msgs()->UnblockDispatch();
    676 
    677     registry->UnregisterEvent(context->GetSendDoneEvent(),
    678                               on_send_done_callback);
    679     if (pump_messages_event)
    680       registry->UnregisterEvent(pump_messages_event, on_pump_messages_callback);
    681 
    682     if (dispatch) {
    683       // We're waiting for a reply, but we received a blocking synchronous call.
    684       // We must process it to avoid potential deadlocks.
    685       context->GetDispatchEvent()->Reset();
    686       context->DispatchMessages();
    687       continue;
    688     }
    689 
    690     if (should_pump_messages)
    691       WaitForReplyWithNestedMessageLoop(context);  // Run a nested run loop.
    692 
    693     break;
    694   }
    695 }
    696 
    697 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
    698   base::RunLoop nested_loop(base::RunLoop::Type::kNestableTasksAllowed);
    699   ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(
    700       context, &nested_loop, context->listener_task_runner());
    701   nested_loop.Run();
    702 }
    703 
    704 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
    705   DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
    706   sync_context()->GetDispatchEvent()->Reset();
    707 
    708   StartWatching();
    709 
    710   // NOTE: May delete |this|.
    711   sync_context()->DispatchMessages();
    712 }
    713 
    714 void SyncChannel::StartWatching() {
    715   // |dispatch_watcher_| watches the event asynchronously, only dispatching
    716   // messages once the listener thread is unblocked and pumping its task queue.
    717   // The ReceivedSyncMsgQueue also watches this event and may dispatch
    718   // immediately if woken up by a message which it's allowed to dispatch.
    719   dispatch_watcher_.StartWatching(
    720       sync_context()->GetDispatchEvent(),
    721       base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
    722                      base::Unretained(this)),
    723       sync_context()->listener_task_runner());
    724 }
    725 
    726 void SyncChannel::OnChannelInit() {
    727   pre_init_sync_message_filters_.clear();
    728 }
    729 
    730 }  // namespace IPC
    731