1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_sync_channel.h" 6 7 #include <stddef.h> 8 9 #include <memory> 10 #include <string> 11 #include <utility> 12 #include <vector> 13 14 #include "base/bind.h" 15 #include "base/location.h" 16 #include "base/logging.h" 17 #include "base/macros.h" 18 #include "base/message_loop/message_loop.h" 19 #include "base/process/process_handle.h" 20 #include "base/run_loop.h" 21 #include "base/single_thread_task_runner.h" 22 #include "base/strings/string_util.h" 23 #include "base/synchronization/waitable_event.h" 24 #include "base/threading/platform_thread.h" 25 #include "base/threading/thread.h" 26 #include "base/threading/thread_task_runner_handle.h" 27 #include "build/build_config.h" 28 #include "ipc/ipc_listener.h" 29 #include "ipc/ipc_message.h" 30 #include "ipc/ipc_sender.h" 31 #include "ipc/ipc_sync_message_filter.h" 32 #include "ipc/ipc_sync_message_unittest.h" 33 #include "mojo/public/cpp/system/message_pipe.h" 34 #include "testing/gtest/include/gtest/gtest.h" 35 36 using base::WaitableEvent; 37 38 namespace IPC { 39 namespace { 40 41 // Base class for a "process" with listener and IPC threads. 42 class Worker : public Listener, public Sender { 43 public: 44 // Will create a channel without a name. 45 Worker(Channel::Mode mode, 46 const std::string& thread_name, 47 mojo::ScopedMessagePipeHandle channel_handle) 48 : done_( 49 new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC, 50 base::WaitableEvent::InitialState::NOT_SIGNALED)), 51 channel_created_( 52 new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC, 53 base::WaitableEvent::InitialState::NOT_SIGNALED)), 54 channel_handle_(std::move(channel_handle)), 55 mode_(mode), 56 ipc_thread_((thread_name + "_ipc").c_str()), 57 listener_thread_((thread_name + "_listener").c_str()), 58 overrided_thread_(NULL), 59 shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL, 60 base::WaitableEvent::InitialState::NOT_SIGNALED), 61 is_shutdown_(false) {} 62 63 // Will create a named channel and use this name for the threads' name. 64 Worker(mojo::ScopedMessagePipeHandle channel_handle, Channel::Mode mode) 65 : done_( 66 new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC, 67 base::WaitableEvent::InitialState::NOT_SIGNALED)), 68 channel_created_( 69 new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC, 70 base::WaitableEvent::InitialState::NOT_SIGNALED)), 71 channel_handle_(std::move(channel_handle)), 72 mode_(mode), 73 ipc_thread_("ipc thread"), 74 listener_thread_("listener thread"), 75 overrided_thread_(NULL), 76 shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL, 77 base::WaitableEvent::InitialState::NOT_SIGNALED), 78 is_shutdown_(false) {} 79 80 ~Worker() override { 81 // Shutdown() must be called before destruction. 82 CHECK(is_shutdown_); 83 } 84 bool Send(Message* msg) override { return channel_->Send(msg); } 85 void WaitForChannelCreation() { channel_created_->Wait(); } 86 void CloseChannel() { 87 DCHECK(ListenerThread()->task_runner()->BelongsToCurrentThread()); 88 channel_->Close(); 89 } 90 void Start() { 91 StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT); 92 ListenerThread()->task_runner()->PostTask( 93 FROM_HERE, base::Bind(&Worker::OnStart, base::Unretained(this))); 94 } 95 void Shutdown() { 96 // The IPC thread needs to outlive SyncChannel. We can't do this in 97 // ~Worker(), since that'll reset the vtable pointer (to Worker's), which 98 // may result in a race conditions. See http://crbug.com/25841. 99 WaitableEvent listener_done( 100 base::WaitableEvent::ResetPolicy::AUTOMATIC, 101 base::WaitableEvent::InitialState::NOT_SIGNALED), 102 ipc_done(base::WaitableEvent::ResetPolicy::AUTOMATIC, 103 base::WaitableEvent::InitialState::NOT_SIGNALED); 104 ListenerThread()->task_runner()->PostTask( 105 FROM_HERE, 106 base::Bind(&Worker::OnListenerThreadShutdown1, base::Unretained(this), 107 &listener_done, &ipc_done)); 108 listener_done.Wait(); 109 ipc_done.Wait(); 110 ipc_thread_.Stop(); 111 listener_thread_.Stop(); 112 is_shutdown_ = true; 113 } 114 void OverrideThread(base::Thread* overrided_thread) { 115 DCHECK(overrided_thread_ == NULL); 116 overrided_thread_ = overrided_thread; 117 } 118 bool SendAnswerToLife(bool pump, bool succeed) { 119 int answer = 0; 120 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 121 if (pump) 122 msg->EnableMessagePumping(); 123 bool result = Send(msg); 124 DCHECK_EQ(result, succeed); 125 DCHECK_EQ(answer, (succeed ? 42 : 0)); 126 return result; 127 } 128 bool SendDouble(bool pump, bool succeed) { 129 int answer = 0; 130 SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); 131 if (pump) 132 msg->EnableMessagePumping(); 133 bool result = Send(msg); 134 DCHECK_EQ(result, succeed); 135 DCHECK_EQ(answer, (succeed ? 10 : 0)); 136 return result; 137 } 138 mojo::MessagePipeHandle TakeChannelHandle() { 139 DCHECK(channel_handle_.is_valid()); 140 return channel_handle_.release(); 141 } 142 Channel::Mode mode() { return mode_; } 143 WaitableEvent* done_event() { return done_.get(); } 144 WaitableEvent* shutdown_event() { return &shutdown_event_; } 145 void ResetChannel() { channel_.reset(); } 146 // Derived classes need to call this when they've completed their part of 147 // the test. 148 void Done() { done_->Signal(); } 149 150 protected: 151 SyncChannel* channel() { return channel_.get(); } 152 // Functions for derived classes to implement if they wish. 153 virtual void Run() { } 154 virtual void OnAnswer(int* answer) { NOTREACHED(); } 155 virtual void OnAnswerDelay(Message* reply_msg) { 156 // The message handler map below can only take one entry for 157 // SyncChannelTestMsg_AnswerToLife, so since some classes want 158 // the normal version while other want the delayed reply, we 159 // call the normal version if the derived class didn't override 160 // this function. 161 int answer; 162 OnAnswer(&answer); 163 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer); 164 Send(reply_msg); 165 } 166 virtual void OnDouble(int in, int* out) { NOTREACHED(); } 167 virtual void OnDoubleDelay(int in, Message* reply_msg) { 168 int result; 169 OnDouble(in, &result); 170 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result); 171 Send(reply_msg); 172 } 173 174 virtual void OnNestedTestMsg(Message* reply_msg) { 175 NOTREACHED(); 176 } 177 178 virtual SyncChannel* CreateChannel() { 179 std::unique_ptr<SyncChannel> channel = SyncChannel::Create( 180 TakeChannelHandle(), mode_, this, ipc_thread_.task_runner(), 181 base::ThreadTaskRunnerHandle::Get(), true, &shutdown_event_); 182 return channel.release(); 183 } 184 185 base::Thread* ListenerThread() { 186 return overrided_thread_ ? overrided_thread_ : &listener_thread_; 187 } 188 189 const base::Thread& ipc_thread() const { return ipc_thread_; } 190 191 private: 192 // Called on the listener thread to create the sync channel. 193 void OnStart() { 194 // Link ipc_thread_, listener_thread_ and channel_ altogether. 195 StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO); 196 channel_.reset(CreateChannel()); 197 channel_created_->Signal(); 198 Run(); 199 } 200 201 void OnListenerThreadShutdown1(WaitableEvent* listener_event, 202 WaitableEvent* ipc_event) { 203 // SyncChannel needs to be destructed on the thread that it was created on. 204 channel_.reset(); 205 206 base::RunLoop().RunUntilIdle(); 207 208 ipc_thread_.task_runner()->PostTask( 209 FROM_HERE, 210 base::Bind(&Worker::OnIPCThreadShutdown, base::Unretained(this), 211 listener_event, ipc_event)); 212 } 213 214 void OnIPCThreadShutdown(WaitableEvent* listener_event, 215 WaitableEvent* ipc_event) { 216 base::RunLoop().RunUntilIdle(); 217 ipc_event->Signal(); 218 219 listener_thread_.task_runner()->PostTask( 220 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, 221 base::Unretained(this), listener_event)); 222 } 223 224 void OnListenerThreadShutdown2(WaitableEvent* listener_event) { 225 base::RunLoop().RunUntilIdle(); 226 listener_event->Signal(); 227 } 228 229 bool OnMessageReceived(const Message& message) override { 230 IPC_BEGIN_MESSAGE_MAP(Worker, message) 231 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay) 232 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife, 233 OnAnswerDelay) 234 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String, 235 OnNestedTestMsg) 236 IPC_END_MESSAGE_MAP() 237 return true; 238 } 239 240 void StartThread(base::Thread* thread, base::MessageLoop::Type type) { 241 base::Thread::Options options; 242 options.message_loop_type = type; 243 thread->StartWithOptions(options); 244 } 245 246 std::unique_ptr<WaitableEvent> done_; 247 std::unique_ptr<WaitableEvent> channel_created_; 248 mojo::ScopedMessagePipeHandle channel_handle_; 249 Channel::Mode mode_; 250 std::unique_ptr<SyncChannel> channel_; 251 base::Thread ipc_thread_; 252 base::Thread listener_thread_; 253 base::Thread* overrided_thread_; 254 255 base::WaitableEvent shutdown_event_; 256 257 bool is_shutdown_; 258 259 DISALLOW_COPY_AND_ASSIGN(Worker); 260 }; 261 262 263 // Starts the test with the given workers. This function deletes the workers 264 // when it's done. 265 void RunTest(std::vector<Worker*> workers) { 266 // First we create the workers that are channel servers, or else the other 267 // workers' channel initialization might fail because the pipe isn't created.. 268 for (size_t i = 0; i < workers.size(); ++i) { 269 if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) { 270 workers[i]->Start(); 271 workers[i]->WaitForChannelCreation(); 272 } 273 } 274 275 // now create the clients 276 for (size_t i = 0; i < workers.size(); ++i) { 277 if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG) 278 workers[i]->Start(); 279 } 280 281 // wait for all the workers to finish 282 for (size_t i = 0; i < workers.size(); ++i) 283 workers[i]->done_event()->Wait(); 284 285 for (size_t i = 0; i < workers.size(); ++i) { 286 workers[i]->Shutdown(); 287 delete workers[i]; 288 } 289 } 290 291 class IPCSyncChannelTest : public testing::Test { 292 private: 293 base::MessageLoop message_loop_; 294 }; 295 296 //------------------------------------------------------------------------------ 297 298 class SimpleServer : public Worker { 299 public: 300 SimpleServer(bool pump_during_send, 301 mojo::ScopedMessagePipeHandle channel_handle) 302 : Worker(Channel::MODE_SERVER, 303 "simpler_server", 304 std::move(channel_handle)), 305 pump_during_send_(pump_during_send) {} 306 void Run() override { 307 SendAnswerToLife(pump_during_send_, true); 308 Done(); 309 } 310 311 bool pump_during_send_; 312 }; 313 314 class SimpleClient : public Worker { 315 public: 316 explicit SimpleClient(mojo::ScopedMessagePipeHandle channel_handle) 317 : Worker(Channel::MODE_CLIENT, 318 "simple_client", 319 std::move(channel_handle)) {} 320 321 void OnAnswer(int* answer) override { 322 *answer = 42; 323 Done(); 324 } 325 }; 326 327 void Simple(bool pump_during_send) { 328 std::vector<Worker*> workers; 329 mojo::MessagePipe pipe; 330 workers.push_back( 331 new SimpleServer(pump_during_send, std::move(pipe.handle0))); 332 workers.push_back(new SimpleClient(std::move(pipe.handle1))); 333 RunTest(workers); 334 } 335 336 #if defined(OS_ANDROID) 337 #define MAYBE_Simple DISABLED_Simple 338 #else 339 #define MAYBE_Simple Simple 340 #endif 341 // Tests basic synchronous call 342 TEST_F(IPCSyncChannelTest, MAYBE_Simple) { 343 Simple(false); 344 Simple(true); 345 } 346 347 //------------------------------------------------------------------------------ 348 349 // Worker classes which override how the sync channel is created to use the 350 // two-step initialization (calling the lightweight constructor and then 351 // ChannelProxy::Init separately) process. 352 class TwoStepServer : public Worker { 353 public: 354 TwoStepServer(bool create_pipe_now, 355 mojo::ScopedMessagePipeHandle channel_handle) 356 : Worker(Channel::MODE_SERVER, 357 "simpler_server", 358 std::move(channel_handle)), 359 create_pipe_now_(create_pipe_now) {} 360 361 void Run() override { 362 SendAnswerToLife(false, true); 363 Done(); 364 } 365 366 SyncChannel* CreateChannel() override { 367 SyncChannel* channel = 368 SyncChannel::Create(TakeChannelHandle(), mode(), this, 369 ipc_thread().task_runner(), 370 base::ThreadTaskRunnerHandle::Get(), 371 create_pipe_now_, shutdown_event()) 372 .release(); 373 return channel; 374 } 375 376 bool create_pipe_now_; 377 }; 378 379 class TwoStepClient : public Worker { 380 public: 381 TwoStepClient(bool create_pipe_now, 382 mojo::ScopedMessagePipeHandle channel_handle) 383 : Worker(Channel::MODE_CLIENT, 384 "simple_client", 385 std::move(channel_handle)), 386 create_pipe_now_(create_pipe_now) {} 387 388 void OnAnswer(int* answer) override { 389 *answer = 42; 390 Done(); 391 } 392 393 SyncChannel* CreateChannel() override { 394 SyncChannel* channel = 395 SyncChannel::Create(TakeChannelHandle(), mode(), this, 396 ipc_thread().task_runner(), 397 base::ThreadTaskRunnerHandle::Get(), 398 create_pipe_now_, shutdown_event()) 399 .release(); 400 return channel; 401 } 402 403 bool create_pipe_now_; 404 }; 405 406 void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) { 407 std::vector<Worker*> workers; 408 mojo::MessagePipe pipe; 409 workers.push_back( 410 new TwoStepServer(create_server_pipe_now, std::move(pipe.handle0))); 411 workers.push_back( 412 new TwoStepClient(create_client_pipe_now, std::move(pipe.handle1))); 413 RunTest(workers); 414 } 415 416 // Tests basic two-step initialization, where you call the lightweight 417 // constructor then Init. 418 TEST_F(IPCSyncChannelTest, TwoStepInitialization) { 419 TwoStep(false, false); 420 TwoStep(false, true); 421 TwoStep(true, false); 422 TwoStep(true, true); 423 } 424 425 //------------------------------------------------------------------------------ 426 427 class DelayClient : public Worker { 428 public: 429 explicit DelayClient(mojo::ScopedMessagePipeHandle channel_handle) 430 : Worker(Channel::MODE_CLIENT, 431 "delay_client", 432 std::move(channel_handle)) {} 433 434 void OnAnswerDelay(Message* reply_msg) override { 435 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 436 Send(reply_msg); 437 Done(); 438 } 439 }; 440 441 void DelayReply(bool pump_during_send) { 442 std::vector<Worker*> workers; 443 mojo::MessagePipe pipe; 444 workers.push_back( 445 new SimpleServer(pump_during_send, std::move(pipe.handle0))); 446 workers.push_back(new DelayClient(std::move(pipe.handle1))); 447 RunTest(workers); 448 } 449 450 // Tests that asynchronous replies work 451 TEST_F(IPCSyncChannelTest, DelayReply) { 452 DelayReply(false); 453 DelayReply(true); 454 } 455 456 //------------------------------------------------------------------------------ 457 458 class NoHangServer : public Worker { 459 public: 460 NoHangServer(WaitableEvent* got_first_reply, 461 bool pump_during_send, 462 mojo::ScopedMessagePipeHandle channel_handle) 463 : Worker(Channel::MODE_SERVER, 464 "no_hang_server", 465 std::move(channel_handle)), 466 got_first_reply_(got_first_reply), 467 pump_during_send_(pump_during_send) {} 468 void Run() override { 469 SendAnswerToLife(pump_during_send_, true); 470 got_first_reply_->Signal(); 471 472 SendAnswerToLife(pump_during_send_, false); 473 Done(); 474 } 475 476 WaitableEvent* got_first_reply_; 477 bool pump_during_send_; 478 }; 479 480 class NoHangClient : public Worker { 481 public: 482 NoHangClient(WaitableEvent* got_first_reply, 483 mojo::ScopedMessagePipeHandle channel_handle) 484 : Worker(Channel::MODE_CLIENT, 485 "no_hang_client", 486 std::move(channel_handle)), 487 got_first_reply_(got_first_reply) {} 488 489 void OnAnswerDelay(Message* reply_msg) override { 490 // Use the DELAY_REPLY macro so that we can force the reply to be sent 491 // before this function returns (when the channel will be reset). 492 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 493 Send(reply_msg); 494 got_first_reply_->Wait(); 495 CloseChannel(); 496 Done(); 497 } 498 499 WaitableEvent* got_first_reply_; 500 }; 501 502 void NoHang(bool pump_during_send) { 503 WaitableEvent got_first_reply( 504 base::WaitableEvent::ResetPolicy::AUTOMATIC, 505 base::WaitableEvent::InitialState::NOT_SIGNALED); 506 std::vector<Worker*> workers; 507 mojo::MessagePipe pipe; 508 workers.push_back(new NoHangServer(&got_first_reply, pump_during_send, 509 std::move(pipe.handle0))); 510 workers.push_back( 511 new NoHangClient(&got_first_reply, std::move(pipe.handle1))); 512 RunTest(workers); 513 } 514 515 // Tests that caller doesn't hang if receiver dies 516 TEST_F(IPCSyncChannelTest, NoHang) { 517 NoHang(false); 518 NoHang(true); 519 } 520 521 //------------------------------------------------------------------------------ 522 523 class UnblockServer : public Worker { 524 public: 525 UnblockServer(bool pump_during_send, 526 bool delete_during_send, 527 mojo::ScopedMessagePipeHandle channel_handle) 528 : Worker(Channel::MODE_SERVER, 529 "unblock_server", 530 std::move(channel_handle)), 531 pump_during_send_(pump_during_send), 532 delete_during_send_(delete_during_send) {} 533 void Run() override { 534 if (delete_during_send_) { 535 // Use custom code since race conditions mean the answer may or may not be 536 // available. 537 int answer = 0; 538 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 539 if (pump_during_send_) 540 msg->EnableMessagePumping(); 541 Send(msg); 542 } else { 543 SendAnswerToLife(pump_during_send_, true); 544 } 545 Done(); 546 } 547 548 void OnDoubleDelay(int in, Message* reply_msg) override { 549 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 550 Send(reply_msg); 551 if (delete_during_send_) 552 ResetChannel(); 553 } 554 555 bool pump_during_send_; 556 bool delete_during_send_; 557 }; 558 559 class UnblockClient : public Worker { 560 public: 561 UnblockClient(bool pump_during_send, 562 mojo::ScopedMessagePipeHandle channel_handle) 563 : Worker(Channel::MODE_CLIENT, 564 "unblock_client", 565 std::move(channel_handle)), 566 pump_during_send_(pump_during_send) {} 567 568 void OnAnswer(int* answer) override { 569 SendDouble(pump_during_send_, true); 570 *answer = 42; 571 Done(); 572 } 573 574 bool pump_during_send_; 575 }; 576 577 void Unblock(bool server_pump, bool client_pump, bool delete_during_send) { 578 std::vector<Worker*> workers; 579 mojo::MessagePipe pipe; 580 workers.push_back(new UnblockServer(server_pump, delete_during_send, 581 std::move(pipe.handle0))); 582 workers.push_back(new UnblockClient(client_pump, std::move(pipe.handle1))); 583 RunTest(workers); 584 } 585 586 // Tests that the caller unblocks to answer a sync message from the receiver. 587 TEST_F(IPCSyncChannelTest, Unblock) { 588 Unblock(false, false, false); 589 Unblock(false, true, false); 590 Unblock(true, false, false); 591 Unblock(true, true, false); 592 } 593 594 //------------------------------------------------------------------------------ 595 596 #if defined(OS_ANDROID) 597 #define MAYBE_ChannelDeleteDuringSend DISABLED_ChannelDeleteDuringSend 598 #else 599 #define MAYBE_ChannelDeleteDuringSend ChannelDeleteDuringSend 600 #endif 601 // Tests that the the SyncChannel object can be deleted during a Send. 602 TEST_F(IPCSyncChannelTest, MAYBE_ChannelDeleteDuringSend) { 603 Unblock(false, false, true); 604 Unblock(false, true, true); 605 Unblock(true, false, true); 606 Unblock(true, true, true); 607 } 608 609 //------------------------------------------------------------------------------ 610 611 class RecursiveServer : public Worker { 612 public: 613 RecursiveServer(bool expected_send_result, 614 bool pump_first, 615 bool pump_second, 616 mojo::ScopedMessagePipeHandle channel_handle) 617 : Worker(Channel::MODE_SERVER, 618 "recursive_server", 619 std::move(channel_handle)), 620 expected_send_result_(expected_send_result), 621 pump_first_(pump_first), 622 pump_second_(pump_second) {} 623 void Run() override { 624 SendDouble(pump_first_, expected_send_result_); 625 Done(); 626 } 627 628 void OnDouble(int in, int* out) override { 629 *out = in * 2; 630 SendAnswerToLife(pump_second_, expected_send_result_); 631 } 632 633 bool expected_send_result_, pump_first_, pump_second_; 634 }; 635 636 class RecursiveClient : public Worker { 637 public: 638 RecursiveClient(bool pump_during_send, 639 bool close_channel, 640 mojo::ScopedMessagePipeHandle channel_handle) 641 : Worker(Channel::MODE_CLIENT, 642 "recursive_client", 643 std::move(channel_handle)), 644 pump_during_send_(pump_during_send), 645 close_channel_(close_channel) {} 646 647 void OnDoubleDelay(int in, Message* reply_msg) override { 648 SendDouble(pump_during_send_, !close_channel_); 649 if (close_channel_) { 650 delete reply_msg; 651 } else { 652 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 653 Send(reply_msg); 654 } 655 Done(); 656 } 657 658 void OnAnswerDelay(Message* reply_msg) override { 659 if (close_channel_) { 660 delete reply_msg; 661 CloseChannel(); 662 } else { 663 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 664 Send(reply_msg); 665 } 666 } 667 668 bool pump_during_send_, close_channel_; 669 }; 670 671 void Recursive( 672 bool server_pump_first, bool server_pump_second, bool client_pump) { 673 std::vector<Worker*> workers; 674 mojo::MessagePipe pipe; 675 workers.push_back(new RecursiveServer( 676 true, server_pump_first, server_pump_second, std::move(pipe.handle0))); 677 workers.push_back( 678 new RecursiveClient(client_pump, false, std::move(pipe.handle1))); 679 RunTest(workers); 680 } 681 682 // Tests a server calling Send while another Send is pending. 683 TEST_F(IPCSyncChannelTest, Recursive) { 684 Recursive(false, false, false); 685 Recursive(false, false, true); 686 Recursive(false, true, false); 687 Recursive(false, true, true); 688 Recursive(true, false, false); 689 Recursive(true, false, true); 690 Recursive(true, true, false); 691 Recursive(true, true, true); 692 } 693 694 //------------------------------------------------------------------------------ 695 696 void RecursiveNoHang( 697 bool server_pump_first, bool server_pump_second, bool client_pump) { 698 std::vector<Worker*> workers; 699 mojo::MessagePipe pipe; 700 workers.push_back(new RecursiveServer( 701 false, server_pump_first, server_pump_second, std::move(pipe.handle0))); 702 workers.push_back( 703 new RecursiveClient(client_pump, true, std::move(pipe.handle1))); 704 RunTest(workers); 705 } 706 707 // Tests that if a caller makes a sync call during an existing sync call and 708 // the receiver dies, neither of the Send() calls hang. 709 TEST_F(IPCSyncChannelTest, RecursiveNoHang) { 710 RecursiveNoHang(false, false, false); 711 RecursiveNoHang(false, false, true); 712 RecursiveNoHang(false, true, false); 713 RecursiveNoHang(false, true, true); 714 RecursiveNoHang(true, false, false); 715 RecursiveNoHang(true, false, true); 716 RecursiveNoHang(true, true, false); 717 RecursiveNoHang(true, true, true); 718 } 719 720 //------------------------------------------------------------------------------ 721 722 class MultipleServer1 : public Worker { 723 public: 724 MultipleServer1(bool pump_during_send, 725 mojo::ScopedMessagePipeHandle channel_handle) 726 : Worker(std::move(channel_handle), Channel::MODE_SERVER), 727 pump_during_send_(pump_during_send) {} 728 729 void Run() override { 730 SendDouble(pump_during_send_, true); 731 Done(); 732 } 733 734 bool pump_during_send_; 735 }; 736 737 class MultipleClient1 : public Worker { 738 public: 739 MultipleClient1(WaitableEvent* client1_msg_received, 740 WaitableEvent* client1_can_reply, 741 mojo::ScopedMessagePipeHandle channel_handle) 742 : Worker(std::move(channel_handle), Channel::MODE_CLIENT), 743 client1_msg_received_(client1_msg_received), 744 client1_can_reply_(client1_can_reply) {} 745 746 void OnDouble(int in, int* out) override { 747 client1_msg_received_->Signal(); 748 *out = in * 2; 749 client1_can_reply_->Wait(); 750 Done(); 751 } 752 753 private: 754 WaitableEvent *client1_msg_received_, *client1_can_reply_; 755 }; 756 757 class MultipleServer2 : public Worker { 758 public: 759 explicit MultipleServer2(mojo::ScopedMessagePipeHandle channel_handle) 760 : Worker(std::move(channel_handle), Channel::MODE_SERVER) {} 761 762 void OnAnswer(int* result) override { 763 *result = 42; 764 Done(); 765 } 766 }; 767 768 class MultipleClient2 : public Worker { 769 public: 770 MultipleClient2(WaitableEvent* client1_msg_received, 771 WaitableEvent* client1_can_reply, 772 bool pump_during_send, 773 mojo::ScopedMessagePipeHandle channel_handle) 774 : Worker(std::move(channel_handle), Channel::MODE_CLIENT), 775 client1_msg_received_(client1_msg_received), 776 client1_can_reply_(client1_can_reply), 777 pump_during_send_(pump_during_send) {} 778 779 void Run() override { 780 client1_msg_received_->Wait(); 781 SendAnswerToLife(pump_during_send_, true); 782 client1_can_reply_->Signal(); 783 Done(); 784 } 785 786 private: 787 WaitableEvent *client1_msg_received_, *client1_can_reply_; 788 bool pump_during_send_; 789 }; 790 791 void Multiple(bool server_pump, bool client_pump) { 792 std::vector<Worker*> workers; 793 794 // A shared worker thread so that server1 and server2 run on one thread. 795 base::Thread worker_thread("Multiple"); 796 ASSERT_TRUE(worker_thread.Start()); 797 798 // Server1 sends a sync msg to client1, which blocks the reply until 799 // server2 (which runs on the same worker thread as server1) responds 800 // to a sync msg from client2. 801 WaitableEvent client1_msg_received( 802 base::WaitableEvent::ResetPolicy::AUTOMATIC, 803 base::WaitableEvent::InitialState::NOT_SIGNALED); 804 WaitableEvent client1_can_reply( 805 base::WaitableEvent::ResetPolicy::AUTOMATIC, 806 base::WaitableEvent::InitialState::NOT_SIGNALED); 807 808 Worker* worker; 809 810 mojo::MessagePipe pipe1, pipe2; 811 worker = new MultipleServer2(std::move(pipe2.handle0)); 812 worker->OverrideThread(&worker_thread); 813 workers.push_back(worker); 814 815 worker = new MultipleClient2(&client1_msg_received, &client1_can_reply, 816 client_pump, std::move(pipe2.handle1)); 817 workers.push_back(worker); 818 819 worker = new MultipleServer1(server_pump, std::move(pipe1.handle0)); 820 worker->OverrideThread(&worker_thread); 821 workers.push_back(worker); 822 823 worker = new MultipleClient1(&client1_msg_received, &client1_can_reply, 824 std::move(pipe1.handle1)); 825 workers.push_back(worker); 826 827 RunTest(workers); 828 } 829 830 // Tests that multiple SyncObjects on the same listener thread can unblock each 831 // other. 832 TEST_F(IPCSyncChannelTest, Multiple) { 833 Multiple(false, false); 834 Multiple(false, true); 835 Multiple(true, false); 836 Multiple(true, true); 837 } 838 839 //------------------------------------------------------------------------------ 840 841 // This class provides server side functionality to test the case where 842 // multiple sync channels are in use on the same thread on the client and 843 // nested calls are issued. 844 class QueuedReplyServer : public Worker { 845 public: 846 QueuedReplyServer(base::Thread* listener_thread, 847 mojo::ScopedMessagePipeHandle channel_handle, 848 const std::string& reply_text) 849 : Worker(std::move(channel_handle), Channel::MODE_SERVER), 850 reply_text_(reply_text) { 851 Worker::OverrideThread(listener_thread); 852 } 853 854 void OnNestedTestMsg(Message* reply_msg) override { 855 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 856 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 857 Send(reply_msg); 858 Done(); 859 } 860 861 private: 862 std::string reply_text_; 863 }; 864 865 // The QueuedReplyClient class provides functionality to test the case where 866 // multiple sync channels are in use on the same thread and they make nested 867 // sync calls, i.e. while the first channel waits for a response it makes a 868 // sync call on another channel. 869 // The callstack should unwind correctly, i.e. the outermost call should 870 // complete first, and so on. 871 class QueuedReplyClient : public Worker { 872 public: 873 QueuedReplyClient(base::Thread* listener_thread, 874 mojo::ScopedMessagePipeHandle channel_handle, 875 const std::string& expected_text, 876 bool pump_during_send) 877 : Worker(std::move(channel_handle), Channel::MODE_CLIENT), 878 pump_during_send_(pump_during_send), 879 expected_text_(expected_text) { 880 Worker::OverrideThread(listener_thread); 881 } 882 883 void Run() override { 884 std::string response; 885 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 886 if (pump_during_send_) 887 msg->EnableMessagePumping(); 888 bool result = Send(msg); 889 DCHECK(result); 890 DCHECK_EQ(response, expected_text_); 891 892 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 893 Done(); 894 } 895 896 private: 897 bool pump_during_send_; 898 std::string expected_text_; 899 }; 900 901 void QueuedReply(bool client_pump) { 902 std::vector<Worker*> workers; 903 904 // A shared worker thread for servers 905 base::Thread server_worker_thread("QueuedReply_ServerListener"); 906 ASSERT_TRUE(server_worker_thread.Start()); 907 908 base::Thread client_worker_thread("QueuedReply_ClientListener"); 909 ASSERT_TRUE(client_worker_thread.Start()); 910 911 Worker* worker; 912 913 mojo::MessagePipe pipe1, pipe2; 914 worker = new QueuedReplyServer(&server_worker_thread, 915 std::move(pipe1.handle0), "Got first message"); 916 workers.push_back(worker); 917 918 worker = new QueuedReplyServer( 919 &server_worker_thread, std::move(pipe2.handle0), "Got second message"); 920 workers.push_back(worker); 921 922 worker = 923 new QueuedReplyClient(&client_worker_thread, std::move(pipe1.handle1), 924 "Got first message", client_pump); 925 workers.push_back(worker); 926 927 worker = 928 new QueuedReplyClient(&client_worker_thread, std::move(pipe2.handle1), 929 "Got second message", client_pump); 930 workers.push_back(worker); 931 932 RunTest(workers); 933 } 934 935 // While a blocking send is in progress, the listener thread might answer other 936 // synchronous messages. This tests that if during the response to another 937 // message the reply to the original messages comes, it is queued up correctly 938 // and the original Send is unblocked later. 939 // We also test that the send call stacks unwind correctly when the channel 940 // pumps messages while waiting for a response. 941 TEST_F(IPCSyncChannelTest, QueuedReply) { 942 QueuedReply(false); 943 QueuedReply(true); 944 } 945 946 //------------------------------------------------------------------------------ 947 948 class ChattyClient : public Worker { 949 public: 950 explicit ChattyClient(mojo::ScopedMessagePipeHandle channel_handle) 951 : Worker(Channel::MODE_CLIENT, 952 "chatty_client", 953 std::move(channel_handle)) {} 954 955 void OnAnswer(int* answer) override { 956 // The PostMessage limit is 10k. Send 20% more than that. 957 const int kMessageLimit = 10000; 958 const int kMessagesToSend = kMessageLimit * 120 / 100; 959 for (int i = 0; i < kMessagesToSend; ++i) { 960 if (!SendDouble(false, true)) 961 break; 962 } 963 *answer = 42; 964 Done(); 965 } 966 }; 967 968 void ChattyServer(bool pump_during_send) { 969 std::vector<Worker*> workers; 970 mojo::MessagePipe pipe; 971 workers.push_back( 972 new UnblockServer(pump_during_send, false, std::move(pipe.handle0))); 973 workers.push_back(new ChattyClient(std::move(pipe.handle1))); 974 RunTest(workers); 975 } 976 977 #if defined(OS_ANDROID) 978 // Times out. 979 #define MAYBE_ChattyServer DISABLED_ChattyServer 980 #else 981 #define MAYBE_ChattyServer ChattyServer 982 #endif 983 // Tests http://b/1093251 - that sending lots of sync messages while 984 // the receiver is waiting for a sync reply does not overflow the PostMessage 985 // queue. 986 TEST_F(IPCSyncChannelTest, MAYBE_ChattyServer) { 987 ChattyServer(false); 988 } 989 990 #if defined(OS_ANDROID) 991 // Times out. 992 #define MAYBE_ChattyServerPumpDuringSend DISABLED_ChattyServerPumpDuringSend 993 #else 994 #define MAYBE_ChattyServerPumpDuringSend ChattyServerPumpDuringSend 995 #endif 996 TEST_F(IPCSyncChannelTest, MAYBE_ChattyServerPumpDuringSend) { 997 ChattyServer(true); 998 } 999 1000 //------------------------------------------------------------------------------ 1001 1002 void NestedCallback(Worker* server) { 1003 // Sleep a bit so that we wake up after the reply has been received. 1004 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250)); 1005 server->SendAnswerToLife(true, true); 1006 } 1007 1008 bool timeout_occurred = false; 1009 1010 void TimeoutCallback() { 1011 timeout_occurred = true; 1012 } 1013 1014 class DoneEventRaceServer : public Worker { 1015 public: 1016 explicit DoneEventRaceServer(mojo::ScopedMessagePipeHandle channel_handle) 1017 : Worker(Channel::MODE_SERVER, 1018 "done_event_race_server", 1019 std::move(channel_handle)) {} 1020 1021 void Run() override { 1022 base::ThreadTaskRunnerHandle::Get()->PostTask( 1023 FROM_HERE, base::Bind(&NestedCallback, base::Unretained(this))); 1024 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( 1025 FROM_HERE, base::Bind(&TimeoutCallback), 1026 base::TimeDelta::FromSeconds(9)); 1027 // Even though we have a timeout on the Send, it will succeed since for this 1028 // bug, the reply message comes back and is deserialized, however the done 1029 // event wasn't set. So we indirectly use the timeout task to notice if a 1030 // timeout occurred. 1031 SendAnswerToLife(true, true); 1032 DCHECK(!timeout_occurred); 1033 Done(); 1034 } 1035 }; 1036 1037 #if defined(OS_ANDROID) 1038 #define MAYBE_DoneEventRace DISABLED_DoneEventRace 1039 #else 1040 #define MAYBE_DoneEventRace DoneEventRace 1041 #endif 1042 // Tests http://b/1474092 - that if after the done_event is set but before 1043 // OnObjectSignaled is called another message is sent out, then after its 1044 // reply comes back OnObjectSignaled will be called for the first message. 1045 TEST_F(IPCSyncChannelTest, MAYBE_DoneEventRace) { 1046 std::vector<Worker*> workers; 1047 mojo::MessagePipe pipe; 1048 workers.push_back(new DoneEventRaceServer(std::move(pipe.handle0))); 1049 workers.push_back(new SimpleClient(std::move(pipe.handle1))); 1050 RunTest(workers); 1051 } 1052 1053 //------------------------------------------------------------------------------ 1054 1055 class TestSyncMessageFilter : public SyncMessageFilter { 1056 public: 1057 TestSyncMessageFilter( 1058 base::WaitableEvent* shutdown_event, 1059 Worker* worker, 1060 scoped_refptr<base::SingleThreadTaskRunner> task_runner) 1061 : SyncMessageFilter(shutdown_event), 1062 worker_(worker), 1063 task_runner_(task_runner) {} 1064 1065 void OnFilterAdded(Channel* channel) override { 1066 SyncMessageFilter::OnFilterAdded(channel); 1067 task_runner_->PostTask( 1068 FROM_HERE, 1069 base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this)); 1070 } 1071 1072 void SendMessageOnHelperThread() { 1073 int answer = 0; 1074 bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); 1075 DCHECK(result); 1076 DCHECK_EQ(answer, 42); 1077 1078 worker_->Done(); 1079 } 1080 1081 private: 1082 ~TestSyncMessageFilter() override = default; 1083 1084 Worker* worker_; 1085 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 1086 }; 1087 1088 class SyncMessageFilterServer : public Worker { 1089 public: 1090 explicit SyncMessageFilterServer(mojo::ScopedMessagePipeHandle channel_handle) 1091 : Worker(Channel::MODE_SERVER, 1092 "sync_message_filter_server", 1093 std::move(channel_handle)), 1094 thread_("helper_thread") { 1095 base::Thread::Options options; 1096 options.message_loop_type = base::MessageLoop::TYPE_DEFAULT; 1097 thread_.StartWithOptions(options); 1098 filter_ = new TestSyncMessageFilter(shutdown_event(), this, 1099 thread_.task_runner()); 1100 } 1101 1102 void Run() override { 1103 channel()->AddFilter(filter_.get()); 1104 } 1105 1106 base::Thread thread_; 1107 scoped_refptr<TestSyncMessageFilter> filter_; 1108 }; 1109 1110 // This class provides functionality to test the case that a Send on the sync 1111 // channel does not crash after the channel has been closed. 1112 class ServerSendAfterClose : public Worker { 1113 public: 1114 explicit ServerSendAfterClose(mojo::ScopedMessagePipeHandle channel_handle) 1115 : Worker(Channel::MODE_SERVER, 1116 "simpler_server", 1117 std::move(channel_handle)), 1118 send_result_(true) {} 1119 1120 bool SendDummy() { 1121 ListenerThread()->task_runner()->PostTask( 1122 FROM_HERE, 1123 base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send), 1124 base::Unretained(this), new SyncChannelTestMsg_NoArgs)); 1125 return true; 1126 } 1127 1128 bool send_result() const { 1129 return send_result_; 1130 } 1131 1132 private: 1133 void Run() override { 1134 CloseChannel(); 1135 Done(); 1136 } 1137 1138 bool Send(Message* msg) override { 1139 send_result_ = Worker::Send(msg); 1140 Done(); 1141 return send_result_; 1142 } 1143 1144 bool send_result_; 1145 }; 1146 1147 // Tests basic synchronous call 1148 TEST_F(IPCSyncChannelTest, SyncMessageFilter) { 1149 std::vector<Worker*> workers; 1150 mojo::MessagePipe pipe; 1151 workers.push_back(new SyncMessageFilterServer(std::move(pipe.handle0))); 1152 workers.push_back(new SimpleClient(std::move(pipe.handle1))); 1153 RunTest(workers); 1154 } 1155 1156 // Test the case when the channel is closed and a Send is attempted after that. 1157 TEST_F(IPCSyncChannelTest, SendAfterClose) { 1158 mojo::MessagePipe pipe; 1159 ServerSendAfterClose server(std::move(pipe.handle0)); 1160 server.Start(); 1161 1162 server.done_event()->Wait(); 1163 server.done_event()->Reset(); 1164 1165 server.SendDummy(); 1166 server.done_event()->Wait(); 1167 1168 EXPECT_FALSE(server.send_result()); 1169 1170 server.Shutdown(); 1171 } 1172 1173 //------------------------------------------------------------------------------ 1174 1175 class RestrictedDispatchServer : public Worker { 1176 public: 1177 RestrictedDispatchServer(WaitableEvent* sent_ping_event, 1178 WaitableEvent* wait_event, 1179 mojo::ScopedMessagePipeHandle channel_handle) 1180 : Worker(std::move(channel_handle), Channel::MODE_SERVER), 1181 sent_ping_event_(sent_ping_event), 1182 wait_event_(wait_event) {} 1183 1184 void OnDoPing(int ping) { 1185 // Send an asynchronous message that unblocks the caller. 1186 Message* msg = new SyncChannelTestMsg_Ping(ping); 1187 msg->set_unblock(true); 1188 Send(msg); 1189 // Signal the event after the message has been sent on the channel, on the 1190 // IPC thread. 1191 ipc_thread().task_runner()->PostTask( 1192 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, 1193 base::Unretained(this))); 1194 } 1195 1196 void OnPingTTL(int ping, int* out) { 1197 *out = ping; 1198 wait_event_->Wait(); 1199 } 1200 1201 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1202 1203 private: 1204 bool OnMessageReceived(const Message& message) override { 1205 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message) 1206 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1207 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1208 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1209 IPC_END_MESSAGE_MAP() 1210 return true; 1211 } 1212 1213 void OnPingSent() { 1214 sent_ping_event_->Signal(); 1215 } 1216 1217 void OnNoArgs() { } 1218 WaitableEvent* sent_ping_event_; 1219 WaitableEvent* wait_event_; 1220 }; 1221 1222 class NonRestrictedDispatchServer : public Worker { 1223 public: 1224 NonRestrictedDispatchServer(WaitableEvent* signal_event, 1225 mojo::ScopedMessagePipeHandle channel_handle) 1226 : Worker(std::move(channel_handle), Channel::MODE_SERVER), 1227 signal_event_(signal_event) {} 1228 1229 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1230 1231 void OnDoPingTTL(int ping) { 1232 int value = 0; 1233 Send(new SyncChannelTestMsg_PingTTL(ping, &value)); 1234 signal_event_->Signal(); 1235 } 1236 1237 private: 1238 bool OnMessageReceived(const Message& message) override { 1239 IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message) 1240 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1241 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1242 IPC_END_MESSAGE_MAP() 1243 return true; 1244 } 1245 1246 void OnNoArgs() { } 1247 WaitableEvent* signal_event_; 1248 }; 1249 1250 class RestrictedDispatchClient : public Worker { 1251 public: 1252 RestrictedDispatchClient( 1253 WaitableEvent* sent_ping_event, 1254 RestrictedDispatchServer* server, 1255 NonRestrictedDispatchServer* server2, 1256 int* success, 1257 mojo::ScopedMessagePipeHandle restricted_channel_handle, 1258 mojo::ScopedMessagePipeHandle non_restricted_channel_handle) 1259 : Worker(std::move(restricted_channel_handle), Channel::MODE_CLIENT), 1260 ping_(0), 1261 server_(server), 1262 server2_(server2), 1263 success_(success), 1264 sent_ping_event_(sent_ping_event), 1265 non_restricted_channel_handle_( 1266 std::move(non_restricted_channel_handle)) {} 1267 1268 void Run() override { 1269 // Incoming messages from our channel should only be dispatched when we 1270 // send a message on that same channel. 1271 channel()->SetRestrictDispatchChannelGroup(1); 1272 1273 server_->ListenerThread()->task_runner()->PostTask( 1274 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, 1275 base::Unretained(server_), 1)); 1276 sent_ping_event_->Wait(); 1277 Send(new SyncChannelTestMsg_NoArgs); 1278 if (ping_ == 1) 1279 ++*success_; 1280 else 1281 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1282 1283 non_restricted_channel_ = SyncChannel::Create( 1284 non_restricted_channel_handle_.release(), IPC::Channel::MODE_CLIENT, 1285 this, ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), 1286 true, shutdown_event()); 1287 1288 server_->ListenerThread()->task_runner()->PostTask( 1289 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, 1290 base::Unretained(server_), 2)); 1291 sent_ping_event_->Wait(); 1292 // Check that the incoming message is *not* dispatched when sending on the 1293 // non restricted channel. 1294 // TODO(piman): there is a possibility of a false positive race condition 1295 // here, if the message that was posted on the server-side end of the pipe 1296 // is not visible yet on the client side, but I don't know how to solve this 1297 // without hooking into the internals of SyncChannel. I haven't seen it in 1298 // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause 1299 // the following to fail). 1300 non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs); 1301 if (ping_ == 1) 1302 ++*success_; 1303 else 1304 LOG(ERROR) << "Send dispatched message from restricted channel"; 1305 1306 Send(new SyncChannelTestMsg_NoArgs); 1307 if (ping_ == 2) 1308 ++*success_; 1309 else 1310 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1311 1312 // Check that the incoming message on the non-restricted channel is 1313 // dispatched when sending on the restricted channel. 1314 server2_->ListenerThread()->task_runner()->PostTask( 1315 FROM_HERE, base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, 1316 base::Unretained(server2_), 3)); 1317 int value = 0; 1318 Send(new SyncChannelTestMsg_PingTTL(4, &value)); 1319 if (ping_ == 3 && value == 4) 1320 ++*success_; 1321 else 1322 LOG(ERROR) << "Send failed to dispatch message from unrestricted channel"; 1323 1324 non_restricted_channel_->Send(new SyncChannelTestMsg_Done); 1325 non_restricted_channel_.reset(); 1326 Send(new SyncChannelTestMsg_Done); 1327 Done(); 1328 } 1329 1330 private: 1331 bool OnMessageReceived(const Message& message) override { 1332 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message) 1333 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing) 1334 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL) 1335 IPC_END_MESSAGE_MAP() 1336 return true; 1337 } 1338 1339 void OnPing(int ping) { 1340 ping_ = ping; 1341 } 1342 1343 void OnPingTTL(int ping, IPC::Message* reply) { 1344 ping_ = ping; 1345 // This message comes from the NonRestrictedDispatchServer, we have to send 1346 // the reply back manually. 1347 SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping); 1348 non_restricted_channel_->Send(reply); 1349 } 1350 1351 int ping_; 1352 RestrictedDispatchServer* server_; 1353 NonRestrictedDispatchServer* server2_; 1354 int* success_; 1355 WaitableEvent* sent_ping_event_; 1356 std::unique_ptr<SyncChannel> non_restricted_channel_; 1357 mojo::ScopedMessagePipeHandle non_restricted_channel_handle_; 1358 }; 1359 1360 TEST_F(IPCSyncChannelTest, RestrictedDispatch) { 1361 WaitableEvent sent_ping_event( 1362 base::WaitableEvent::ResetPolicy::AUTOMATIC, 1363 base::WaitableEvent::InitialState::NOT_SIGNALED); 1364 WaitableEvent wait_event(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1365 base::WaitableEvent::InitialState::NOT_SIGNALED); 1366 mojo::MessagePipe restricted_pipe, non_restricted_pipe; 1367 RestrictedDispatchServer* server = new RestrictedDispatchServer( 1368 &sent_ping_event, &wait_event, std::move(restricted_pipe.handle0)); 1369 NonRestrictedDispatchServer* server2 = new NonRestrictedDispatchServer( 1370 &wait_event, std::move(non_restricted_pipe.handle0)); 1371 1372 int success = 0; 1373 std::vector<Worker*> workers; 1374 workers.push_back(server); 1375 workers.push_back(server2); 1376 workers.push_back( 1377 new RestrictedDispatchClient(&sent_ping_event, server, server2, &success, 1378 std::move(restricted_pipe.handle1), 1379 std::move(non_restricted_pipe.handle1))); 1380 RunTest(workers); 1381 EXPECT_EQ(4, success); 1382 } 1383 1384 //------------------------------------------------------------------------------ 1385 1386 // This test case inspired by crbug.com/108491 1387 // We create two servers that use the same ListenerThread but have 1388 // SetRestrictDispatchToSameChannel set to true. 1389 // We create clients, then use some specific WaitableEvent wait/signalling to 1390 // ensure that messages get dispatched in a way that causes a deadlock due to 1391 // a nested dispatch and an eligible message in a higher-level dispatch's 1392 // delayed_queue. Specifically, we start with client1 about so send an 1393 // unblocking message to server1, while the shared listener thread for the 1394 // servers server1 and server2 is about to send a non-unblocking message to 1395 // client1. At the same time, client2 will be about to send an unblocking 1396 // message to server2. Server1 will handle the client1->server1 message by 1397 // telling server2 to send a non-unblocking message to client2. 1398 // What should happen is that the send to server2 should find the pending, 1399 // same-context client2->server2 message to dispatch, causing client2 to 1400 // unblock then handle the server2->client2 message, so that the shared 1401 // servers' listener thread can then respond to the client1->server1 message. 1402 // Then client1 can handle the non-unblocking server1->client1 message. 1403 // The old code would end up in a state where the server2->client2 message is 1404 // sent, but the client2->server2 message (which is eligible for dispatch, and 1405 // which is what client2 is waiting for) is stashed in a local delayed_queue 1406 // that has server1's channel context, causing a deadlock. 1407 // WaitableEvents in the events array are used to: 1408 // event 0: indicate to client1 that server listener is in OnDoServerTask 1409 // event 1: indicate to client1 that client2 listener is in OnDoClient2Task 1410 // event 2: indicate to server1 that client2 listener is in OnDoClient2Task 1411 // event 3: indicate to client2 that server listener is in OnDoServerTask 1412 1413 class RestrictedDispatchDeadlockServer : public Worker { 1414 public: 1415 RestrictedDispatchDeadlockServer(int server_num, 1416 WaitableEvent* server_ready_event, 1417 WaitableEvent** events, 1418 RestrictedDispatchDeadlockServer* peer, 1419 mojo::ScopedMessagePipeHandle channel_handle) 1420 : Worker(std::move(channel_handle), Channel::MODE_SERVER), 1421 server_num_(server_num), 1422 server_ready_event_(server_ready_event), 1423 events_(events), 1424 peer_(peer) {} 1425 1426 void OnDoServerTask() { 1427 events_[3]->Signal(); 1428 events_[2]->Wait(); 1429 events_[0]->Signal(); 1430 SendMessageToClient(); 1431 } 1432 1433 void Run() override { 1434 channel()->SetRestrictDispatchChannelGroup(1); 1435 server_ready_event_->Signal(); 1436 } 1437 1438 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1439 1440 private: 1441 bool OnMessageReceived(const Message& message) override { 1442 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message) 1443 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1444 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1445 IPC_END_MESSAGE_MAP() 1446 return true; 1447 } 1448 1449 void OnNoArgs() { 1450 if (server_num_ == 1) { 1451 DCHECK(peer_ != NULL); 1452 peer_->SendMessageToClient(); 1453 } 1454 } 1455 1456 void SendMessageToClient() { 1457 Message* msg = new SyncChannelTestMsg_NoArgs; 1458 msg->set_unblock(false); 1459 DCHECK(!msg->should_unblock()); 1460 Send(msg); 1461 } 1462 1463 int server_num_; 1464 WaitableEvent* server_ready_event_; 1465 WaitableEvent** events_; 1466 RestrictedDispatchDeadlockServer* peer_; 1467 }; 1468 1469 class RestrictedDispatchDeadlockClient2 : public Worker { 1470 public: 1471 RestrictedDispatchDeadlockClient2( 1472 RestrictedDispatchDeadlockServer* server, 1473 WaitableEvent* server_ready_event, 1474 WaitableEvent** events, 1475 mojo::ScopedMessagePipeHandle channel_handle) 1476 : Worker(std::move(channel_handle), Channel::MODE_CLIENT), 1477 server_ready_event_(server_ready_event), 1478 events_(events), 1479 received_msg_(false), 1480 received_noarg_reply_(false), 1481 done_issued_(false) {} 1482 1483 void Run() override { 1484 server_ready_event_->Wait(); 1485 } 1486 1487 void OnDoClient2Task() { 1488 events_[3]->Wait(); 1489 events_[1]->Signal(); 1490 events_[2]->Signal(); 1491 DCHECK(received_msg_ == false); 1492 1493 Message* message = new SyncChannelTestMsg_NoArgs; 1494 message->set_unblock(true); 1495 Send(message); 1496 received_noarg_reply_ = true; 1497 } 1498 1499 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1500 private: 1501 bool OnMessageReceived(const Message& message) override { 1502 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message) 1503 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1504 IPC_END_MESSAGE_MAP() 1505 return true; 1506 } 1507 1508 void OnNoArgs() { 1509 received_msg_ = true; 1510 PossiblyDone(); 1511 } 1512 1513 void PossiblyDone() { 1514 if (received_noarg_reply_ && received_msg_) { 1515 DCHECK(done_issued_ == false); 1516 done_issued_ = true; 1517 Send(new SyncChannelTestMsg_Done); 1518 Done(); 1519 } 1520 } 1521 1522 WaitableEvent* server_ready_event_; 1523 WaitableEvent** events_; 1524 bool received_msg_; 1525 bool received_noarg_reply_; 1526 bool done_issued_; 1527 }; 1528 1529 class RestrictedDispatchDeadlockClient1 : public Worker { 1530 public: 1531 RestrictedDispatchDeadlockClient1( 1532 RestrictedDispatchDeadlockServer* server, 1533 RestrictedDispatchDeadlockClient2* peer, 1534 WaitableEvent* server_ready_event, 1535 WaitableEvent** events, 1536 mojo::ScopedMessagePipeHandle channel_handle) 1537 : Worker(std::move(channel_handle), Channel::MODE_CLIENT), 1538 server_(server), 1539 peer_(peer), 1540 server_ready_event_(server_ready_event), 1541 events_(events), 1542 received_msg_(false), 1543 received_noarg_reply_(false), 1544 done_issued_(false) {} 1545 1546 void Run() override { 1547 server_ready_event_->Wait(); 1548 server_->ListenerThread()->task_runner()->PostTask( 1549 FROM_HERE, base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, 1550 base::Unretained(server_))); 1551 peer_->ListenerThread()->task_runner()->PostTask( 1552 FROM_HERE, 1553 base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, 1554 base::Unretained(peer_))); 1555 events_[0]->Wait(); 1556 events_[1]->Wait(); 1557 DCHECK(received_msg_ == false); 1558 1559 Message* message = new SyncChannelTestMsg_NoArgs; 1560 message->set_unblock(true); 1561 Send(message); 1562 received_noarg_reply_ = true; 1563 PossiblyDone(); 1564 } 1565 1566 private: 1567 bool OnMessageReceived(const Message& message) override { 1568 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message) 1569 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1570 IPC_END_MESSAGE_MAP() 1571 return true; 1572 } 1573 1574 void OnNoArgs() { 1575 received_msg_ = true; 1576 PossiblyDone(); 1577 } 1578 1579 void PossiblyDone() { 1580 if (received_noarg_reply_ && received_msg_) { 1581 DCHECK(done_issued_ == false); 1582 done_issued_ = true; 1583 Send(new SyncChannelTestMsg_Done); 1584 Done(); 1585 } 1586 } 1587 1588 RestrictedDispatchDeadlockServer* server_; 1589 RestrictedDispatchDeadlockClient2* peer_; 1590 WaitableEvent* server_ready_event_; 1591 WaitableEvent** events_; 1592 bool received_msg_; 1593 bool received_noarg_reply_; 1594 bool done_issued_; 1595 }; 1596 1597 TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) { 1598 std::vector<Worker*> workers; 1599 1600 // A shared worker thread so that server1 and server2 run on one thread. 1601 base::Thread worker_thread("RestrictedDispatchDeadlock"); 1602 ASSERT_TRUE(worker_thread.Start()); 1603 1604 WaitableEvent server1_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1605 base::WaitableEvent::InitialState::NOT_SIGNALED); 1606 WaitableEvent server2_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1607 base::WaitableEvent::InitialState::NOT_SIGNALED); 1608 1609 WaitableEvent event0(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1610 base::WaitableEvent::InitialState::NOT_SIGNALED); 1611 WaitableEvent event1(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1612 base::WaitableEvent::InitialState::NOT_SIGNALED); 1613 WaitableEvent event2(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1614 base::WaitableEvent::InitialState::NOT_SIGNALED); 1615 WaitableEvent event3(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1616 base::WaitableEvent::InitialState::NOT_SIGNALED); 1617 WaitableEvent* events[4] = {&event0, &event1, &event2, &event3}; 1618 1619 RestrictedDispatchDeadlockServer* server1; 1620 RestrictedDispatchDeadlockServer* server2; 1621 RestrictedDispatchDeadlockClient1* client1; 1622 RestrictedDispatchDeadlockClient2* client2; 1623 1624 mojo::MessagePipe pipe1, pipe2; 1625 server2 = new RestrictedDispatchDeadlockServer( 1626 2, &server2_ready, events, NULL, std::move(pipe2.handle0)); 1627 server2->OverrideThread(&worker_thread); 1628 workers.push_back(server2); 1629 1630 client2 = new RestrictedDispatchDeadlockClient2( 1631 server2, &server2_ready, events, std::move(pipe2.handle1)); 1632 workers.push_back(client2); 1633 1634 server1 = new RestrictedDispatchDeadlockServer( 1635 1, &server1_ready, events, server2, std::move(pipe1.handle0)); 1636 server1->OverrideThread(&worker_thread); 1637 workers.push_back(server1); 1638 1639 client1 = new RestrictedDispatchDeadlockClient1( 1640 server1, client2, &server1_ready, events, std::move(pipe1.handle1)); 1641 workers.push_back(client1); 1642 1643 RunTest(workers); 1644 } 1645 1646 //------------------------------------------------------------------------------ 1647 1648 // This test case inspired by crbug.com/120530 1649 // We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a 1650 // message that recurses through 3, 4 or 5 steps to make sure, say, W1 can 1651 // re-enter when called from W4 while it's sending a message to W2. 1652 // The first worker drives the whole test so it must be treated specially. 1653 1654 class RestrictedDispatchPipeWorker : public Worker { 1655 public: 1656 RestrictedDispatchPipeWorker(mojo::ScopedMessagePipeHandle channel_handle1, 1657 WaitableEvent* event1, 1658 mojo::ScopedMessagePipeHandle channel_handle2, 1659 WaitableEvent* event2, 1660 int group, 1661 int* success) 1662 : Worker(std::move(channel_handle1), Channel::MODE_SERVER), 1663 event1_(event1), 1664 event2_(event2), 1665 other_channel_handle_(std::move(channel_handle2)), 1666 group_(group), 1667 success_(success) {} 1668 1669 void OnPingTTL(int ping, int* ret) { 1670 *ret = 0; 1671 if (!ping) 1672 return; 1673 other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret)); 1674 ++*ret; 1675 } 1676 1677 void OnDone() { 1678 if (is_first()) 1679 return; 1680 other_channel_->Send(new SyncChannelTestMsg_Done); 1681 other_channel_.reset(); 1682 Done(); 1683 } 1684 1685 void Run() override { 1686 channel()->SetRestrictDispatchChannelGroup(group_); 1687 if (is_first()) 1688 event1_->Signal(); 1689 event2_->Wait(); 1690 other_channel_ = SyncChannel::Create( 1691 other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this, 1692 ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), true, 1693 shutdown_event()); 1694 other_channel_->SetRestrictDispatchChannelGroup(group_); 1695 if (!is_first()) { 1696 event1_->Signal(); 1697 return; 1698 } 1699 *success_ = 0; 1700 int value = 0; 1701 OnPingTTL(3, &value); 1702 *success_ += (value == 3); 1703 OnPingTTL(4, &value); 1704 *success_ += (value == 4); 1705 OnPingTTL(5, &value); 1706 *success_ += (value == 5); 1707 other_channel_->Send(new SyncChannelTestMsg_Done); 1708 other_channel_.reset(); 1709 Done(); 1710 } 1711 1712 bool is_first() { return !!success_; } 1713 1714 private: 1715 bool OnMessageReceived(const Message& message) override { 1716 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message) 1717 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1718 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone) 1719 IPC_END_MESSAGE_MAP() 1720 return true; 1721 } 1722 1723 std::unique_ptr<SyncChannel> other_channel_; 1724 WaitableEvent* event1_; 1725 WaitableEvent* event2_; 1726 mojo::ScopedMessagePipeHandle other_channel_handle_; 1727 int group_; 1728 int* success_; 1729 }; 1730 1731 #if defined(OS_ANDROID) 1732 #define MAYBE_RestrictedDispatch4WayDeadlock \ 1733 DISABLED_RestrictedDispatch4WayDeadlock 1734 #else 1735 #define MAYBE_RestrictedDispatch4WayDeadlock RestrictedDispatch4WayDeadlock 1736 #endif 1737 TEST_F(IPCSyncChannelTest, MAYBE_RestrictedDispatch4WayDeadlock) { 1738 int success = 0; 1739 std::vector<Worker*> workers; 1740 WaitableEvent event0(base::WaitableEvent::ResetPolicy::MANUAL, 1741 base::WaitableEvent::InitialState::NOT_SIGNALED); 1742 WaitableEvent event1(base::WaitableEvent::ResetPolicy::MANUAL, 1743 base::WaitableEvent::InitialState::NOT_SIGNALED); 1744 WaitableEvent event2(base::WaitableEvent::ResetPolicy::MANUAL, 1745 base::WaitableEvent::InitialState::NOT_SIGNALED); 1746 WaitableEvent event3(base::WaitableEvent::ResetPolicy::MANUAL, 1747 base::WaitableEvent::InitialState::NOT_SIGNALED); 1748 mojo::MessagePipe pipe0, pipe1, pipe2, pipe3; 1749 workers.push_back(new RestrictedDispatchPipeWorker( 1750 std::move(pipe0.handle0), &event0, std::move(pipe1.handle1), &event1, 1, 1751 &success)); 1752 workers.push_back(new RestrictedDispatchPipeWorker( 1753 std::move(pipe1.handle0), &event1, std::move(pipe2.handle1), &event2, 2, 1754 NULL)); 1755 workers.push_back(new RestrictedDispatchPipeWorker( 1756 std::move(pipe2.handle0), &event2, std::move(pipe3.handle1), &event3, 3, 1757 NULL)); 1758 workers.push_back(new RestrictedDispatchPipeWorker( 1759 std::move(pipe3.handle0), &event3, std::move(pipe0.handle1), &event0, 4, 1760 NULL)); 1761 RunTest(workers); 1762 EXPECT_EQ(3, success); 1763 } 1764 1765 //------------------------------------------------------------------------------ 1766 1767 // This test case inspired by crbug.com/122443 1768 // We want to make sure a reply message with the unblock flag set correctly 1769 // behaves as a reply, not a regular message. 1770 // We have 3 workers. Server1 will send a message to Server2 (which will block), 1771 // during which it will dispatch a message comming from Client, at which point 1772 // it will send another message to Server2. While sending that second message it 1773 // will receive a reply from Server1 with the unblock flag. 1774 1775 class ReentrantReplyServer1 : public Worker { 1776 public: 1777 ReentrantReplyServer1(WaitableEvent* server_ready, 1778 mojo::ScopedMessagePipeHandle channel_handle1, 1779 mojo::ScopedMessagePipeHandle channel_handle2) 1780 : Worker(std::move(channel_handle1), Channel::MODE_SERVER), 1781 server_ready_(server_ready), 1782 other_channel_handle_(std::move(channel_handle2)) {} 1783 1784 void Run() override { 1785 server2_channel_ = SyncChannel::Create( 1786 other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this, 1787 ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), true, 1788 shutdown_event()); 1789 server_ready_->Signal(); 1790 Message* msg = new SyncChannelTestMsg_Reentrant1(); 1791 server2_channel_->Send(msg); 1792 server2_channel_.reset(); 1793 Done(); 1794 } 1795 1796 private: 1797 bool OnMessageReceived(const Message& message) override { 1798 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message) 1799 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2) 1800 IPC_REPLY_HANDLER(OnReply) 1801 IPC_END_MESSAGE_MAP() 1802 return true; 1803 } 1804 1805 void OnReentrant2() { 1806 Message* msg = new SyncChannelTestMsg_Reentrant3(); 1807 server2_channel_->Send(msg); 1808 } 1809 1810 void OnReply(const Message& message) { 1811 // If we get here, the Send() will never receive the reply (thus would 1812 // hang), so abort instead. 1813 LOG(FATAL) << "Reply message was dispatched"; 1814 } 1815 1816 WaitableEvent* server_ready_; 1817 std::unique_ptr<SyncChannel> server2_channel_; 1818 mojo::ScopedMessagePipeHandle other_channel_handle_; 1819 }; 1820 1821 class ReentrantReplyServer2 : public Worker { 1822 public: 1823 ReentrantReplyServer2(mojo::ScopedMessagePipeHandle channel_handle) 1824 : Worker(std::move(channel_handle), Channel::MODE_SERVER), reply_(NULL) {} 1825 1826 private: 1827 bool OnMessageReceived(const Message& message) override { 1828 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message) 1829 IPC_MESSAGE_HANDLER_DELAY_REPLY( 1830 SyncChannelTestMsg_Reentrant1, OnReentrant1) 1831 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3) 1832 IPC_END_MESSAGE_MAP() 1833 return true; 1834 } 1835 1836 void OnReentrant1(Message* reply) { 1837 DCHECK(!reply_); 1838 reply_ = reply; 1839 } 1840 1841 void OnReentrant3() { 1842 DCHECK(reply_); 1843 Message* reply = reply_; 1844 reply_ = NULL; 1845 reply->set_unblock(true); 1846 Send(reply); 1847 Done(); 1848 } 1849 1850 Message* reply_; 1851 }; 1852 1853 class ReentrantReplyClient : public Worker { 1854 public: 1855 ReentrantReplyClient(WaitableEvent* server_ready, 1856 mojo::ScopedMessagePipeHandle channel_handle) 1857 : Worker(std::move(channel_handle), Channel::MODE_CLIENT), 1858 server_ready_(server_ready) {} 1859 1860 void Run() override { 1861 server_ready_->Wait(); 1862 Send(new SyncChannelTestMsg_Reentrant2()); 1863 Done(); 1864 } 1865 1866 private: 1867 WaitableEvent* server_ready_; 1868 }; 1869 1870 TEST_F(IPCSyncChannelTest, ReentrantReply) { 1871 std::vector<Worker*> workers; 1872 WaitableEvent server_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC, 1873 base::WaitableEvent::InitialState::NOT_SIGNALED); 1874 mojo::MessagePipe pipe1, pipe2; 1875 workers.push_back(new ReentrantReplyServer2(std::move(pipe2.handle0))); 1876 workers.push_back(new ReentrantReplyServer1( 1877 &server_ready, std::move(pipe1.handle0), std::move(pipe2.handle1))); 1878 workers.push_back( 1879 new ReentrantReplyClient(&server_ready, std::move(pipe1.handle1))); 1880 RunTest(workers); 1881 } 1882 1883 } // namespace 1884 } // namespace IPC 1885