Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_sync_channel.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/debug/trace_event.h"
      9 #include "base/lazy_instance.h"
     10 #include "base/location.h"
     11 #include "base/logging.h"
     12 #include "base/synchronization/waitable_event.h"
     13 #include "base/synchronization/waitable_event_watcher.h"
     14 #include "base/thread_task_runner_handle.h"
     15 #include "base/threading/thread_local.h"
     16 #include "ipc/ipc_logging.h"
     17 #include "ipc/ipc_message_macros.h"
     18 #include "ipc/ipc_sync_message.h"
     19 
     20 using base::TimeDelta;
     21 using base::TimeTicks;
     22 using base::WaitableEvent;
     23 
     24 namespace IPC {
     25 // When we're blocked in a Send(), we need to process incoming synchronous
     26 // messages right away because it could be blocking our reply (either
     27 // directly from the same object we're calling, or indirectly through one or
// more other channels).  That means that in SyncContext's OnMessageReceived,
// we need to process sync messages right away if we're blocked.  However, a
// simple check isn't sufficient, because the listener thread can be in the
// process of calling Send.
     32 // To work around this, when SyncChannel filters a sync message, it sets
     33 // an event that the listener thread waits on during its Send() call.  This
     34 // allows us to dispatch incoming sync messages when blocked.  The race
     35 // condition is handled because if Send is in the process of being called, it
     36 // will check the event.  In case the listener thread isn't sending a message,
     37 // we queue a task on the listener thread to dispatch the received messages.
     38 // The messages are stored in this queue object that's shared among all
     39 // SyncChannel objects on the same thread (since one object can receive a
     40 // sync message while another one is blocked).
     41 
     42 class SyncChannel::ReceivedSyncMsgQueue :
     43     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
     44  public:
     45   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
     46   // if necessary.  Call RemoveContext on the same thread when done.
     47   static ReceivedSyncMsgQueue* AddContext() {
     48     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
     49     // SyncChannel objects can block the same thread).
     50     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
     51     if (!rv) {
     52       rv = new ReceivedSyncMsgQueue();
     53       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
     54     }
     55     rv->listener_count_++;
     56     return rv;
     57   }
     58 
     59   // Called on IPC thread when a synchronous message or reply arrives.
     60   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
     61     bool was_task_pending;
     62     {
     63       base::AutoLock auto_lock(message_lock_);
     64 
     65       was_task_pending = task_pending_;
     66       task_pending_ = true;
     67 
     68       // We set the event in case the listener thread is blocked (or is about
     69       // to). In case it's not, the PostTask dispatches the messages.
     70       message_queue_.push_back(QueuedMessage(new Message(msg), context));
     71       message_queue_version_++;
     72     }
     73 
     74     dispatch_event_.Signal();
     75     if (!was_task_pending) {
     76       listener_task_runner_->PostTask(
     77           FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
     78                                 this, scoped_refptr<SyncContext>(context)));
     79     }
     80   }
     81 
     82   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
     83     received_replies_.push_back(QueuedMessage(new Message(msg), context));
     84   }
     85 
     86   // Called on the listener's thread to process any queues synchronous
     87   // messages.
     88   void DispatchMessagesTask(SyncContext* context) {
     89     {
     90       base::AutoLock auto_lock(message_lock_);
     91       task_pending_ = false;
     92     }
     93     context->DispatchMessages();
     94   }
     95 
     96   void DispatchMessages(SyncContext* dispatching_context) {
     97     bool first_time = true;
     98     uint32 expected_version = 0;
     99     SyncMessageQueue::iterator it;
    100     while (true) {
    101       Message* message = NULL;
    102       scoped_refptr<SyncChannel::SyncContext> context;
    103       {
    104         base::AutoLock auto_lock(message_lock_);
    105         if (first_time || message_queue_version_ != expected_version) {
    106           it = message_queue_.begin();
    107           first_time = false;
    108         }
    109         for (; it != message_queue_.end(); it++) {
    110           int message_group = it->context->restrict_dispatch_group();
    111           if (message_group == kRestrictDispatchGroup_None ||
    112               message_group == dispatching_context->restrict_dispatch_group()) {
    113             message = it->message;
    114             context = it->context;
    115             it = message_queue_.erase(it);
    116             message_queue_version_++;
    117             expected_version = message_queue_version_;
    118             break;
    119           }
    120         }
    121       }
    122 
    123       if (message == NULL)
    124         break;
    125       context->OnDispatchMessage(*message);
    126       delete message;
    127     }
    128   }
    129 
    130   // SyncChannel calls this in its destructor.
    131   void RemoveContext(SyncContext* context) {
    132     base::AutoLock auto_lock(message_lock_);
    133 
    134     SyncMessageQueue::iterator iter = message_queue_.begin();
    135     while (iter != message_queue_.end()) {
    136       if (iter->context.get() == context) {
    137         delete iter->message;
    138         iter = message_queue_.erase(iter);
    139         message_queue_version_++;
    140       } else {
    141         iter++;
    142       }
    143     }
    144 
    145     if (--listener_count_ == 0) {
    146       DCHECK(lazy_tls_ptr_.Pointer()->Get());
    147       lazy_tls_ptr_.Pointer()->Set(NULL);
    148     }
    149   }
    150 
    151   WaitableEvent* dispatch_event() { return &dispatch_event_; }
    152   base::SingleThreadTaskRunner* listener_task_runner() {
    153     return listener_task_runner_.get();
    154   }
    155 
    156   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
    157   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
    158       lazy_tls_ptr_;
    159 
    160   // Called on the ipc thread to check if we can unblock any current Send()
    161   // calls based on a queued reply.
    162   void DispatchReplies() {
    163     for (size_t i = 0; i < received_replies_.size(); ++i) {
    164       Message* message = received_replies_[i].message;
    165       if (received_replies_[i].context->TryToUnblockListener(message)) {
    166         delete message;
    167         received_replies_.erase(received_replies_.begin() + i);
    168         return;
    169       }
    170     }
    171   }
    172 
    173   base::WaitableEventWatcher* top_send_done_watcher() {
    174     return top_send_done_watcher_;
    175   }
    176 
    177   void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
    178     top_send_done_watcher_ = watcher;
    179   }
    180 
    181  private:
    182   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
    183 
    184   // See the comment in SyncChannel::SyncChannel for why this event is created
    185   // as manual reset.
    186   ReceivedSyncMsgQueue() :
    187       message_queue_version_(0),
    188       dispatch_event_(true, false),
    189       listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
    190       task_pending_(false),
    191       listener_count_(0),
    192       top_send_done_watcher_(NULL) {
    193   }
    194 
    195   ~ReceivedSyncMsgQueue() {}
    196 
    197   // Holds information about a queued synchronous message or reply.
    198   struct QueuedMessage {
    199     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
    200     Message* message;
    201     scoped_refptr<SyncChannel::SyncContext> context;
    202   };
    203 
    204   typedef std::list<QueuedMessage> SyncMessageQueue;
    205   SyncMessageQueue message_queue_;
    206   uint32 message_queue_version_;  // Used to signal DispatchMessages to rescan
    207 
    208   std::vector<QueuedMessage> received_replies_;
    209 
    210   // Set when we got a synchronous message that we must respond to as the
    211   // sender needs its reply before it can reply to our original synchronous
    212   // message.
    213   WaitableEvent dispatch_event_;
    214   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
    215   base::Lock message_lock_;
    216   bool task_pending_;
    217   int listener_count_;
    218 
    219   // The current send done event watcher for this thread. Used to maintain
    220   // a local global stack of send done watchers to ensure that nested sync
    221   // message loops complete correctly.
    222   base::WaitableEventWatcher* top_send_done_watcher_;
    223 };
    224 
    225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
    226     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
    227         LAZY_INSTANCE_INITIALIZER;
    228 
    229 SyncChannel::SyncContext::SyncContext(
    230     Listener* listener,
    231     base::SingleThreadTaskRunner* ipc_task_runner,
    232     WaitableEvent* shutdown_event)
    233     : ChannelProxy::Context(listener, ipc_task_runner),
    234       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
    235       shutdown_event_(shutdown_event),
    236       restrict_dispatch_group_(kRestrictDispatchGroup_None) {
    237 }
    238 
    239 SyncChannel::SyncContext::~SyncContext() {
    240   while (!deserializers_.empty())
    241     Pop();
    242 }
    243 
    244 // Adds information about an outgoing sync message to the context so that
    245 // we know how to deserialize the reply.  Returns a handle that's set when
    246 // the reply has arrived.
    247 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
    248   // Create the tracking information for this message. This object is stored
    249   // by value since all members are pointers that are cheap to copy. These
    250   // pointers are cleaned up in the Pop() function.
    251   //
    252   // The event is created as manual reset because in between Signal and
    253   // OnObjectSignalled, another Send can happen which would stop the watcher
    254   // from being called.  The event would get watched later, when the nested
    255   // Send completes, so the event will need to remain set.
    256   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
    257                          sync_msg->GetReplyDeserializer(),
    258                          new WaitableEvent(true, false));
    259   base::AutoLock auto_lock(deserializers_lock_);
    260   deserializers_.push_back(pending);
    261 }
    262 
    263 bool SyncChannel::SyncContext::Pop() {
    264   bool result;
    265   {
    266     base::AutoLock auto_lock(deserializers_lock_);
    267     PendingSyncMsg msg = deserializers_.back();
    268     delete msg.deserializer;
    269     delete msg.done_event;
    270     msg.done_event = NULL;
    271     deserializers_.pop_back();
    272     result = msg.send_result;
    273   }
    274 
    275   // We got a reply to a synchronous Send() call that's blocking the listener
    276   // thread.  However, further down the call stack there could be another
    277   // blocking Send() call, whose reply we received after we made this last
    278   // Send() call.  So check if we have any queued replies available that
    279   // can now unblock the listener thread.
    280   ipc_task_runner()->PostTask(
    281       FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
    282                             received_sync_msgs_.get()));
    283 
    284   return result;
    285 }
    286 
    287 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
    288   base::AutoLock auto_lock(deserializers_lock_);
    289   return deserializers_.back().done_event;
    290 }
    291 
    292 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
    293   return received_sync_msgs_->dispatch_event();
    294 }
    295 
    296 void SyncChannel::SyncContext::DispatchMessages() {
    297   received_sync_msgs_->DispatchMessages(this);
    298 }
    299 
    300 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
    301   base::AutoLock auto_lock(deserializers_lock_);
    302   if (deserializers_.empty() ||
    303       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
    304     return false;
    305   }
    306 
    307   // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
    308   // has finished.
    309   if (!msg->is_reply_error()) {
    310     bool send_result = deserializers_.back().deserializer->
    311         SerializeOutputParameters(*msg);
    312     deserializers_.back().send_result = send_result;
    313     VLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
    314   } else {
    315     VLOG(1) << "Received error reply";
    316   }
    317   deserializers_.back().done_event->Signal();
    318 
    319   return true;
    320 }
    321 
    322 void SyncChannel::SyncContext::Clear() {
    323   CancelPendingSends();
    324   received_sync_msgs_->RemoveContext(this);
    325   Context::Clear();
    326 }
    327 
    328 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
    329   // Give the filters a chance at processing this message.
    330   if (TryFilters(msg))
    331     return true;
    332 
    333   if (TryToUnblockListener(&msg))
    334     return true;
    335 
    336   if (msg.is_reply()) {
    337     received_sync_msgs_->QueueReply(msg, this);
    338     return true;
    339   }
    340 
    341   if (msg.should_unblock()) {
    342     received_sync_msgs_->QueueMessage(msg, this);
    343     return true;
    344   }
    345 
    346   return Context::OnMessageReceivedNoFilter(msg);
    347 }
    348 
    349 void SyncChannel::SyncContext::OnChannelError() {
    350   CancelPendingSends();
    351   shutdown_watcher_.StopWatching();
    352   Context::OnChannelError();
    353 }
    354 
    355 void SyncChannel::SyncContext::OnChannelOpened() {
    356   shutdown_watcher_.StartWatching(
    357       shutdown_event_,
    358       base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
    359                  base::Unretained(this)));
    360   Context::OnChannelOpened();
    361 }
    362 
    363 void SyncChannel::SyncContext::OnChannelClosed() {
    364   CancelPendingSends();
    365   shutdown_watcher_.StopWatching();
    366   Context::OnChannelClosed();
    367 }
    368 
    369 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
    370   base::AutoLock auto_lock(deserializers_lock_);
    371   PendingSyncMessageQueue::iterator iter;
    372   VLOG(1) << "Send timeout";
    373   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
    374     if (iter->id == message_id) {
    375       iter->done_event->Signal();
    376       break;
    377     }
    378   }
    379 }
    380 
    381 void SyncChannel::SyncContext::CancelPendingSends() {
    382   base::AutoLock auto_lock(deserializers_lock_);
    383   PendingSyncMessageQueue::iterator iter;
    384   // TODO(bauerb): Remove once http://crbug/141055 is fixed.
    385   VLOG(1) << "Canceling pending sends";
    386   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
    387     iter->done_event->Signal();
    388 }
    389 
    390 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
    391   if (event == shutdown_event_) {
    392     // Process shut down before we can get a reply to a synchronous message.
    393     // Cancel pending Send calls, which will end up setting the send done event.
    394     CancelPendingSends();
    395   } else {
    396     // We got the reply, timed out or the process shutdown.
    397     DCHECK_EQ(GetSendDoneEvent(), event);
    398     base::MessageLoop::current()->QuitNow();
    399   }
    400 }
    401 
    402 base::WaitableEventWatcher::EventCallback
    403     SyncChannel::SyncContext::MakeWaitableEventCallback() {
    404   return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
    405 }
    406 
    407 SyncChannel::SyncChannel(
    408     const IPC::ChannelHandle& channel_handle,
    409     Channel::Mode mode,
    410     Listener* listener,
    411     base::SingleThreadTaskRunner* ipc_task_runner,
    412     bool create_pipe_now,
    413     WaitableEvent* shutdown_event)
    414     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
    415       sync_messages_with_no_timeout_allowed_(true) {
    416   ChannelProxy::Init(channel_handle, mode, create_pipe_now);
    417   StartWatching();
    418 }
    419 
    420 SyncChannel::SyncChannel(
    421     Listener* listener,
    422     base::SingleThreadTaskRunner* ipc_task_runner,
    423     WaitableEvent* shutdown_event)
    424     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
    425       sync_messages_with_no_timeout_allowed_(true) {
    426   StartWatching();
    427 }
    428 
    429 SyncChannel::~SyncChannel() {
    430 }
    431 
    432 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
    433   sync_context()->set_restrict_dispatch_group(group);
    434 }
    435 
    436 bool SyncChannel::Send(Message* message) {
    437   return SendWithTimeout(message, base::kNoTimeout);
    438 }
    439 
    440 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
    441 #ifdef IPC_MESSAGE_LOG_ENABLED
    442   Logging* logger = Logging::GetInstance();
    443   std::string name;
    444   logger->GetMessageText(message->type(), &name, message, NULL);
    445   TRACE_EVENT1("task", "SyncChannel::SendWithTimeout",
    446                "name", name);
    447 #else
    448   TRACE_EVENT2("task", "SyncChannel::SendWithTimeout",
    449                "class", IPC_MESSAGE_ID_CLASS(message->type()),
    450                "line", IPC_MESSAGE_ID_LINE(message->type()));
    451 #endif
    452   if (!message->is_sync()) {
    453     ChannelProxy::Send(message);
    454     return true;
    455   }
    456 
    457   // *this* might get deleted in WaitForReply.
    458   scoped_refptr<SyncContext> context(sync_context());
    459   if (context->shutdown_event()->IsSignaled()) {
    460     VLOG(1) << "shutdown event is signaled";
    461     delete message;
    462     return false;
    463   }
    464 
    465   DCHECK(sync_messages_with_no_timeout_allowed_ ||
    466          timeout_ms != base::kNoTimeout);
    467   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
    468   context->Push(sync_msg);
    469   int message_id = SyncMessage::GetMessageId(*sync_msg);
    470   WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
    471 
    472   ChannelProxy::Send(message);
    473 
    474   if (timeout_ms != base::kNoTimeout) {
    475     // We use the sync message id so that when a message times out, we don't
    476     // confuse it with another send that is either above/below this Send in
    477     // the call stack.
    478     context->ipc_task_runner()->PostDelayedTask(
    479         FROM_HERE,
    480         base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id),
    481         base::TimeDelta::FromMilliseconds(timeout_ms));
    482   }
    483 
    484   // Wait for reply, or for any other incoming synchronous messages.
    485   // *this* might get deleted, so only call static functions at this point.
    486   WaitForReply(context.get(), pump_messages_event);
    487 
    488   return context->Pop();
    489 }
    490 
    491 void SyncChannel::WaitForReply(
    492     SyncContext* context, WaitableEvent* pump_messages_event) {
    493   context->DispatchMessages();
    494   while (true) {
    495     WaitableEvent* objects[] = {
    496       context->GetDispatchEvent(),
    497       context->GetSendDoneEvent(),
    498       pump_messages_event
    499     };
    500 
    501     unsigned count = pump_messages_event ? 3: 2;
    502     size_t result = WaitableEvent::WaitMany(objects, count);
    503     if (result == 0 /* dispatch event */) {
    504       // We're waiting for a reply, but we received a blocking synchronous
    505       // call.  We must process it or otherwise a deadlock might occur.
    506       context->GetDispatchEvent()->Reset();
    507       context->DispatchMessages();
    508       continue;
    509     }
    510 
    511     if (result == 2 /* pump_messages_event */)
    512       WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.
    513 
    514     break;
    515   }
    516 }
    517 
    518 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
    519   base::WaitableEventWatcher send_done_watcher;
    520 
    521   ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
    522   DCHECK(sync_msg_queue != NULL);
    523 
    524   base::WaitableEventWatcher* old_send_done_event_watcher =
    525       sync_msg_queue->top_send_done_watcher();
    526 
    527   base::WaitableEventWatcher::EventCallback old_callback;
    528   base::WaitableEvent* old_event = NULL;
    529 
    530   // Maintain a local global stack of send done delegates to ensure that
    531   // nested sync calls complete in the correct sequence, i.e. the
    532   // outermost call completes first, etc.
    533   if (old_send_done_event_watcher) {
    534     old_callback = old_send_done_event_watcher->callback();
    535     old_event = old_send_done_event_watcher->GetWatchedEvent();
    536     old_send_done_event_watcher->StopWatching();
    537   }
    538 
    539   sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
    540 
    541   send_done_watcher.StartWatching(context->GetSendDoneEvent(),
    542                                   context->MakeWaitableEventCallback());
    543 
    544   {
    545     base::MessageLoop::ScopedNestableTaskAllower allow(
    546         base::MessageLoop::current());
    547     base::MessageLoop::current()->Run();
    548   }
    549 
    550   sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
    551   if (old_send_done_event_watcher && old_event) {
    552     old_send_done_event_watcher->StartWatching(old_event, old_callback);
    553   }
    554 }
    555 
    556 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
    557   DCHECK(event == sync_context()->GetDispatchEvent());
    558   // The call to DispatchMessages might delete this object, so reregister
    559   // the object watcher first.
    560   event->Reset();
    561   dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
    562   sync_context()->DispatchMessages();
    563 }
    564 
    565 void SyncChannel::StartWatching() {
    566   // Ideally we only want to watch this object when running a nested message
    567   // loop.  However, we don't know when it exits if there's another nested
    568   // message loop running under it or not, so we wouldn't know whether to
    569   // stop or keep watching.  So we always watch it, and create the event as
    570   // manual reset since the object watcher might otherwise reset the event
    571   // when we're doing a WaitMany.
    572   dispatch_watcher_callback_ =
    573       base::Bind(&SyncChannel::OnWaitableEventSignaled,
    574                   base::Unretained(this));
    575   dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
    576                                   dispatch_watcher_callback_);
    577 }
    578 
    579 }  // namespace IPC
    580