1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_sync_channel.h" 6 7 #include "base/bind.h" 8 #include "base/debug/trace_event.h" 9 #include "base/lazy_instance.h" 10 #include "base/location.h" 11 #include "base/logging.h" 12 #include "base/synchronization/waitable_event.h" 13 #include "base/synchronization/waitable_event_watcher.h" 14 #include "base/thread_task_runner_handle.h" 15 #include "base/threading/thread_local.h" 16 #include "ipc/ipc_logging.h" 17 #include "ipc/ipc_message_macros.h" 18 #include "ipc/ipc_sync_message.h" 19 20 using base::TimeDelta; 21 using base::TimeTicks; 22 using base::WaitableEvent; 23 24 namespace IPC { 25 // When we're blocked in a Send(), we need to process incoming synchronous 26 // messages right away because it could be blocking our reply (either 27 // directly from the same object we're calling, or indirectly through one or 28 // more other channels). That means that in SyncContext's OnMessageReceived, 29 // we need to process sync message right away if we're blocked. However a 30 // simple check isn't sufficient, because the listener thread can be in the 31 // process of calling Send. 32 // To work around this, when SyncChannel filters a sync message, it sets 33 // an event that the listener thread waits on during its Send() call. This 34 // allows us to dispatch incoming sync messages when blocked. The race 35 // condition is handled because if Send is in the process of being called, it 36 // will check the event. In case the listener thread isn't sending a message, 37 // we queue a task on the listener thread to dispatch the received messages. 38 // The messages are stored in this queue object that's shared among all 39 // SyncChannel objects on the same thread (since one object can receive a 40 // sync message while another one is blocked). 
// Queue of incoming synchronous messages and replies.  One instance is kept
// per thread through |lazy_tls_ptr_| and handed out by AddContext(), so every
// SyncContext created on a thread shares the same queue.
class SyncChannel::ReceivedSyncMsgQueue :
    public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
 public:
  // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
  // if necessary.  Call RemoveContext on the same thread when done.
  // Each call increments |listener_count_|; RemoveContext decrements it.
  static ReceivedSyncMsgQueue* AddContext() {
    // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
    // SyncChannel objects can block the same thread).
    ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
    if (!rv) {
      rv = new ReceivedSyncMsgQueue();
      ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
    }
    rv->listener_count_++;
    return rv;
  }

  // Called on IPC thread when a synchronous message or reply arrives.
  // Stores a copy of |msg| together with |context|, signals the dispatch
  // event, and posts DispatchMessagesTask to the listener task runner unless
  // such a task is already pending.
  void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
    bool was_task_pending;
    {
      base::AutoLock auto_lock(message_lock_);

      was_task_pending = task_pending_;
      task_pending_ = true;

      // We set the event in case the listener thread is blocked (or is about
      // to). In case it's not, the PostTask dispatches the messages.
      message_queue_.push_back(QueuedMessage(new Message(msg), context));
      message_queue_version_++;
    }

    // Signal and post outside of |message_lock_|.
    dispatch_event_.Signal();
    if (!was_task_pending) {
      listener_task_runner_->PostTask(
          FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
                                this, scoped_refptr<SyncContext>(context)));
    }
  }

  // Stores a copy of the reply |msg| for |context| so that DispatchReplies()
  // can later try to match it against a pending Send().  Note that
  // |received_replies_| is accessed here without taking |message_lock_|.
  void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
    received_replies_.push_back(QueuedMessage(new Message(msg), context));
  }

  // Called on the listener's thread to process any queued synchronous
  // messages.  Clears |task_pending_| first so that a later QueueMessage()
  // posts a fresh task.
  void DispatchMessagesTask(SyncContext* context) {
    {
      base::AutoLock auto_lock(message_lock_);
      task_pending_ = false;
    }
    context->DispatchMessages();
  }

  // Dispatches every queued message whose context has no dispatch restriction
  // or is in the same restrict-dispatch group as |dispatching_context|.
  // Messages are removed from the queue one at a time under |message_lock_|
  // and dispatched with the lock released.
  void DispatchMessages(SyncContext* dispatching_context) {
    bool first_time = true;
    uint32 expected_version = 0;
    SyncMessageQueue::iterator it;
    while (true) {
      Message* message = NULL;
      scoped_refptr<SyncChannel::SyncContext> context;
      {
        base::AutoLock auto_lock(message_lock_);
        // |message_queue_version_| is bumped on every insertion and removal.
        // If it differs from the value recorded after our last erase, the
        // queue changed while the lock was released, so restart the scan
        // from the beginning instead of continuing from |it|.
        if (first_time || message_queue_version_ != expected_version) {
          it = message_queue_.begin();
          first_time = false;
        }
        for (; it != message_queue_.end(); it++) {
          int message_group = it->context->restrict_dispatch_group();
          if (message_group == kRestrictDispatchGroup_None ||
              message_group == dispatching_context->restrict_dispatch_group()) {
            message = it->message;
            context = it->context;
            it = message_queue_.erase(it);
            message_queue_version_++;
            expected_version = message_queue_version_;
            break;
          }
        }
      }

      // Nothing eligible was found: we are done.
      if (message == NULL)
        break;
      // Dispatch outside of the lock; the queue owned |message|, so free it
      // once it has been handled.
      context->OnDispatchMessage(*message);
      delete message;
    }
  }

  // Called from SyncContext::Clear().  Drops (and frees) every queued message
  // that belongs to |context| and, when the last context on this thread is
  // removed, clears the thread-local pointer to this queue.
  void RemoveContext(SyncContext* context) {
    base::AutoLock auto_lock(message_lock_);

    SyncMessageQueue::iterator iter = message_queue_.begin();
    while (iter != message_queue_.end()) {
      if (iter->context.get() == context) {
        delete iter->message;
        iter = message_queue_.erase(iter);
        message_queue_version_++;
      } else {
        iter++;
      }
    }

    if (--listener_count_ == 0) {
      DCHECK(lazy_tls_ptr_.Pointer()->Get());
      lazy_tls_ptr_.Pointer()->Set(NULL);
    }
  }

  WaitableEvent* dispatch_event() { return &dispatch_event_; }
  base::SingleThreadTaskRunner* listener_task_runner() {
    return listener_task_runner_.get();
  }

  // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
  static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
      lazy_tls_ptr_;

  // Called on the ipc thread to check if we can unblock any current Send()
  // calls based on a queued reply.  At most one reply is consumed per call:
  // the first one accepted by its context's TryToUnblockListener().
  void DispatchReplies() {
    for (size_t i = 0; i < received_replies_.size(); ++i) {
      Message* message = received_replies_[i].message;
      if (received_replies_[i].context->TryToUnblockListener(message)) {
        delete message;
        received_replies_.erase(received_replies_.begin() + i);
        return;
      }
    }
  }

  base::WaitableEventWatcher* top_send_done_watcher() {
    return top_send_done_watcher_;
  }

  void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
    top_send_done_watcher_ = watcher;
  }

 private:
  friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;

  // See the comment in SyncChannel::StartWatching for why this event is
  // created as manual reset.
  ReceivedSyncMsgQueue() :
      message_queue_version_(0),
      dispatch_event_(true, false),
      listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
      task_pending_(false),
      listener_count_(0),
      top_send_done_watcher_(NULL) {
  }

  ~ReceivedSyncMsgQueue() {}

  // Holds information about a queued synchronous message or reply.
  // |message| is a raw pointer; whoever removes the entry from its container
  // is responsible for deleting it.
  struct QueuedMessage {
    QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
    Message* message;
    scoped_refptr<SyncChannel::SyncContext> context;
  };

  typedef std::list<QueuedMessage> SyncMessageQueue;
  SyncMessageQueue message_queue_;
  uint32 message_queue_version_;  // Used to signal DispatchMessages to rescan

  // Replies that did not match the top pending Send() when they arrived.
  std::vector<QueuedMessage> received_replies_;

  // Set when we got a synchronous message that we must respond to as the
  // sender needs its reply before it can reply to our original synchronous
  // message.
  WaitableEvent dispatch_event_;
  scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
  // Guards |message_queue_|, |message_queue_version_| and |task_pending_|
  // wherever they are touched in this class; RemoveContext also holds it
  // while updating |listener_count_|.
  base::Lock message_lock_;
  // True while a DispatchMessagesTask has been posted and has not yet run.
  bool task_pending_;
  // Number of AddContext() calls not yet balanced by RemoveContext().
  int listener_count_;

  // The current send done event watcher for this thread. Used to maintain
  // a local global stack of send done watchers to ensure that nested sync
  // message loops complete correctly.
  base::WaitableEventWatcher* top_send_done_watcher_;
};

base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
    SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
        LAZY_INSTANCE_INITIALIZER;

// Registers the new context with the calling thread's ReceivedSyncMsgQueue
// (creating the queue if needed) and starts with no dispatch restriction.
SyncChannel::SyncContext::SyncContext(
    Listener* listener,
    base::SingleThreadTaskRunner* ipc_task_runner,
    WaitableEvent* shutdown_event)
    : ChannelProxy::Context(listener, ipc_task_runner),
      received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
      shutdown_event_(shutdown_event),
      restrict_dispatch_group_(kRestrictDispatchGroup_None) {
}

// Releases the tracking data of any sync sends that are still pending.
SyncChannel::SyncContext::~SyncContext() {
  while (!deserializers_.empty())
    Pop();
}

// Adds information about an outgoing sync message to the context so that
// we know how to deserialize the reply.  The event that is signaled when the
// reply has arrived is available through GetSendDoneEvent().
void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
  // Create the tracking information for this message. This object is stored
  // by value since all members are pointers that are cheap to copy. These
  // pointers are cleaned up in the Pop() function.
  //
  // The event is created as manual reset because in between Signal and
  // OnObjectSignalled, another Send can happen which would stop the watcher
  // from being called.  The event would get watched later, when the nested
  // Send completes, so the event will need to remain set.
  PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
                         sync_msg->GetReplyDeserializer(),
                         new WaitableEvent(true, false));
  base::AutoLock auto_lock(deserializers_lock_);
  deserializers_.push_back(pending);
}

// Removes the most recently pushed pending sync message, frees its
// deserializer and done event, and returns its |send_result|.  Callers must
// ensure |deserializers_| is not empty.
bool SyncChannel::SyncContext::Pop() {
  bool result;
  {
    base::AutoLock auto_lock(deserializers_lock_);
    // |msg| is a copy of the entry; the pointers it holds are the ones that
    // were allocated in Push().
    PendingSyncMsg msg = deserializers_.back();
    delete msg.deserializer;
    delete msg.done_event;
    msg.done_event = NULL;
    deserializers_.pop_back();
    result = msg.send_result;
  }

  // We got a reply to a synchronous Send() call that's blocking the listener
  // thread.  However, further down the call stack there could be another
  // blocking Send() call, whose reply we received after we made this last
  // Send() call.  So check if we have any queued replies available that
  // can now unblock the listener thread.
  ipc_task_runner()->PostTask(
      FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
                            received_sync_msgs_.get()));

  return result;
}

// Returns the done event of the most recently pushed pending sync message.
WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
  base::AutoLock auto_lock(deserializers_lock_);
  return deserializers_.back().done_event;
}

// Returns the dispatch event of the shared per-thread message queue.
WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
  return received_sync_msgs_->dispatch_event();
}

// Dispatches the queued synchronous messages this context is allowed to
// handle (see ReceivedSyncMsgQueue::DispatchMessages).
void SyncChannel::SyncContext::DispatchMessages() {
  received_sync_msgs_->DispatchMessages(this);
}

// If |msg| is the reply to the most recently pushed pending sync message,
// deserializes its output parameters (unless it is an error reply), signals
// that message's done event and returns true.  Returns false otherwise.
bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
  base::AutoLock auto_lock(deserializers_lock_);
  if (deserializers_.empty() ||
      !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
    return false;
  }

  // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
  // has finished.
  if (!msg->is_reply_error()) {
    bool send_result = deserializers_.back().deserializer->
        SerializeOutputParameters(*msg);
    deserializers_.back().send_result = send_result;
    VLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
  } else {
    VLOG(1) << "Received error reply";
  }
  deserializers_.back().done_event->Signal();

  return true;
}

// Wakes up any pending sends, unregisters this context from the per-thread
// message queue and then runs the base class cleanup.
void SyncChannel::SyncContext::Clear() {
  CancelPendingSends();
  received_sync_msgs_->RemoveContext(this);
  Context::Clear();
}

// Routes an incoming message, in order: filters, a reply to the top pending
// Send(), any other reply (queued for later), a message flagged
// should_unblock() (queued for the listener thread), and finally the regular
// unfiltered path of the base class.
bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
  // Give the filters a chance at processing this message.
  if (TryFilters(msg))
    return true;

  if (TryToUnblockListener(&msg))
    return true;

  if (msg.is_reply()) {
    received_sync_msgs_->QueueReply(msg, this);
    return true;
  }

  if (msg.should_unblock()) {
    received_sync_msgs_->QueueMessage(msg, this);
    return true;
  }

  return Context::OnMessageReceivedNoFilter(msg);
}

// Wakes up pending sends and stops watching the shutdown event before
// forwarding the error to the base class.
void SyncChannel::SyncContext::OnChannelError() {
  CancelPendingSends();
  shutdown_watcher_.StopWatching();
  Context::OnChannelError();
}

// Starts watching the shutdown event so that pending sends can be cancelled
// when it is signaled (see OnWaitableEventSignaled).
void SyncChannel::SyncContext::OnChannelOpened() {
  shutdown_watcher_.StartWatching(
      shutdown_event_,
      base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
                 base::Unretained(this)));
  Context::OnChannelOpened();
}

// Wakes up pending sends and stops watching the shutdown event before
// forwarding the close notification to the base class.
void SyncChannel::SyncContext::OnChannelClosed() {
  CancelPendingSends();
  shutdown_watcher_.StopWatching();
  Context::OnChannelClosed();
}

// Signals the done event of the pending sync message whose id is
// |message_id|, if it is still pending.  Does nothing otherwise.
void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
  base::AutoLock auto_lock(deserializers_lock_);
  PendingSyncMessageQueue::iterator iter;
  VLOG(1) << "Send timeout";
  for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
    if (iter->id == message_id) {
      iter->done_event->Signal();
      break;
    }
  }
}

// Signals the done event of every pending sync message so that any Send()
// waiting on one of them wakes up.
void SyncChannel::SyncContext::CancelPendingSends() {
  base::AutoLock auto_lock(deserializers_lock_);
  PendingSyncMessageQueue::iterator iter;
  // TODO(bauerb): Remove once http://crbug.com/141055 is fixed.
  VLOG(1) << "Canceling pending sends";
  for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
    iter->done_event->Signal();
}

// Callback shared by the shutdown watcher and the send-done watcher used by
// the nested message loop.  |event| tells the two cases apart.
void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
  if (event == shutdown_event_) {
    // Process shut down before we can get a reply to a synchronous message.
    // Cancel pending Send calls, which will end up setting the send done event.
    CancelPendingSends();
  } else {
    // We got the reply, timed out or the process shutdown.
    DCHECK_EQ(GetSendDoneEvent(), event);
    base::MessageLoop::current()->QuitNow();
  }
}

// Returns a callback that invokes OnWaitableEventSignaled on this context.
base::WaitableEventWatcher::EventCallback
    SyncChannel::SyncContext::MakeWaitableEventCallback() {
  return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
}

// Creates the channel, initializes the underlying ChannelProxy right away
// and starts watching the dispatch event.
SyncChannel::SyncChannel(
    const IPC::ChannelHandle& channel_handle,
    Channel::Mode mode,
    Listener* listener,
    base::SingleThreadTaskRunner* ipc_task_runner,
    bool create_pipe_now,
    WaitableEvent* shutdown_event)
    : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
      sync_messages_with_no_timeout_allowed_(true) {
  ChannelProxy::Init(channel_handle, mode, create_pipe_now);
  StartWatching();
}

// Same as above, but without calling ChannelProxy::Init().
SyncChannel::SyncChannel(
    Listener* listener,
    base::SingleThreadTaskRunner* ipc_task_runner,
    WaitableEvent* shutdown_event)
    : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
      sync_messages_with_no_timeout_allowed_(true) {
  StartWatching();
}

SyncChannel::~SyncChannel() {
}

// Sets the restrict-dispatch group on this channel's context.  The group is
// what ReceivedSyncMsgQueue::DispatchMessages compares when deciding which
// queued messages may be dispatched.
void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
  sync_context()->set_restrict_dispatch_group(group);
}

// Sends |message| without a timeout.
bool SyncChannel::Send(Message* message) {
  return SendWithTimeout(message, base::kNoTimeout);
}

// Sends |message| and, if it is synchronous, blocks until the reply arrives,
// the send is cancelled, or |timeout_ms| elapses (unless it is
// base::kNoTimeout).
//
// Returns true right after forwarding a non-synchronous message.  For a
// synchronous message, returns false if the shutdown event is already
// signaled (the message is deleted without being sent); otherwise returns
// the send result recorded for the message (see SyncContext::Pop).
bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
#ifdef IPC_MESSAGE_LOG_ENABLED
  Logging* logger = Logging::GetInstance();
  std::string name;
  logger->GetMessageText(message->type(), &name, message, NULL);
  TRACE_EVENT1("task", "SyncChannel::SendWithTimeout",
               "name", name);
#else
  TRACE_EVENT2("task", "SyncChannel::SendWithTimeout",
               "class", IPC_MESSAGE_ID_CLASS(message->type()),
               "line", IPC_MESSAGE_ID_LINE(message->type()));
#endif
  if (!message->is_sync()) {
    ChannelProxy::Send(message);
    return true;
  }

  // *this* might get deleted in WaitForReply.
  scoped_refptr<SyncContext> context(sync_context());
  if (context->shutdown_event()->IsSignaled()) {
    VLOG(1) << "shutdown event is signaled";
    delete message;
    return false;
  }

  DCHECK(sync_messages_with_no_timeout_allowed_ ||
         timeout_ms != base::kNoTimeout);
  SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
  context->Push(sync_msg);
  // Read everything we need from the message before handing it to
  // ChannelProxy::Send() below.
  int message_id = SyncMessage::GetMessageId(*sync_msg);
  WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();

  ChannelProxy::Send(message);

  if (timeout_ms != base::kNoTimeout) {
    // We use the sync message id so that when a message times out, we don't
    // confuse it with another send that is either above/below this Send in
    // the call stack.
    context->ipc_task_runner()->PostDelayedTask(
        FROM_HERE,
        base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id),
        base::TimeDelta::FromMilliseconds(timeout_ms));
  }

  // Wait for reply, or for any other incoming synchronous messages.
  // *this* might get deleted, so only call static functions at this point.
  WaitForReply(context.get(), pump_messages_event);

  return context->Pop();
}

// Blocks until the send done event of |context|'s top pending message is
// signaled, dispatching incoming synchronous messages in the meantime.  If
// |pump_messages_event| is non-NULL and becomes signaled, the rest of the
// wait is done inside a nested message loop.
void SyncChannel::WaitForReply(
    SyncContext* context, WaitableEvent* pump_messages_event) {
  context->DispatchMessages();
  while (true) {
    WaitableEvent* objects[] = {
      context->GetDispatchEvent(),
      context->GetSendDoneEvent(),
      pump_messages_event
    };

    // Only wait on |pump_messages_event| when the message supplied one.
    unsigned count = pump_messages_event ? 3: 2;
    size_t result = WaitableEvent::WaitMany(objects, count);
    if (result == 0  /* dispatch event */) {
      // We're waiting for a reply, but we received a blocking synchronous
      // call.  We must process it or otherwise a deadlock might occur.
      context->GetDispatchEvent()->Reset();
      context->DispatchMessages();
      continue;
    }

    if (result == 2  /* pump_messages_event */)
      WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.

    // Either the send done event fired (result == 1) or the nested loop has
    // returned; in both cases the wait is over.
    break;
  }
}

// Runs a nested message loop until the send done event of |context|'s top
// pending message is signaled.  The watcher that was previously on top of
// the per-thread stack is suspended for the duration and restored afterwards.
void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
  base::WaitableEventWatcher send_done_watcher;

  ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
  DCHECK(sync_msg_queue != NULL);

  base::WaitableEventWatcher* old_send_done_event_watcher =
      sync_msg_queue->top_send_done_watcher();

  base::WaitableEventWatcher::EventCallback old_callback;
  base::WaitableEvent* old_event = NULL;

  // Maintain a local global stack of send done delegates to ensure that
  // nested sync calls complete in the correct sequence, i.e. the
  // outermost call completes first, etc.
  if (old_send_done_event_watcher) {
    // Remember what the outer watcher was watching so it can be re-armed
    // once this nested loop finishes.
    old_callback = old_send_done_event_watcher->callback();
    old_event = old_send_done_event_watcher->GetWatchedEvent();
    old_send_done_event_watcher->StopWatching();
  }

  sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);

  send_done_watcher.StartWatching(context->GetSendDoneEvent(),
                                  context->MakeWaitableEventCallback());

  {
    base::MessageLoop::ScopedNestableTaskAllower allow(
        base::MessageLoop::current());
    base::MessageLoop::current()->Run();
  }

  sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
  if (old_send_done_event_watcher && old_event) {
    old_send_done_event_watcher->StartWatching(old_event, old_callback);
  }
}

// Invoked by |dispatch_watcher_| when the dispatch event is signaled.
void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
  DCHECK(event == sync_context()->GetDispatchEvent());
  // The call to DispatchMessages might delete this object, so reregister
  // the object watcher first.
  event->Reset();
  dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
  sync_context()->DispatchMessages();
}

// Begins watching the dispatch event.  Called from both constructors.
void SyncChannel::StartWatching() {
  // Ideally we only want to watch this object when running a nested message
  // loop.  However, we don't know when it exits if there's another nested
  // message loop running under it or not, so we wouldn't know whether to
  // stop or keep watching.  So we always watch it, and create the event as
  // manual reset since the object watcher might otherwise reset the event
  // when we're doing a WaitMany.
  dispatch_watcher_callback_ =
      base::Bind(&SyncChannel::OnWaitableEventSignaled,
                  base::Unretained(this));
  dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
                                  dispatch_watcher_callback_);
}

}  // namespace IPC