1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_sync_channel.h" 6 7 #include "base/bind.h" 8 #include "base/debug/trace_event.h" 9 #include "base/lazy_instance.h" 10 #include "base/location.h" 11 #include "base/logging.h" 12 #include "base/synchronization/waitable_event.h" 13 #include "base/synchronization/waitable_event_watcher.h" 14 #include "base/thread_task_runner_handle.h" 15 #include "base/threading/thread_local.h" 16 #include "ipc/ipc_logging.h" 17 #include "ipc/ipc_message_macros.h" 18 #include "ipc/ipc_sync_message.h" 19 20 using base::TimeDelta; 21 using base::TimeTicks; 22 using base::WaitableEvent; 23 24 namespace IPC { 25 // When we're blocked in a Send(), we need to process incoming synchronous 26 // messages right away because it could be blocking our reply (either 27 // directly from the same object we're calling, or indirectly through one or 28 // more other channels). That means that in SyncContext's OnMessageReceived, 29 // we need to process sync message right away if we're blocked. However a 30 // simple check isn't sufficient, because the listener thread can be in the 31 // process of calling Send. 32 // To work around this, when SyncChannel filters a sync message, it sets 33 // an event that the listener thread waits on during its Send() call. This 34 // allows us to dispatch incoming sync messages when blocked. The race 35 // condition is handled because if Send is in the process of being called, it 36 // will check the event. In case the listener thread isn't sending a message, 37 // we queue a task on the listener thread to dispatch the received messages. 38 // The messages are stored in this queue object that's shared among all 39 // SyncChannel objects on the same thread (since one object can receive a 40 // sync message while another one is blocked). 41 42 class SyncChannel::ReceivedSyncMsgQueue : 43 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { 44 public: 45 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one 46 // if necessary. Call RemoveContext on the same thread when done. 47 static ReceivedSyncMsgQueue* AddContext() { 48 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple 49 // SyncChannel objects can block the same thread). 50 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); 51 if (!rv) { 52 rv = new ReceivedSyncMsgQueue(); 53 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); 54 } 55 rv->listener_count_++; 56 return rv; 57 } 58 59 // Called on IPC thread when a synchronous message or reply arrives. 60 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { 61 bool was_task_pending; 62 { 63 base::AutoLock auto_lock(message_lock_); 64 65 was_task_pending = task_pending_; 66 task_pending_ = true; 67 68 // We set the event in case the listener thread is blocked (or is about 69 // to). In case it's not, the PostTask dispatches the messages. 70 message_queue_.push_back(QueuedMessage(new Message(msg), context)); 71 message_queue_version_++; 72 } 73 74 dispatch_event_.Signal(); 75 if (!was_task_pending) { 76 listener_task_runner_->PostTask( 77 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, 78 this, scoped_refptr<SyncContext>(context))); 79 } 80 } 81 82 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { 83 received_replies_.push_back(QueuedMessage(new Message(msg), context)); 84 } 85 86 // Called on the listener's thread to process any queues synchronous 87 // messages. 88 void DispatchMessagesTask(SyncContext* context) { 89 { 90 base::AutoLock auto_lock(message_lock_); 91 task_pending_ = false; 92 } 93 context->DispatchMessages(); 94 } 95 96 void DispatchMessages(SyncContext* dispatching_context) { 97 bool first_time = true; 98 uint32 expected_version = 0; 99 SyncMessageQueue::iterator it; 100 while (true) { 101 Message* message = NULL; 102 scoped_refptr<SyncChannel::SyncContext> context; 103 { 104 base::AutoLock auto_lock(message_lock_); 105 if (first_time || message_queue_version_ != expected_version) { 106 it = message_queue_.begin(); 107 first_time = false; 108 } 109 for (; it != message_queue_.end(); it++) { 110 int message_group = it->context->restrict_dispatch_group(); 111 if (message_group == kRestrictDispatchGroup_None || 112 message_group == dispatching_context->restrict_dispatch_group()) { 113 message = it->message; 114 context = it->context; 115 it = message_queue_.erase(it); 116 message_queue_version_++; 117 expected_version = message_queue_version_; 118 break; 119 } 120 } 121 } 122 123 if (message == NULL) 124 break; 125 context->OnDispatchMessage(*message); 126 delete message; 127 } 128 } 129 130 // SyncChannel calls this in its destructor. 131 void RemoveContext(SyncContext* context) { 132 base::AutoLock auto_lock(message_lock_); 133 134 SyncMessageQueue::iterator iter = message_queue_.begin(); 135 while (iter != message_queue_.end()) { 136 if (iter->context.get() == context) { 137 delete iter->message; 138 iter = message_queue_.erase(iter); 139 message_queue_version_++; 140 } else { 141 iter++; 142 } 143 } 144 145 if (--listener_count_ == 0) { 146 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 147 lazy_tls_ptr_.Pointer()->Set(NULL); 148 } 149 } 150 151 WaitableEvent* dispatch_event() { return &dispatch_event_; } 152 base::SingleThreadTaskRunner* listener_task_runner() { 153 return listener_task_runner_.get(); 154 } 155 156 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 157 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 158 lazy_tls_ptr_; 159 160 // Called on the ipc thread to check if we can unblock any current Send() 161 // calls based on a queued reply. 162 void DispatchReplies() { 163 for (size_t i = 0; i < received_replies_.size(); ++i) { 164 Message* message = received_replies_[i].message; 165 if (received_replies_[i].context->TryToUnblockListener(message)) { 166 delete message; 167 received_replies_.erase(received_replies_.begin() + i); 168 return; 169 } 170 } 171 } 172 173 base::WaitableEventWatcher* top_send_done_watcher() { 174 return top_send_done_watcher_; 175 } 176 177 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { 178 top_send_done_watcher_ = watcher; 179 } 180 181 private: 182 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 183 184 // See the comment in SyncChannel::SyncChannel for why this event is created 185 // as manual reset. 186 ReceivedSyncMsgQueue() : 187 message_queue_version_(0), 188 dispatch_event_(true, false), 189 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 190 task_pending_(false), 191 listener_count_(0), 192 top_send_done_watcher_(NULL) { 193 } 194 195 ~ReceivedSyncMsgQueue() {} 196 197 // Holds information about a queued synchronous message or reply. 198 struct QueuedMessage { 199 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 200 Message* message; 201 scoped_refptr<SyncChannel::SyncContext> context; 202 }; 203 204 typedef std::list<QueuedMessage> SyncMessageQueue; 205 SyncMessageQueue message_queue_; 206 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan 207 208 std::vector<QueuedMessage> received_replies_; 209 210 // Set when we got a synchronous message that we must respond to as the 211 // sender needs its reply before it can reply to our original synchronous 212 // message. 213 WaitableEvent dispatch_event_; 214 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 215 base::Lock message_lock_; 216 bool task_pending_; 217 int listener_count_; 218 219 // The current send done event watcher for this thread. Used to maintain 220 // a local global stack of send done watchers to ensure that nested sync 221 // message loops complete correctly. 222 base::WaitableEventWatcher* top_send_done_watcher_; 223 }; 224 225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 226 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 227 LAZY_INSTANCE_INITIALIZER; 228 229 SyncChannel::SyncContext::SyncContext( 230 Listener* listener, 231 base::SingleThreadTaskRunner* ipc_task_runner, 232 WaitableEvent* shutdown_event) 233 : ChannelProxy::Context(listener, ipc_task_runner), 234 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 235 shutdown_event_(shutdown_event), 236 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 237 } 238 239 SyncChannel::SyncContext::~SyncContext() { 240 while (!deserializers_.empty()) 241 Pop(); 242 } 243 244 // Adds information about an outgoing sync message to the context so that 245 // we know how to deserialize the reply. Returns a handle that's set when 246 // the reply has arrived. 247 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 248 // Create the tracking information for this message. This object is stored 249 // by value since all members are pointers that are cheap to copy. These 250 // pointers are cleaned up in the Pop() function. 251 // 252 // The event is created as manual reset because in between Signal and 253 // OnObjectSignalled, another Send can happen which would stop the watcher 254 // from being called. The event would get watched later, when the nested 255 // Send completes, so the event will need to remain set. 256 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), 257 sync_msg->GetReplyDeserializer(), 258 new WaitableEvent(true, false)); 259 base::AutoLock auto_lock(deserializers_lock_); 260 deserializers_.push_back(pending); 261 } 262 263 bool SyncChannel::SyncContext::Pop() { 264 bool result; 265 { 266 base::AutoLock auto_lock(deserializers_lock_); 267 PendingSyncMsg msg = deserializers_.back(); 268 delete msg.deserializer; 269 delete msg.done_event; 270 msg.done_event = NULL; 271 deserializers_.pop_back(); 272 result = msg.send_result; 273 } 274 275 // We got a reply to a synchronous Send() call that's blocking the listener 276 // thread. However, further down the call stack there could be another 277 // blocking Send() call, whose reply we received after we made this last 278 // Send() call. So check if we have any queued replies available that 279 // can now unblock the listener thread. 280 ipc_task_runner()->PostTask( 281 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 282 received_sync_msgs_.get())); 283 284 return result; 285 } 286 287 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 288 base::AutoLock auto_lock(deserializers_lock_); 289 return deserializers_.back().done_event; 290 } 291 292 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { 293 return received_sync_msgs_->dispatch_event(); 294 } 295 296 void SyncChannel::SyncContext::DispatchMessages() { 297 received_sync_msgs_->DispatchMessages(this); 298 } 299 300 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 301 base::AutoLock auto_lock(deserializers_lock_); 302 if (deserializers_.empty() || 303 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 304 return false; 305 } 306 307 // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055 308 // has finished. 309 if (!msg->is_reply_error()) { 310 bool send_result = deserializers_.back().deserializer-> 311 SerializeOutputParameters(*msg); 312 deserializers_.back().send_result = send_result; 313 VLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; 314 } else { 315 VLOG(1) << "Received error reply"; 316 } 317 deserializers_.back().done_event->Signal(); 318 319 return true; 320 } 321 322 void SyncChannel::SyncContext::Clear() { 323 CancelPendingSends(); 324 received_sync_msgs_->RemoveContext(this); 325 Context::Clear(); 326 } 327 328 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { 329 // Give the filters a chance at processing this message. 330 if (TryFilters(msg)) 331 return true; 332 333 if (TryToUnblockListener(&msg)) 334 return true; 335 336 if (msg.is_reply()) { 337 received_sync_msgs_->QueueReply(msg, this); 338 return true; 339 } 340 341 if (msg.should_unblock()) { 342 received_sync_msgs_->QueueMessage(msg, this); 343 return true; 344 } 345 346 return Context::OnMessageReceivedNoFilter(msg); 347 } 348 349 void SyncChannel::SyncContext::OnChannelError() { 350 CancelPendingSends(); 351 shutdown_watcher_.StopWatching(); 352 Context::OnChannelError(); 353 } 354 355 void SyncChannel::SyncContext::OnChannelOpened() { 356 shutdown_watcher_.StartWatching( 357 shutdown_event_, 358 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, 359 base::Unretained(this))); 360 Context::OnChannelOpened(); 361 } 362 363 void SyncChannel::SyncContext::OnChannelClosed() { 364 CancelPendingSends(); 365 shutdown_watcher_.StopWatching(); 366 Context::OnChannelClosed(); 367 } 368 369 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { 370 base::AutoLock auto_lock(deserializers_lock_); 371 PendingSyncMessageQueue::iterator iter; 372 VLOG(1) << "Send timeout"; 373 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { 374 if (iter->id == message_id) { 375 iter->done_event->Signal(); 376 break; 377 } 378 } 379 } 380 381 void SyncChannel::SyncContext::CancelPendingSends() { 382 base::AutoLock auto_lock(deserializers_lock_); 383 PendingSyncMessageQueue::iterator iter; 384 // TODO(bauerb): Remove once http://crbug/141055 is fixed. 385 VLOG(1) << "Canceling pending sends"; 386 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) 387 iter->done_event->Signal(); 388 } 389 390 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { 391 if (event == shutdown_event_) { 392 // Process shut down before we can get a reply to a synchronous message. 393 // Cancel pending Send calls, which will end up setting the send done event. 394 CancelPendingSends(); 395 } else { 396 // We got the reply, timed out or the process shutdown. 397 DCHECK_EQ(GetSendDoneEvent(), event); 398 base::MessageLoop::current()->QuitNow(); 399 } 400 } 401 402 base::WaitableEventWatcher::EventCallback 403 SyncChannel::SyncContext::MakeWaitableEventCallback() { 404 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); 405 } 406 407 // static 408 scoped_ptr<SyncChannel> SyncChannel::Create( 409 const IPC::ChannelHandle& channel_handle, 410 Channel::Mode mode, 411 Listener* listener, 412 base::SingleThreadTaskRunner* ipc_task_runner, 413 bool create_pipe_now, 414 base::WaitableEvent* shutdown_event) { 415 scoped_ptr<SyncChannel> channel = 416 Create(listener, ipc_task_runner, shutdown_event); 417 channel->Init(channel_handle, mode, create_pipe_now); 418 return channel.Pass(); 419 } 420 421 // static 422 scoped_ptr<SyncChannel> SyncChannel::Create( 423 Listener* listener, 424 base::SingleThreadTaskRunner* ipc_task_runner, 425 WaitableEvent* shutdown_event) { 426 return make_scoped_ptr( 427 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 428 } 429 430 SyncChannel::SyncChannel( 431 Listener* listener, 432 base::SingleThreadTaskRunner* ipc_task_runner, 433 WaitableEvent* shutdown_event) 434 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { 435 // The current (listener) thread must be distinct from the IPC thread, or else 436 // sending synchronous messages will deadlock. 437 DCHECK_NE(ipc_task_runner, base::ThreadTaskRunnerHandle::Get()); 438 StartWatching(); 439 } 440 441 SyncChannel::~SyncChannel() { 442 } 443 444 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 445 sync_context()->set_restrict_dispatch_group(group); 446 } 447 448 bool SyncChannel::Send(Message* message) { 449 #ifdef IPC_MESSAGE_LOG_ENABLED 450 Logging* logger = Logging::GetInstance(); 451 std::string name; 452 logger->GetMessageText(message->type(), &name, message, NULL); 453 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name); 454 #else 455 TRACE_EVENT2("ipc", "SyncChannel::Send", 456 "class", IPC_MESSAGE_ID_CLASS(message->type()), 457 "line", IPC_MESSAGE_ID_LINE(message->type())); 458 #endif 459 if (!message->is_sync()) { 460 ChannelProxy::Send(message); 461 return true; 462 } 463 464 // *this* might get deleted in WaitForReply. 465 scoped_refptr<SyncContext> context(sync_context()); 466 if (context->shutdown_event()->IsSignaled()) { 467 VLOG(1) << "shutdown event is signaled"; 468 delete message; 469 return false; 470 } 471 472 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); 473 context->Push(sync_msg); 474 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); 475 476 ChannelProxy::Send(message); 477 478 // Wait for reply, or for any other incoming synchronous messages. 479 // *this* might get deleted, so only call static functions at this point. 480 WaitForReply(context.get(), pump_messages_event); 481 482 return context->Pop(); 483 } 484 485 void SyncChannel::WaitForReply( 486 SyncContext* context, WaitableEvent* pump_messages_event) { 487 context->DispatchMessages(); 488 while (true) { 489 WaitableEvent* objects[] = { 490 context->GetDispatchEvent(), 491 context->GetSendDoneEvent(), 492 pump_messages_event 493 }; 494 495 unsigned count = pump_messages_event ? 3: 2; 496 size_t result = WaitableEvent::WaitMany(objects, count); 497 if (result == 0 /* dispatch event */) { 498 // We're waiting for a reply, but we received a blocking synchronous 499 // call. We must process it or otherwise a deadlock might occur. 500 context->GetDispatchEvent()->Reset(); 501 context->DispatchMessages(); 502 continue; 503 } 504 505 if (result == 2 /* pump_messages_event */) 506 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 507 508 break; 509 } 510 } 511 512 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 513 base::WaitableEventWatcher send_done_watcher; 514 515 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 516 DCHECK(sync_msg_queue != NULL); 517 518 base::WaitableEventWatcher* old_send_done_event_watcher = 519 sync_msg_queue->top_send_done_watcher(); 520 521 base::WaitableEventWatcher::EventCallback old_callback; 522 base::WaitableEvent* old_event = NULL; 523 524 // Maintain a local global stack of send done delegates to ensure that 525 // nested sync calls complete in the correct sequence, i.e. the 526 // outermost call completes first, etc. 527 if (old_send_done_event_watcher) { 528 old_callback = old_send_done_event_watcher->callback(); 529 old_event = old_send_done_event_watcher->GetWatchedEvent(); 530 old_send_done_event_watcher->StopWatching(); 531 } 532 533 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); 534 535 send_done_watcher.StartWatching(context->GetSendDoneEvent(), 536 context->MakeWaitableEventCallback()); 537 538 { 539 base::MessageLoop::ScopedNestableTaskAllower allow( 540 base::MessageLoop::current()); 541 base::MessageLoop::current()->Run(); 542 } 543 544 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); 545 if (old_send_done_event_watcher && old_event) { 546 old_send_done_event_watcher->StartWatching(old_event, old_callback); 547 } 548 } 549 550 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { 551 DCHECK(event == sync_context()->GetDispatchEvent()); 552 // The call to DispatchMessages might delete this object, so reregister 553 // the object watcher first. 554 event->Reset(); 555 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); 556 sync_context()->DispatchMessages(); 557 } 558 559 void SyncChannel::StartWatching() { 560 // Ideally we only want to watch this object when running a nested message 561 // loop. However, we don't know when it exits if there's another nested 562 // message loop running under it or not, so we wouldn't know whether to 563 // stop or keep watching. So we always watch it, and create the event as 564 // manual reset since the object watcher might otherwise reset the event 565 // when we're doing a WaitMany. 566 dispatch_watcher_callback_ = 567 base::Bind(&SyncChannel::OnWaitableEventSignaled, 568 base::Unretained(this)); 569 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), 570 dispatch_watcher_callback_); 571 } 572 573 } // namespace IPC 574