1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_sync_channel.h" 6 7 #include <string> 8 #include <vector> 9 10 #include "base/basictypes.h" 11 #include "base/bind.h" 12 #include "base/logging.h" 13 #include "base/memory/scoped_ptr.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/process/process_handle.h" 16 #include "base/run_loop.h" 17 #include "base/strings/string_util.h" 18 #include "base/synchronization/waitable_event.h" 19 #include "base/threading/platform_thread.h" 20 #include "base/threading/thread.h" 21 #include "ipc/ipc_listener.h" 22 #include "ipc/ipc_message.h" 23 #include "ipc/ipc_sender.h" 24 #include "ipc/ipc_sync_message_filter.h" 25 #include "ipc/ipc_sync_message_unittest.h" 26 #include "testing/gtest/include/gtest/gtest.h" 27 28 using base::WaitableEvent; 29 30 namespace IPC { 31 namespace { 32 33 // Base class for a "process" with listener and IPC threads. 34 class Worker : public Listener, public Sender { 35 public: 36 // Will create a channel without a name. 37 Worker(Channel::Mode mode, const std::string& thread_name) 38 : done_(new WaitableEvent(false, false)), 39 channel_created_(new WaitableEvent(false, false)), 40 mode_(mode), 41 ipc_thread_((thread_name + "_ipc").c_str()), 42 listener_thread_((thread_name + "_listener").c_str()), 43 overrided_thread_(NULL), 44 shutdown_event_(true, false), 45 is_shutdown_(false) { 46 } 47 48 // Will create a named channel and use this name for the threads' name. 49 Worker(const std::string& channel_name, Channel::Mode mode) 50 : done_(new WaitableEvent(false, false)), 51 channel_created_(new WaitableEvent(false, false)), 52 channel_name_(channel_name), 53 mode_(mode), 54 ipc_thread_((channel_name + "_ipc").c_str()), 55 listener_thread_((channel_name + "_listener").c_str()), 56 overrided_thread_(NULL), 57 shutdown_event_(true, false), 58 is_shutdown_(false) { 59 } 60 61 virtual ~Worker() { 62 // Shutdown() must be called before destruction. 63 CHECK(is_shutdown_); 64 } 65 void AddRef() { } 66 void Release() { } 67 virtual bool Send(Message* msg) OVERRIDE { return channel_->Send(msg); } 68 void WaitForChannelCreation() { channel_created_->Wait(); } 69 void CloseChannel() { 70 DCHECK(base::MessageLoop::current() == ListenerThread()->message_loop()); 71 channel_->Close(); 72 } 73 void Start() { 74 StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT); 75 ListenerThread()->message_loop()->PostTask( 76 FROM_HERE, base::Bind(&Worker::OnStart, this)); 77 } 78 void Shutdown() { 79 // The IPC thread needs to outlive SyncChannel. We can't do this in 80 // ~Worker(), since that'll reset the vtable pointer (to Worker's), which 81 // may result in a race conditions. See http://crbug.com/25841. 82 WaitableEvent listener_done(false, false), ipc_done(false, false); 83 ListenerThread()->message_loop()->PostTask( 84 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown1, this, 85 &listener_done, &ipc_done)); 86 listener_done.Wait(); 87 ipc_done.Wait(); 88 ipc_thread_.Stop(); 89 listener_thread_.Stop(); 90 is_shutdown_ = true; 91 } 92 void OverrideThread(base::Thread* overrided_thread) { 93 DCHECK(overrided_thread_ == NULL); 94 overrided_thread_ = overrided_thread; 95 } 96 bool SendAnswerToLife(bool pump, bool succeed) { 97 int answer = 0; 98 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 99 if (pump) 100 msg->EnableMessagePumping(); 101 bool result = Send(msg); 102 DCHECK_EQ(result, succeed); 103 DCHECK_EQ(answer, (succeed ? 42 : 0)); 104 return result; 105 } 106 bool SendDouble(bool pump, bool succeed) { 107 int answer = 0; 108 SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); 109 if (pump) 110 msg->EnableMessagePumping(); 111 bool result = Send(msg); 112 DCHECK_EQ(result, succeed); 113 DCHECK_EQ(answer, (succeed ? 10 : 0)); 114 return result; 115 } 116 const std::string& channel_name() { return channel_name_; } 117 Channel::Mode mode() { return mode_; } 118 WaitableEvent* done_event() { return done_.get(); } 119 WaitableEvent* shutdown_event() { return &shutdown_event_; } 120 void ResetChannel() { channel_.reset(); } 121 // Derived classes need to call this when they've completed their part of 122 // the test. 123 void Done() { done_->Signal(); } 124 125 protected: 126 SyncChannel* channel() { return channel_.get(); } 127 // Functions for dervied classes to implement if they wish. 128 virtual void Run() { } 129 virtual void OnAnswer(int* answer) { NOTREACHED(); } 130 virtual void OnAnswerDelay(Message* reply_msg) { 131 // The message handler map below can only take one entry for 132 // SyncChannelTestMsg_AnswerToLife, so since some classes want 133 // the normal version while other want the delayed reply, we 134 // call the normal version if the derived class didn't override 135 // this function. 136 int answer; 137 OnAnswer(&answer); 138 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer); 139 Send(reply_msg); 140 } 141 virtual void OnDouble(int in, int* out) { NOTREACHED(); } 142 virtual void OnDoubleDelay(int in, Message* reply_msg) { 143 int result; 144 OnDouble(in, &result); 145 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result); 146 Send(reply_msg); 147 } 148 149 virtual void OnNestedTestMsg(Message* reply_msg) { 150 NOTREACHED(); 151 } 152 153 virtual SyncChannel* CreateChannel() { 154 scoped_ptr<SyncChannel> channel = SyncChannel::Create( 155 channel_name_, mode_, this, ipc_thread_.message_loop_proxy().get(), 156 true, &shutdown_event_); 157 return channel.release(); 158 } 159 160 base::Thread* ListenerThread() { 161 return overrided_thread_ ? overrided_thread_ : &listener_thread_; 162 } 163 164 const base::Thread& ipc_thread() const { return ipc_thread_; } 165 166 private: 167 // Called on the listener thread to create the sync channel. 168 void OnStart() { 169 // Link ipc_thread_, listener_thread_ and channel_ altogether. 170 StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO); 171 channel_.reset(CreateChannel()); 172 channel_created_->Signal(); 173 Run(); 174 } 175 176 void OnListenerThreadShutdown1(WaitableEvent* listener_event, 177 WaitableEvent* ipc_event) { 178 // SyncChannel needs to be destructed on the thread that it was created on. 179 channel_.reset(); 180 181 base::RunLoop().RunUntilIdle(); 182 183 ipc_thread_.message_loop()->PostTask( 184 FROM_HERE, base::Bind(&Worker::OnIPCThreadShutdown, this, 185 listener_event, ipc_event)); 186 } 187 188 void OnIPCThreadShutdown(WaitableEvent* listener_event, 189 WaitableEvent* ipc_event) { 190 base::RunLoop().RunUntilIdle(); 191 ipc_event->Signal(); 192 193 listener_thread_.message_loop()->PostTask( 194 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, this, 195 listener_event)); 196 } 197 198 void OnListenerThreadShutdown2(WaitableEvent* listener_event) { 199 base::RunLoop().RunUntilIdle(); 200 listener_event->Signal(); 201 } 202 203 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 204 IPC_BEGIN_MESSAGE_MAP(Worker, message) 205 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay) 206 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife, 207 OnAnswerDelay) 208 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String, 209 OnNestedTestMsg) 210 IPC_END_MESSAGE_MAP() 211 return true; 212 } 213 214 void StartThread(base::Thread* thread, base::MessageLoop::Type type) { 215 base::Thread::Options options; 216 options.message_loop_type = type; 217 thread->StartWithOptions(options); 218 } 219 220 scoped_ptr<WaitableEvent> done_; 221 scoped_ptr<WaitableEvent> channel_created_; 222 std::string channel_name_; 223 Channel::Mode mode_; 224 scoped_ptr<SyncChannel> channel_; 225 base::Thread ipc_thread_; 226 base::Thread listener_thread_; 227 base::Thread* overrided_thread_; 228 229 base::WaitableEvent shutdown_event_; 230 231 bool is_shutdown_; 232 233 DISALLOW_COPY_AND_ASSIGN(Worker); 234 }; 235 236 237 // Starts the test with the given workers. This function deletes the workers 238 // when it's done. 239 void RunTest(std::vector<Worker*> workers) { 240 // First we create the workers that are channel servers, or else the other 241 // workers' channel initialization might fail because the pipe isn't created.. 242 for (size_t i = 0; i < workers.size(); ++i) { 243 if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) { 244 workers[i]->Start(); 245 workers[i]->WaitForChannelCreation(); 246 } 247 } 248 249 // now create the clients 250 for (size_t i = 0; i < workers.size(); ++i) { 251 if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG) 252 workers[i]->Start(); 253 } 254 255 // wait for all the workers to finish 256 for (size_t i = 0; i < workers.size(); ++i) 257 workers[i]->done_event()->Wait(); 258 259 for (size_t i = 0; i < workers.size(); ++i) { 260 workers[i]->Shutdown(); 261 delete workers[i]; 262 } 263 } 264 265 class IPCSyncChannelTest : public testing::Test { 266 private: 267 base::MessageLoop message_loop_; 268 }; 269 270 //------------------------------------------------------------------------------ 271 272 class SimpleServer : public Worker { 273 public: 274 explicit SimpleServer(bool pump_during_send) 275 : Worker(Channel::MODE_SERVER, "simpler_server"), 276 pump_during_send_(pump_during_send) { } 277 virtual void Run() OVERRIDE { 278 SendAnswerToLife(pump_during_send_, true); 279 Done(); 280 } 281 282 bool pump_during_send_; 283 }; 284 285 class SimpleClient : public Worker { 286 public: 287 SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { } 288 289 virtual void OnAnswer(int* answer) OVERRIDE { 290 *answer = 42; 291 Done(); 292 } 293 }; 294 295 void Simple(bool pump_during_send) { 296 std::vector<Worker*> workers; 297 workers.push_back(new SimpleServer(pump_during_send)); 298 workers.push_back(new SimpleClient()); 299 RunTest(workers); 300 } 301 302 // Tests basic synchronous call 303 TEST_F(IPCSyncChannelTest, Simple) { 304 Simple(false); 305 Simple(true); 306 } 307 308 //------------------------------------------------------------------------------ 309 310 // Worker classes which override how the sync channel is created to use the 311 // two-step initialization (calling the lightweight constructor and then 312 // ChannelProxy::Init separately) process. 313 class TwoStepServer : public Worker { 314 public: 315 explicit TwoStepServer(bool create_pipe_now) 316 : Worker(Channel::MODE_SERVER, "simpler_server"), 317 create_pipe_now_(create_pipe_now) { } 318 319 virtual void Run() OVERRIDE { 320 SendAnswerToLife(false, true); 321 Done(); 322 } 323 324 virtual SyncChannel* CreateChannel() OVERRIDE { 325 SyncChannel* channel = 326 SyncChannel::Create(channel_name(), mode(), this, 327 ipc_thread().message_loop_proxy().get(), 328 create_pipe_now_, 329 shutdown_event()).release(); 330 return channel; 331 } 332 333 bool create_pipe_now_; 334 }; 335 336 class TwoStepClient : public Worker { 337 public: 338 TwoStepClient(bool create_pipe_now) 339 : Worker(Channel::MODE_CLIENT, "simple_client"), 340 create_pipe_now_(create_pipe_now) { } 341 342 virtual void OnAnswer(int* answer) OVERRIDE { 343 *answer = 42; 344 Done(); 345 } 346 347 virtual SyncChannel* CreateChannel() OVERRIDE { 348 SyncChannel* channel = 349 SyncChannel::Create(channel_name(), mode(), this, 350 ipc_thread().message_loop_proxy().get(), 351 create_pipe_now_, 352 shutdown_event()).release(); 353 return channel; 354 } 355 356 bool create_pipe_now_; 357 }; 358 359 void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) { 360 std::vector<Worker*> workers; 361 workers.push_back(new TwoStepServer(create_server_pipe_now)); 362 workers.push_back(new TwoStepClient(create_client_pipe_now)); 363 RunTest(workers); 364 } 365 366 // Tests basic two-step initialization, where you call the lightweight 367 // constructor then Init. 368 TEST_F(IPCSyncChannelTest, TwoStepInitialization) { 369 TwoStep(false, false); 370 TwoStep(false, true); 371 TwoStep(true, false); 372 TwoStep(true, true); 373 } 374 375 //------------------------------------------------------------------------------ 376 377 class DelayClient : public Worker { 378 public: 379 DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { } 380 381 virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE { 382 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 383 Send(reply_msg); 384 Done(); 385 } 386 }; 387 388 void DelayReply(bool pump_during_send) { 389 std::vector<Worker*> workers; 390 workers.push_back(new SimpleServer(pump_during_send)); 391 workers.push_back(new DelayClient()); 392 RunTest(workers); 393 } 394 395 // Tests that asynchronous replies work 396 TEST_F(IPCSyncChannelTest, DelayReply) { 397 DelayReply(false); 398 DelayReply(true); 399 } 400 401 //------------------------------------------------------------------------------ 402 403 class NoHangServer : public Worker { 404 public: 405 NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send) 406 : Worker(Channel::MODE_SERVER, "no_hang_server"), 407 got_first_reply_(got_first_reply), 408 pump_during_send_(pump_during_send) { } 409 virtual void Run() OVERRIDE { 410 SendAnswerToLife(pump_during_send_, true); 411 got_first_reply_->Signal(); 412 413 SendAnswerToLife(pump_during_send_, false); 414 Done(); 415 } 416 417 WaitableEvent* got_first_reply_; 418 bool pump_during_send_; 419 }; 420 421 class NoHangClient : public Worker { 422 public: 423 explicit NoHangClient(WaitableEvent* got_first_reply) 424 : Worker(Channel::MODE_CLIENT, "no_hang_client"), 425 got_first_reply_(got_first_reply) { } 426 427 virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE { 428 // Use the DELAY_REPLY macro so that we can force the reply to be sent 429 // before this function returns (when the channel will be reset). 430 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 431 Send(reply_msg); 432 got_first_reply_->Wait(); 433 CloseChannel(); 434 Done(); 435 } 436 437 WaitableEvent* got_first_reply_; 438 }; 439 440 void NoHang(bool pump_during_send) { 441 WaitableEvent got_first_reply(false, false); 442 std::vector<Worker*> workers; 443 workers.push_back(new NoHangServer(&got_first_reply, pump_during_send)); 444 workers.push_back(new NoHangClient(&got_first_reply)); 445 RunTest(workers); 446 } 447 448 // Tests that caller doesn't hang if receiver dies 449 TEST_F(IPCSyncChannelTest, NoHang) { 450 NoHang(false); 451 NoHang(true); 452 } 453 454 //------------------------------------------------------------------------------ 455 456 class UnblockServer : public Worker { 457 public: 458 UnblockServer(bool pump_during_send, bool delete_during_send) 459 : Worker(Channel::MODE_SERVER, "unblock_server"), 460 pump_during_send_(pump_during_send), 461 delete_during_send_(delete_during_send) { } 462 virtual void Run() OVERRIDE { 463 if (delete_during_send_) { 464 // Use custom code since race conditions mean the answer may or may not be 465 // available. 466 int answer = 0; 467 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 468 if (pump_during_send_) 469 msg->EnableMessagePumping(); 470 Send(msg); 471 } else { 472 SendAnswerToLife(pump_during_send_, true); 473 } 474 Done(); 475 } 476 477 virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE { 478 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 479 Send(reply_msg); 480 if (delete_during_send_) 481 ResetChannel(); 482 } 483 484 bool pump_during_send_; 485 bool delete_during_send_; 486 }; 487 488 class UnblockClient : public Worker { 489 public: 490 explicit UnblockClient(bool pump_during_send) 491 : Worker(Channel::MODE_CLIENT, "unblock_client"), 492 pump_during_send_(pump_during_send) { } 493 494 virtual void OnAnswer(int* answer) OVERRIDE { 495 SendDouble(pump_during_send_, true); 496 *answer = 42; 497 Done(); 498 } 499 500 bool pump_during_send_; 501 }; 502 503 void Unblock(bool server_pump, bool client_pump, bool delete_during_send) { 504 std::vector<Worker*> workers; 505 workers.push_back(new UnblockServer(server_pump, delete_during_send)); 506 workers.push_back(new UnblockClient(client_pump)); 507 RunTest(workers); 508 } 509 510 // Tests that the caller unblocks to answer a sync message from the receiver. 511 TEST_F(IPCSyncChannelTest, Unblock) { 512 Unblock(false, false, false); 513 Unblock(false, true, false); 514 Unblock(true, false, false); 515 Unblock(true, true, false); 516 } 517 518 //------------------------------------------------------------------------------ 519 520 // Tests that the the SyncChannel object can be deleted during a Send. 521 TEST_F(IPCSyncChannelTest, ChannelDeleteDuringSend) { 522 Unblock(false, false, true); 523 Unblock(false, true, true); 524 Unblock(true, false, true); 525 Unblock(true, true, true); 526 } 527 528 //------------------------------------------------------------------------------ 529 530 class RecursiveServer : public Worker { 531 public: 532 RecursiveServer(bool expected_send_result, bool pump_first, bool pump_second) 533 : Worker(Channel::MODE_SERVER, "recursive_server"), 534 expected_send_result_(expected_send_result), 535 pump_first_(pump_first), pump_second_(pump_second) {} 536 virtual void Run() OVERRIDE { 537 SendDouble(pump_first_, expected_send_result_); 538 Done(); 539 } 540 541 virtual void OnDouble(int in, int* out) OVERRIDE { 542 *out = in * 2; 543 SendAnswerToLife(pump_second_, expected_send_result_); 544 } 545 546 bool expected_send_result_, pump_first_, pump_second_; 547 }; 548 549 class RecursiveClient : public Worker { 550 public: 551 RecursiveClient(bool pump_during_send, bool close_channel) 552 : Worker(Channel::MODE_CLIENT, "recursive_client"), 553 pump_during_send_(pump_during_send), close_channel_(close_channel) {} 554 555 virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE { 556 SendDouble(pump_during_send_, !close_channel_); 557 if (close_channel_) { 558 delete reply_msg; 559 } else { 560 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 561 Send(reply_msg); 562 } 563 Done(); 564 } 565 566 virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE { 567 if (close_channel_) { 568 delete reply_msg; 569 CloseChannel(); 570 } else { 571 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 572 Send(reply_msg); 573 } 574 } 575 576 bool pump_during_send_, close_channel_; 577 }; 578 579 void Recursive( 580 bool server_pump_first, bool server_pump_second, bool client_pump) { 581 std::vector<Worker*> workers; 582 workers.push_back( 583 new RecursiveServer(true, server_pump_first, server_pump_second)); 584 workers.push_back(new RecursiveClient(client_pump, false)); 585 RunTest(workers); 586 } 587 588 // Tests a server calling Send while another Send is pending. 589 TEST_F(IPCSyncChannelTest, Recursive) { 590 Recursive(false, false, false); 591 Recursive(false, false, true); 592 Recursive(false, true, false); 593 Recursive(false, true, true); 594 Recursive(true, false, false); 595 Recursive(true, false, true); 596 Recursive(true, true, false); 597 Recursive(true, true, true); 598 } 599 600 //------------------------------------------------------------------------------ 601 602 void RecursiveNoHang( 603 bool server_pump_first, bool server_pump_second, bool client_pump) { 604 std::vector<Worker*> workers; 605 workers.push_back( 606 new RecursiveServer(false, server_pump_first, server_pump_second)); 607 workers.push_back(new RecursiveClient(client_pump, true)); 608 RunTest(workers); 609 } 610 611 // Tests that if a caller makes a sync call during an existing sync call and 612 // the receiver dies, neither of the Send() calls hang. 613 TEST_F(IPCSyncChannelTest, RecursiveNoHang) { 614 RecursiveNoHang(false, false, false); 615 RecursiveNoHang(false, false, true); 616 RecursiveNoHang(false, true, false); 617 RecursiveNoHang(false, true, true); 618 RecursiveNoHang(true, false, false); 619 RecursiveNoHang(true, false, true); 620 RecursiveNoHang(true, true, false); 621 RecursiveNoHang(true, true, true); 622 } 623 624 //------------------------------------------------------------------------------ 625 626 class MultipleServer1 : public Worker { 627 public: 628 explicit MultipleServer1(bool pump_during_send) 629 : Worker("test_channel1", Channel::MODE_SERVER), 630 pump_during_send_(pump_during_send) { } 631 632 virtual void Run() OVERRIDE { 633 SendDouble(pump_during_send_, true); 634 Done(); 635 } 636 637 bool pump_during_send_; 638 }; 639 640 class MultipleClient1 : public Worker { 641 public: 642 MultipleClient1(WaitableEvent* client1_msg_received, 643 WaitableEvent* client1_can_reply) : 644 Worker("test_channel1", Channel::MODE_CLIENT), 645 client1_msg_received_(client1_msg_received), 646 client1_can_reply_(client1_can_reply) { } 647 648 virtual void OnDouble(int in, int* out) OVERRIDE { 649 client1_msg_received_->Signal(); 650 *out = in * 2; 651 client1_can_reply_->Wait(); 652 Done(); 653 } 654 655 private: 656 WaitableEvent *client1_msg_received_, *client1_can_reply_; 657 }; 658 659 class MultipleServer2 : public Worker { 660 public: 661 MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { } 662 663 virtual void OnAnswer(int* result) OVERRIDE { 664 *result = 42; 665 Done(); 666 } 667 }; 668 669 class MultipleClient2 : public Worker { 670 public: 671 MultipleClient2( 672 WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply, 673 bool pump_during_send) 674 : Worker("test_channel2", Channel::MODE_CLIENT), 675 client1_msg_received_(client1_msg_received), 676 client1_can_reply_(client1_can_reply), 677 pump_during_send_(pump_during_send) { } 678 679 virtual void Run() OVERRIDE { 680 client1_msg_received_->Wait(); 681 SendAnswerToLife(pump_during_send_, true); 682 client1_can_reply_->Signal(); 683 Done(); 684 } 685 686 private: 687 WaitableEvent *client1_msg_received_, *client1_can_reply_; 688 bool pump_during_send_; 689 }; 690 691 void Multiple(bool server_pump, bool client_pump) { 692 std::vector<Worker*> workers; 693 694 // A shared worker thread so that server1 and server2 run on one thread. 695 base::Thread worker_thread("Multiple"); 696 ASSERT_TRUE(worker_thread.Start()); 697 698 // Server1 sends a sync msg to client1, which blocks the reply until 699 // server2 (which runs on the same worker thread as server1) responds 700 // to a sync msg from client2. 701 WaitableEvent client1_msg_received(false, false); 702 WaitableEvent client1_can_reply(false, false); 703 704 Worker* worker; 705 706 worker = new MultipleServer2(); 707 worker->OverrideThread(&worker_thread); 708 workers.push_back(worker); 709 710 worker = new MultipleClient2( 711 &client1_msg_received, &client1_can_reply, client_pump); 712 workers.push_back(worker); 713 714 worker = new MultipleServer1(server_pump); 715 worker->OverrideThread(&worker_thread); 716 workers.push_back(worker); 717 718 worker = new MultipleClient1( 719 &client1_msg_received, &client1_can_reply); 720 workers.push_back(worker); 721 722 RunTest(workers); 723 } 724 725 // Tests that multiple SyncObjects on the same listener thread can unblock each 726 // other. 727 TEST_F(IPCSyncChannelTest, Multiple) { 728 Multiple(false, false); 729 Multiple(false, true); 730 Multiple(true, false); 731 Multiple(true, true); 732 } 733 734 //------------------------------------------------------------------------------ 735 736 // This class provides server side functionality to test the case where 737 // multiple sync channels are in use on the same thread on the client and 738 // nested calls are issued. 739 class QueuedReplyServer : public Worker { 740 public: 741 QueuedReplyServer(base::Thread* listener_thread, 742 const std::string& channel_name, 743 const std::string& reply_text) 744 : Worker(channel_name, Channel::MODE_SERVER), 745 reply_text_(reply_text) { 746 Worker::OverrideThread(listener_thread); 747 } 748 749 virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE { 750 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 751 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 752 Send(reply_msg); 753 Done(); 754 } 755 756 private: 757 std::string reply_text_; 758 }; 759 760 // The QueuedReplyClient class provides functionality to test the case where 761 // multiple sync channels are in use on the same thread and they make nested 762 // sync calls, i.e. while the first channel waits for a response it makes a 763 // sync call on another channel. 764 // The callstack should unwind correctly, i.e. the outermost call should 765 // complete first, and so on. 766 class QueuedReplyClient : public Worker { 767 public: 768 QueuedReplyClient(base::Thread* listener_thread, 769 const std::string& channel_name, 770 const std::string& expected_text, 771 bool pump_during_send) 772 : Worker(channel_name, Channel::MODE_CLIENT), 773 pump_during_send_(pump_during_send), 774 expected_text_(expected_text) { 775 Worker::OverrideThread(listener_thread); 776 } 777 778 virtual void Run() OVERRIDE { 779 std::string response; 780 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 781 if (pump_during_send_) 782 msg->EnableMessagePumping(); 783 bool result = Send(msg); 784 DCHECK(result); 785 DCHECK_EQ(response, expected_text_); 786 787 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 788 Done(); 789 } 790 791 private: 792 bool pump_during_send_; 793 std::string expected_text_; 794 }; 795 796 void QueuedReply(bool client_pump) { 797 std::vector<Worker*> workers; 798 799 // A shared worker thread for servers 800 base::Thread server_worker_thread("QueuedReply_ServerListener"); 801 ASSERT_TRUE(server_worker_thread.Start()); 802 803 base::Thread client_worker_thread("QueuedReply_ClientListener"); 804 ASSERT_TRUE(client_worker_thread.Start()); 805 806 Worker* worker; 807 808 worker = new QueuedReplyServer(&server_worker_thread, 809 "QueuedReply_Server1", 810 "Got first message"); 811 workers.push_back(worker); 812 813 worker = new QueuedReplyServer(&server_worker_thread, 814 "QueuedReply_Server2", 815 "Got second message"); 816 workers.push_back(worker); 817 818 worker = new QueuedReplyClient(&client_worker_thread, 819 "QueuedReply_Server1", 820 "Got first message", 821 client_pump); 822 workers.push_back(worker); 823 824 worker = new QueuedReplyClient(&client_worker_thread, 825 "QueuedReply_Server2", 826 "Got second message", 827 client_pump); 828 workers.push_back(worker); 829 830 RunTest(workers); 831 } 832 833 // While a blocking send is in progress, the listener thread might answer other 834 // synchronous messages. This tests that if during the response to another 835 // message the reply to the original messages comes, it is queued up correctly 836 // and the original Send is unblocked later. 837 // We also test that the send call stacks unwind correctly when the channel 838 // pumps messages while waiting for a response. 839 TEST_F(IPCSyncChannelTest, QueuedReply) { 840 QueuedReply(false); 841 QueuedReply(true); 842 } 843 844 //------------------------------------------------------------------------------ 845 846 class ChattyClient : public Worker { 847 public: 848 ChattyClient() : 849 Worker(Channel::MODE_CLIENT, "chatty_client") { } 850 851 virtual void OnAnswer(int* answer) OVERRIDE { 852 // The PostMessage limit is 10k. Send 20% more than that. 853 const int kMessageLimit = 10000; 854 const int kMessagesToSend = kMessageLimit * 120 / 100; 855 for (int i = 0; i < kMessagesToSend; ++i) { 856 if (!SendDouble(false, true)) 857 break; 858 } 859 *answer = 42; 860 Done(); 861 } 862 }; 863 864 void ChattyServer(bool pump_during_send) { 865 std::vector<Worker*> workers; 866 workers.push_back(new UnblockServer(pump_during_send, false)); 867 workers.push_back(new ChattyClient()); 868 RunTest(workers); 869 } 870 871 // Tests http://b/1093251 - that sending lots of sync messages while 872 // the receiver is waiting for a sync reply does not overflow the PostMessage 873 // queue. 874 TEST_F(IPCSyncChannelTest, ChattyServer) { 875 ChattyServer(false); 876 ChattyServer(true); 877 } 878 879 //------------------------------------------------------------------------------ 880 881 void NestedCallback(Worker* server) { 882 // Sleep a bit so that we wake up after the reply has been received. 883 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250)); 884 server->SendAnswerToLife(true, true); 885 } 886 887 bool timeout_occurred = false; 888 889 void TimeoutCallback() { 890 timeout_occurred = true; 891 } 892 893 class DoneEventRaceServer : public Worker { 894 public: 895 DoneEventRaceServer() 896 : Worker(Channel::MODE_SERVER, "done_event_race_server") { } 897 898 virtual void Run() OVERRIDE { 899 base::MessageLoop::current()->PostTask(FROM_HERE, 900 base::Bind(&NestedCallback, this)); 901 base::MessageLoop::current()->PostDelayedTask( 902 FROM_HERE, 903 base::Bind(&TimeoutCallback), 904 base::TimeDelta::FromSeconds(9)); 905 // Even though we have a timeout on the Send, it will succeed since for this 906 // bug, the reply message comes back and is deserialized, however the done 907 // event wasn't set. So we indirectly use the timeout task to notice if a 908 // timeout occurred. 909 SendAnswerToLife(true, true); 910 DCHECK(!timeout_occurred); 911 Done(); 912 } 913 }; 914 915 // Tests http://b/1474092 - that if after the done_event is set but before 916 // OnObjectSignaled is called another message is sent out, then after its 917 // reply comes back OnObjectSignaled will be called for the first message. 918 TEST_F(IPCSyncChannelTest, DoneEventRace) { 919 std::vector<Worker*> workers; 920 workers.push_back(new DoneEventRaceServer()); 921 workers.push_back(new SimpleClient()); 922 RunTest(workers); 923 } 924 925 //------------------------------------------------------------------------------ 926 927 class TestSyncMessageFilter : public SyncMessageFilter { 928 public: 929 TestSyncMessageFilter(base::WaitableEvent* shutdown_event, 930 Worker* worker, 931 scoped_refptr<base::MessageLoopProxy> message_loop) 932 : SyncMessageFilter(shutdown_event), 933 worker_(worker), 934 message_loop_(message_loop) { 935 } 936 937 virtual void OnFilterAdded(Sender* sender) OVERRIDE { 938 SyncMessageFilter::OnFilterAdded(sender); 939 message_loop_->PostTask( 940 FROM_HERE, 941 base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this)); 942 } 943 944 void SendMessageOnHelperThread() { 945 int answer = 0; 946 bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); 947 DCHECK(result); 948 DCHECK_EQ(answer, 42); 949 950 worker_->Done(); 951 } 952 953 private: 954 virtual ~TestSyncMessageFilter() {} 955 956 Worker* worker_; 957 scoped_refptr<base::MessageLoopProxy> message_loop_; 958 }; 959 960 class SyncMessageFilterServer : public Worker { 961 public: 962 SyncMessageFilterServer() 963 : Worker(Channel::MODE_SERVER, "sync_message_filter_server"), 964 thread_("helper_thread") { 965 base::Thread::Options options; 966 options.message_loop_type = base::MessageLoop::TYPE_DEFAULT; 967 thread_.StartWithOptions(options); 968 filter_ = new TestSyncMessageFilter(shutdown_event(), this, 969 thread_.message_loop_proxy()); 970 } 971 972 virtual void Run() OVERRIDE { 973 channel()->AddFilter(filter_.get()); 974 } 975 976 base::Thread thread_; 977 scoped_refptr<TestSyncMessageFilter> filter_; 978 }; 979 980 // This class provides functionality to test the case that a Send on the sync 981 // channel does not crash after the channel has been closed. 982 class ServerSendAfterClose : public Worker { 983 public: 984 ServerSendAfterClose() 985 : Worker(Channel::MODE_SERVER, "simpler_server"), 986 send_result_(true) { 987 } 988 989 bool SendDummy() { 990 ListenerThread()->message_loop()->PostTask( 991 FROM_HERE, base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send), 992 this, new SyncChannelTestMsg_NoArgs)); 993 return true; 994 } 995 996 bool send_result() const { 997 return send_result_; 998 } 999 1000 private: 1001 virtual void Run() OVERRIDE { 1002 CloseChannel(); 1003 Done(); 1004 } 1005 1006 virtual bool Send(Message* msg) OVERRIDE { 1007 send_result_ = Worker::Send(msg); 1008 Done(); 1009 return send_result_; 1010 } 1011 1012 bool send_result_; 1013 }; 1014 1015 // Tests basic synchronous call 1016 TEST_F(IPCSyncChannelTest, SyncMessageFilter) { 1017 std::vector<Worker*> workers; 1018 workers.push_back(new SyncMessageFilterServer()); 1019 workers.push_back(new SimpleClient()); 1020 RunTest(workers); 1021 } 1022 1023 // Test the case when the channel is closed and a Send is attempted after that. 1024 TEST_F(IPCSyncChannelTest, SendAfterClose) { 1025 ServerSendAfterClose server; 1026 server.Start(); 1027 1028 server.done_event()->Wait(); 1029 server.done_event()->Reset(); 1030 1031 server.SendDummy(); 1032 server.done_event()->Wait(); 1033 1034 EXPECT_FALSE(server.send_result()); 1035 1036 server.Shutdown(); 1037 } 1038 1039 //------------------------------------------------------------------------------ 1040 1041 class RestrictedDispatchServer : public Worker { 1042 public: 1043 RestrictedDispatchServer(WaitableEvent* sent_ping_event, 1044 WaitableEvent* wait_event) 1045 : Worker("restricted_channel", Channel::MODE_SERVER), 1046 sent_ping_event_(sent_ping_event), 1047 wait_event_(wait_event) { } 1048 1049 void OnDoPing(int ping) { 1050 // Send an asynchronous message that unblocks the caller. 1051 Message* msg = new SyncChannelTestMsg_Ping(ping); 1052 msg->set_unblock(true); 1053 Send(msg); 1054 // Signal the event after the message has been sent on the channel, on the 1055 // IPC thread. 1056 ipc_thread().message_loop()->PostTask( 1057 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, this)); 1058 } 1059 1060 void OnPingTTL(int ping, int* out) { 1061 *out = ping; 1062 wait_event_->Wait(); 1063 } 1064 1065 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1066 1067 private: 1068 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1069 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message) 1070 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1071 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1072 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1073 IPC_END_MESSAGE_MAP() 1074 return true; 1075 } 1076 1077 void OnPingSent() { 1078 sent_ping_event_->Signal(); 1079 } 1080 1081 void OnNoArgs() { } 1082 WaitableEvent* sent_ping_event_; 1083 WaitableEvent* wait_event_; 1084 }; 1085 1086 class NonRestrictedDispatchServer : public Worker { 1087 public: 1088 NonRestrictedDispatchServer(WaitableEvent* signal_event) 1089 : Worker("non_restricted_channel", Channel::MODE_SERVER), 1090 signal_event_(signal_event) {} 1091 1092 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1093 1094 void OnDoPingTTL(int ping) { 1095 int value = 0; 1096 Send(new SyncChannelTestMsg_PingTTL(ping, &value)); 1097 signal_event_->Signal(); 1098 } 1099 1100 private: 1101 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1102 IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message) 1103 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1104 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1105 IPC_END_MESSAGE_MAP() 1106 return true; 1107 } 1108 1109 void OnNoArgs() { } 1110 WaitableEvent* signal_event_; 1111 }; 1112 1113 class RestrictedDispatchClient : public Worker { 1114 public: 1115 RestrictedDispatchClient(WaitableEvent* sent_ping_event, 1116 RestrictedDispatchServer* server, 1117 NonRestrictedDispatchServer* server2, 1118 int* success) 1119 : Worker("restricted_channel", Channel::MODE_CLIENT), 1120 ping_(0), 1121 server_(server), 1122 server2_(server2), 1123 success_(success), 1124 sent_ping_event_(sent_ping_event) {} 1125 1126 virtual void Run() OVERRIDE { 1127 // Incoming messages from our channel should only be dispatched when we 1128 // send a message on that same channel. 1129 channel()->SetRestrictDispatchChannelGroup(1); 1130 1131 server_->ListenerThread()->message_loop()->PostTask( 1132 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 1)); 1133 sent_ping_event_->Wait(); 1134 Send(new SyncChannelTestMsg_NoArgs); 1135 if (ping_ == 1) 1136 ++*success_; 1137 else 1138 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1139 1140 non_restricted_channel_ = 1141 SyncChannel::Create("non_restricted_channel", 1142 IPC::Channel::MODE_CLIENT, 1143 this, 1144 ipc_thread().message_loop_proxy().get(), 1145 true, 1146 shutdown_event()); 1147 1148 server_->ListenerThread()->message_loop()->PostTask( 1149 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 2)); 1150 sent_ping_event_->Wait(); 1151 // Check that the incoming message is *not* dispatched when sending on the 1152 // non restricted channel. 1153 // TODO(piman): there is a possibility of a false positive race condition 1154 // here, if the message that was posted on the server-side end of the pipe 1155 // is not visible yet on the client side, but I don't know how to solve this 1156 // without hooking into the internals of SyncChannel. I haven't seen it in 1157 // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause 1158 // the following to fail). 1159 non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs); 1160 if (ping_ == 1) 1161 ++*success_; 1162 else 1163 LOG(ERROR) << "Send dispatched message from restricted channel"; 1164 1165 Send(new SyncChannelTestMsg_NoArgs); 1166 if (ping_ == 2) 1167 ++*success_; 1168 else 1169 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1170 1171 // Check that the incoming message on the non-restricted channel is 1172 // dispatched when sending on the restricted channel. 1173 server2_->ListenerThread()->message_loop()->PostTask( 1174 FROM_HERE, 1175 base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, server2_, 3)); 1176 int value = 0; 1177 Send(new SyncChannelTestMsg_PingTTL(4, &value)); 1178 if (ping_ == 3 && value == 4) 1179 ++*success_; 1180 else 1181 LOG(ERROR) << "Send failed to dispatch message from unrestricted channel"; 1182 1183 non_restricted_channel_->Send(new SyncChannelTestMsg_Done); 1184 non_restricted_channel_.reset(); 1185 Send(new SyncChannelTestMsg_Done); 1186 Done(); 1187 } 1188 1189 private: 1190 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1191 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message) 1192 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing) 1193 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL) 1194 IPC_END_MESSAGE_MAP() 1195 return true; 1196 } 1197 1198 void OnPing(int ping) { 1199 ping_ = ping; 1200 } 1201 1202 void OnPingTTL(int ping, IPC::Message* reply) { 1203 ping_ = ping; 1204 // This message comes from the NonRestrictedDispatchServer, we have to send 1205 // the reply back manually. 1206 SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping); 1207 non_restricted_channel_->Send(reply); 1208 } 1209 1210 int ping_; 1211 RestrictedDispatchServer* server_; 1212 NonRestrictedDispatchServer* server2_; 1213 int* success_; 1214 WaitableEvent* sent_ping_event_; 1215 scoped_ptr<SyncChannel> non_restricted_channel_; 1216 }; 1217 1218 TEST_F(IPCSyncChannelTest, RestrictedDispatch) { 1219 WaitableEvent sent_ping_event(false, false); 1220 WaitableEvent wait_event(false, false); 1221 RestrictedDispatchServer* server = 1222 new RestrictedDispatchServer(&sent_ping_event, &wait_event); 1223 NonRestrictedDispatchServer* server2 = 1224 new NonRestrictedDispatchServer(&wait_event); 1225 1226 int success = 0; 1227 std::vector<Worker*> workers; 1228 workers.push_back(server); 1229 workers.push_back(server2); 1230 workers.push_back(new RestrictedDispatchClient( 1231 &sent_ping_event, server, server2, &success)); 1232 RunTest(workers); 1233 EXPECT_EQ(4, success); 1234 } 1235 1236 //------------------------------------------------------------------------------ 1237 1238 // This test case inspired by crbug.com/108491 1239 // We create two servers that use the same ListenerThread but have 1240 // SetRestrictDispatchToSameChannel set to true. 1241 // We create clients, then use some specific WaitableEvent wait/signalling to 1242 // ensure that messages get dispatched in a way that causes a deadlock due to 1243 // a nested dispatch and an eligible message in a higher-level dispatch's 1244 // delayed_queue. Specifically, we start with client1 about so send an 1245 // unblocking message to server1, while the shared listener thread for the 1246 // servers server1 and server2 is about to send a non-unblocking message to 1247 // client1. At the same time, client2 will be about to send an unblocking 1248 // message to server2. Server1 will handle the client1->server1 message by 1249 // telling server2 to send a non-unblocking message to client2. 1250 // What should happen is that the send to server2 should find the pending, 1251 // same-context client2->server2 message to dispatch, causing client2 to 1252 // unblock then handle the server2->client2 message, so that the shared 1253 // servers' listener thread can then respond to the client1->server1 message. 1254 // Then client1 can handle the non-unblocking server1->client1 message. 1255 // The old code would end up in a state where the server2->client2 message is 1256 // sent, but the client2->server2 message (which is eligible for dispatch, and 1257 // which is what client2 is waiting for) is stashed in a local delayed_queue 1258 // that has server1's channel context, causing a deadlock. 1259 // WaitableEvents in the events array are used to: 1260 // event 0: indicate to client1 that server listener is in OnDoServerTask 1261 // event 1: indicate to client1 that client2 listener is in OnDoClient2Task 1262 // event 2: indicate to server1 that client2 listener is in OnDoClient2Task 1263 // event 3: indicate to client2 that server listener is in OnDoServerTask 1264 1265 class RestrictedDispatchDeadlockServer : public Worker { 1266 public: 1267 RestrictedDispatchDeadlockServer(int server_num, 1268 WaitableEvent* server_ready_event, 1269 WaitableEvent** events, 1270 RestrictedDispatchDeadlockServer* peer) 1271 : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER), 1272 server_num_(server_num), 1273 server_ready_event_(server_ready_event), 1274 events_(events), 1275 peer_(peer) { } 1276 1277 void OnDoServerTask() { 1278 events_[3]->Signal(); 1279 events_[2]->Wait(); 1280 events_[0]->Signal(); 1281 SendMessageToClient(); 1282 } 1283 1284 virtual void Run() OVERRIDE { 1285 channel()->SetRestrictDispatchChannelGroup(1); 1286 server_ready_event_->Signal(); 1287 } 1288 1289 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1290 1291 private: 1292 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1293 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message) 1294 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1295 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1296 IPC_END_MESSAGE_MAP() 1297 return true; 1298 } 1299 1300 void OnNoArgs() { 1301 if (server_num_ == 1) { 1302 DCHECK(peer_ != NULL); 1303 peer_->SendMessageToClient(); 1304 } 1305 } 1306 1307 void SendMessageToClient() { 1308 Message* msg = new SyncChannelTestMsg_NoArgs; 1309 msg->set_unblock(false); 1310 DCHECK(!msg->should_unblock()); 1311 Send(msg); 1312 } 1313 1314 int server_num_; 1315 WaitableEvent* server_ready_event_; 1316 WaitableEvent** events_; 1317 RestrictedDispatchDeadlockServer* peer_; 1318 }; 1319 1320 class RestrictedDispatchDeadlockClient2 : public Worker { 1321 public: 1322 RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server, 1323 WaitableEvent* server_ready_event, 1324 WaitableEvent** events) 1325 : Worker("channel2", Channel::MODE_CLIENT), 1326 server_ready_event_(server_ready_event), 1327 events_(events), 1328 received_msg_(false), 1329 received_noarg_reply_(false), 1330 done_issued_(false) {} 1331 1332 virtual void Run() OVERRIDE { 1333 server_ready_event_->Wait(); 1334 } 1335 1336 void OnDoClient2Task() { 1337 events_[3]->Wait(); 1338 events_[1]->Signal(); 1339 events_[2]->Signal(); 1340 DCHECK(received_msg_ == false); 1341 1342 Message* message = new SyncChannelTestMsg_NoArgs; 1343 message->set_unblock(true); 1344 Send(message); 1345 received_noarg_reply_ = true; 1346 } 1347 1348 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1349 private: 1350 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1351 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message) 1352 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1353 IPC_END_MESSAGE_MAP() 1354 return true; 1355 } 1356 1357 void OnNoArgs() { 1358 received_msg_ = true; 1359 PossiblyDone(); 1360 } 1361 1362 void PossiblyDone() { 1363 if (received_noarg_reply_ && received_msg_) { 1364 DCHECK(done_issued_ == false); 1365 done_issued_ = true; 1366 Send(new SyncChannelTestMsg_Done); 1367 Done(); 1368 } 1369 } 1370 1371 WaitableEvent* server_ready_event_; 1372 WaitableEvent** events_; 1373 bool received_msg_; 1374 bool received_noarg_reply_; 1375 bool done_issued_; 1376 }; 1377 1378 class RestrictedDispatchDeadlockClient1 : public Worker { 1379 public: 1380 RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server, 1381 RestrictedDispatchDeadlockClient2* peer, 1382 WaitableEvent* server_ready_event, 1383 WaitableEvent** events) 1384 : Worker("channel1", Channel::MODE_CLIENT), 1385 server_(server), 1386 peer_(peer), 1387 server_ready_event_(server_ready_event), 1388 events_(events), 1389 received_msg_(false), 1390 received_noarg_reply_(false), 1391 done_issued_(false) {} 1392 1393 virtual void Run() OVERRIDE { 1394 server_ready_event_->Wait(); 1395 server_->ListenerThread()->message_loop()->PostTask( 1396 FROM_HERE, 1397 base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_)); 1398 peer_->ListenerThread()->message_loop()->PostTask( 1399 FROM_HERE, 1400 base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_)); 1401 events_[0]->Wait(); 1402 events_[1]->Wait(); 1403 DCHECK(received_msg_ == false); 1404 1405 Message* message = new SyncChannelTestMsg_NoArgs; 1406 message->set_unblock(true); 1407 Send(message); 1408 received_noarg_reply_ = true; 1409 PossiblyDone(); 1410 } 1411 1412 private: 1413 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1414 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message) 1415 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1416 IPC_END_MESSAGE_MAP() 1417 return true; 1418 } 1419 1420 void OnNoArgs() { 1421 received_msg_ = true; 1422 PossiblyDone(); 1423 } 1424 1425 void PossiblyDone() { 1426 if (received_noarg_reply_ && received_msg_) { 1427 DCHECK(done_issued_ == false); 1428 done_issued_ = true; 1429 Send(new SyncChannelTestMsg_Done); 1430 Done(); 1431 } 1432 } 1433 1434 RestrictedDispatchDeadlockServer* server_; 1435 RestrictedDispatchDeadlockClient2* peer_; 1436 WaitableEvent* server_ready_event_; 1437 WaitableEvent** events_; 1438 bool received_msg_; 1439 bool received_noarg_reply_; 1440 bool done_issued_; 1441 }; 1442 1443 TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) { 1444 std::vector<Worker*> workers; 1445 1446 // A shared worker thread so that server1 and server2 run on one thread. 1447 base::Thread worker_thread("RestrictedDispatchDeadlock"); 1448 ASSERT_TRUE(worker_thread.Start()); 1449 1450 WaitableEvent server1_ready(false, false); 1451 WaitableEvent server2_ready(false, false); 1452 1453 WaitableEvent event0(false, false); 1454 WaitableEvent event1(false, false); 1455 WaitableEvent event2(false, false); 1456 WaitableEvent event3(false, false); 1457 WaitableEvent* events[4] = {&event0, &event1, &event2, &event3}; 1458 1459 RestrictedDispatchDeadlockServer* server1; 1460 RestrictedDispatchDeadlockServer* server2; 1461 RestrictedDispatchDeadlockClient1* client1; 1462 RestrictedDispatchDeadlockClient2* client2; 1463 1464 server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events, 1465 NULL); 1466 server2->OverrideThread(&worker_thread); 1467 workers.push_back(server2); 1468 1469 client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready, 1470 events); 1471 workers.push_back(client2); 1472 1473 server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events, 1474 server2); 1475 server1->OverrideThread(&worker_thread); 1476 workers.push_back(server1); 1477 1478 client1 = new RestrictedDispatchDeadlockClient1(server1, client2, 1479 &server1_ready, events); 1480 workers.push_back(client1); 1481 1482 RunTest(workers); 1483 } 1484 1485 //------------------------------------------------------------------------------ 1486 1487 // This test case inspired by crbug.com/120530 1488 // We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a 1489 // message that recurses through 3, 4 or 5 steps to make sure, say, W1 can 1490 // re-enter when called from W4 while it's sending a message to W2. 1491 // The first worker drives the whole test so it must be treated specially. 1492 1493 class RestrictedDispatchPipeWorker : public Worker { 1494 public: 1495 RestrictedDispatchPipeWorker( 1496 const std::string &channel1, 1497 WaitableEvent* event1, 1498 const std::string &channel2, 1499 WaitableEvent* event2, 1500 int group, 1501 int* success) 1502 : Worker(channel1, Channel::MODE_SERVER), 1503 event1_(event1), 1504 event2_(event2), 1505 other_channel_name_(channel2), 1506 group_(group), 1507 success_(success) { 1508 } 1509 1510 void OnPingTTL(int ping, int* ret) { 1511 *ret = 0; 1512 if (!ping) 1513 return; 1514 other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret)); 1515 ++*ret; 1516 } 1517 1518 void OnDone() { 1519 if (is_first()) 1520 return; 1521 other_channel_->Send(new SyncChannelTestMsg_Done); 1522 other_channel_.reset(); 1523 Done(); 1524 } 1525 1526 virtual void Run() OVERRIDE { 1527 channel()->SetRestrictDispatchChannelGroup(group_); 1528 if (is_first()) 1529 event1_->Signal(); 1530 event2_->Wait(); 1531 other_channel_ = 1532 SyncChannel::Create(other_channel_name_, 1533 IPC::Channel::MODE_CLIENT, 1534 this, 1535 ipc_thread().message_loop_proxy().get(), 1536 true, 1537 shutdown_event()); 1538 other_channel_->SetRestrictDispatchChannelGroup(group_); 1539 if (!is_first()) { 1540 event1_->Signal(); 1541 return; 1542 } 1543 *success_ = 0; 1544 int value = 0; 1545 OnPingTTL(3, &value); 1546 *success_ += (value == 3); 1547 OnPingTTL(4, &value); 1548 *success_ += (value == 4); 1549 OnPingTTL(5, &value); 1550 *success_ += (value == 5); 1551 other_channel_->Send(new SyncChannelTestMsg_Done); 1552 other_channel_.reset(); 1553 Done(); 1554 } 1555 1556 bool is_first() { return !!success_; } 1557 1558 private: 1559 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1560 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message) 1561 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1562 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone) 1563 IPC_END_MESSAGE_MAP() 1564 return true; 1565 } 1566 1567 scoped_ptr<SyncChannel> other_channel_; 1568 WaitableEvent* event1_; 1569 WaitableEvent* event2_; 1570 std::string other_channel_name_; 1571 int group_; 1572 int* success_; 1573 }; 1574 1575 TEST_F(IPCSyncChannelTest, RestrictedDispatch4WayDeadlock) { 1576 int success = 0; 1577 std::vector<Worker*> workers; 1578 WaitableEvent event0(true, false); 1579 WaitableEvent event1(true, false); 1580 WaitableEvent event2(true, false); 1581 WaitableEvent event3(true, false); 1582 workers.push_back(new RestrictedDispatchPipeWorker( 1583 "channel0", &event0, "channel1", &event1, 1, &success)); 1584 workers.push_back(new RestrictedDispatchPipeWorker( 1585 "channel1", &event1, "channel2", &event2, 2, NULL)); 1586 workers.push_back(new RestrictedDispatchPipeWorker( 1587 "channel2", &event2, "channel3", &event3, 3, NULL)); 1588 workers.push_back(new RestrictedDispatchPipeWorker( 1589 "channel3", &event3, "channel0", &event0, 4, NULL)); 1590 RunTest(workers); 1591 EXPECT_EQ(3, success); 1592 } 1593 1594 //------------------------------------------------------------------------------ 1595 1596 // This test case inspired by crbug.com/122443 1597 // We want to make sure a reply message with the unblock flag set correctly 1598 // behaves as a reply, not a regular message. 1599 // We have 3 workers. Server1 will send a message to Server2 (which will block), 1600 // during which it will dispatch a message comming from Client, at which point 1601 // it will send another message to Server2. While sending that second message it 1602 // will receive a reply from Server1 with the unblock flag. 1603 1604 class ReentrantReplyServer1 : public Worker { 1605 public: 1606 ReentrantReplyServer1(WaitableEvent* server_ready) 1607 : Worker("reentrant_reply1", Channel::MODE_SERVER), 1608 server_ready_(server_ready) { } 1609 1610 virtual void Run() OVERRIDE { 1611 server2_channel_ = 1612 SyncChannel::Create("reentrant_reply2", 1613 IPC::Channel::MODE_CLIENT, 1614 this, 1615 ipc_thread().message_loop_proxy().get(), 1616 true, 1617 shutdown_event()); 1618 server_ready_->Signal(); 1619 Message* msg = new SyncChannelTestMsg_Reentrant1(); 1620 server2_channel_->Send(msg); 1621 server2_channel_.reset(); 1622 Done(); 1623 } 1624 1625 private: 1626 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1627 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message) 1628 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2) 1629 IPC_REPLY_HANDLER(OnReply) 1630 IPC_END_MESSAGE_MAP() 1631 return true; 1632 } 1633 1634 void OnReentrant2() { 1635 Message* msg = new SyncChannelTestMsg_Reentrant3(); 1636 server2_channel_->Send(msg); 1637 } 1638 1639 void OnReply(const Message& message) { 1640 // If we get here, the Send() will never receive the reply (thus would 1641 // hang), so abort instead. 1642 LOG(FATAL) << "Reply message was dispatched"; 1643 } 1644 1645 WaitableEvent* server_ready_; 1646 scoped_ptr<SyncChannel> server2_channel_; 1647 }; 1648 1649 class ReentrantReplyServer2 : public Worker { 1650 public: 1651 ReentrantReplyServer2() 1652 : Worker("reentrant_reply2", Channel::MODE_SERVER), 1653 reply_(NULL) { } 1654 1655 private: 1656 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1657 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message) 1658 IPC_MESSAGE_HANDLER_DELAY_REPLY( 1659 SyncChannelTestMsg_Reentrant1, OnReentrant1) 1660 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3) 1661 IPC_END_MESSAGE_MAP() 1662 return true; 1663 } 1664 1665 void OnReentrant1(Message* reply) { 1666 DCHECK(!reply_); 1667 reply_ = reply; 1668 } 1669 1670 void OnReentrant3() { 1671 DCHECK(reply_); 1672 Message* reply = reply_; 1673 reply_ = NULL; 1674 reply->set_unblock(true); 1675 Send(reply); 1676 Done(); 1677 } 1678 1679 Message* reply_; 1680 }; 1681 1682 class ReentrantReplyClient : public Worker { 1683 public: 1684 ReentrantReplyClient(WaitableEvent* server_ready) 1685 : Worker("reentrant_reply1", Channel::MODE_CLIENT), 1686 server_ready_(server_ready) { } 1687 1688 virtual void Run() OVERRIDE { 1689 server_ready_->Wait(); 1690 Send(new SyncChannelTestMsg_Reentrant2()); 1691 Done(); 1692 } 1693 1694 private: 1695 WaitableEvent* server_ready_; 1696 }; 1697 1698 TEST_F(IPCSyncChannelTest, ReentrantReply) { 1699 std::vector<Worker*> workers; 1700 WaitableEvent server_ready(false, false); 1701 workers.push_back(new ReentrantReplyServer2()); 1702 workers.push_back(new ReentrantReplyServer1(&server_ready)); 1703 workers.push_back(new ReentrantReplyClient(&server_ready)); 1704 RunTest(workers); 1705 } 1706 1707 //------------------------------------------------------------------------------ 1708 1709 // Generate a validated channel ID using Channel::GenerateVerifiedChannelID(). 1710 1711 class VerifiedServer : public Worker { 1712 public: 1713 VerifiedServer(base::Thread* listener_thread, 1714 const std::string& channel_name, 1715 const std::string& reply_text) 1716 : Worker(channel_name, Channel::MODE_SERVER), 1717 reply_text_(reply_text) { 1718 Worker::OverrideThread(listener_thread); 1719 } 1720 1721 virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE { 1722 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 1723 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 1724 Send(reply_msg); 1725 ASSERT_EQ(channel()->GetPeerPID(), base::GetCurrentProcId()); 1726 Done(); 1727 } 1728 1729 private: 1730 std::string reply_text_; 1731 }; 1732 1733 class VerifiedClient : public Worker { 1734 public: 1735 VerifiedClient(base::Thread* listener_thread, 1736 const std::string& channel_name, 1737 const std::string& expected_text) 1738 : Worker(channel_name, Channel::MODE_CLIENT), 1739 expected_text_(expected_text) { 1740 Worker::OverrideThread(listener_thread); 1741 } 1742 1743 virtual void Run() OVERRIDE { 1744 std::string response; 1745 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 1746 bool result = Send(msg); 1747 DCHECK(result); 1748 DCHECK_EQ(response, expected_text_); 1749 // expected_text_ is only used in the above DCHECK. This line suppresses the 1750 // "unused private field" warning in release builds. 1751 (void)expected_text_; 1752 1753 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 1754 ASSERT_EQ(channel()->GetPeerPID(), base::GetCurrentProcId()); 1755 Done(); 1756 } 1757 1758 private: 1759 std::string expected_text_; 1760 }; 1761 1762 void Verified() { 1763 std::vector<Worker*> workers; 1764 1765 // A shared worker thread for servers 1766 base::Thread server_worker_thread("Verified_ServerListener"); 1767 ASSERT_TRUE(server_worker_thread.Start()); 1768 1769 base::Thread client_worker_thread("Verified_ClientListener"); 1770 ASSERT_TRUE(client_worker_thread.Start()); 1771 1772 std::string channel_id = Channel::GenerateVerifiedChannelID("Verified"); 1773 Worker* worker; 1774 1775 worker = new VerifiedServer(&server_worker_thread, 1776 channel_id, 1777 "Got first message"); 1778 workers.push_back(worker); 1779 1780 worker = new VerifiedClient(&client_worker_thread, 1781 channel_id, 1782 "Got first message"); 1783 workers.push_back(worker); 1784 1785 RunTest(workers); 1786 } 1787 1788 // Windows needs to send an out-of-band secret to verify the client end of the 1789 // channel. Test that we still connect correctly in that case. 1790 TEST_F(IPCSyncChannelTest, Verified) { 1791 Verified(); 1792 } 1793 1794 } // namespace 1795 } // namespace IPC 1796