Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_sync_channel.h"
      6 
      7 #include <stddef.h>
      8 
      9 #include <memory>
     10 #include <string>
     11 #include <utility>
     12 #include <vector>
     13 
     14 #include "base/bind.h"
     15 #include "base/location.h"
     16 #include "base/logging.h"
     17 #include "base/macros.h"
     18 #include "base/message_loop/message_loop.h"
     19 #include "base/process/process_handle.h"
     20 #include "base/run_loop.h"
     21 #include "base/single_thread_task_runner.h"
     22 #include "base/strings/string_util.h"
     23 #include "base/synchronization/waitable_event.h"
     24 #include "base/threading/platform_thread.h"
     25 #include "base/threading/thread.h"
     26 #include "base/threading/thread_task_runner_handle.h"
     27 #include "build/build_config.h"
     28 #include "ipc/ipc_listener.h"
     29 #include "ipc/ipc_message.h"
     30 #include "ipc/ipc_sender.h"
     31 #include "ipc/ipc_sync_message_filter.h"
     32 #include "ipc/ipc_sync_message_unittest.h"
     33 #include "mojo/public/cpp/system/message_pipe.h"
     34 #include "testing/gtest/include/gtest/gtest.h"
     35 
     36 using base::WaitableEvent;
     37 
     38 namespace IPC {
     39 namespace {
     40 
     41 // Base class for a "process" with listener and IPC threads.
     42 class Worker : public Listener, public Sender {
     43  public:
     44   // Will create a channel without a name.
     45   Worker(Channel::Mode mode,
     46          const std::string& thread_name,
     47          mojo::ScopedMessagePipeHandle channel_handle)
     48       : done_(
     49             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
     50                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
     51         channel_created_(
     52             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
     53                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
     54         channel_handle_(std::move(channel_handle)),
     55         mode_(mode),
     56         ipc_thread_((thread_name + "_ipc").c_str()),
     57         listener_thread_((thread_name + "_listener").c_str()),
     58         overrided_thread_(NULL),
     59         shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL,
     60                         base::WaitableEvent::InitialState::NOT_SIGNALED),
     61         is_shutdown_(false) {}
     62 
     63   // Will create a named channel and use this name for the threads' name.
     64   Worker(mojo::ScopedMessagePipeHandle channel_handle, Channel::Mode mode)
     65       : done_(
     66             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
     67                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
     68         channel_created_(
     69             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
     70                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
     71         channel_handle_(std::move(channel_handle)),
     72         mode_(mode),
     73         ipc_thread_("ipc thread"),
     74         listener_thread_("listener thread"),
     75         overrided_thread_(NULL),
     76         shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL,
     77                         base::WaitableEvent::InitialState::NOT_SIGNALED),
     78         is_shutdown_(false) {}
     79 
     80   ~Worker() override {
     81     // Shutdown() must be called before destruction.
     82     CHECK(is_shutdown_);
     83   }
     84   bool Send(Message* msg) override { return channel_->Send(msg); }
     85   void WaitForChannelCreation() { channel_created_->Wait(); }
     86   void CloseChannel() {
     87     DCHECK(ListenerThread()->task_runner()->BelongsToCurrentThread());
     88     channel_->Close();
     89   }
     90   void Start() {
     91     StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT);
     92     ListenerThread()->task_runner()->PostTask(
     93         FROM_HERE, base::Bind(&Worker::OnStart, base::Unretained(this)));
     94   }
     95   void Shutdown() {
     96     // The IPC thread needs to outlive SyncChannel. We can't do this in
     97     // ~Worker(), since that'll reset the vtable pointer (to Worker's), which
     98     // may result in a race conditions. See http://crbug.com/25841.
     99     WaitableEvent listener_done(
    100         base::WaitableEvent::ResetPolicy::AUTOMATIC,
    101         base::WaitableEvent::InitialState::NOT_SIGNALED),
    102         ipc_done(base::WaitableEvent::ResetPolicy::AUTOMATIC,
    103                  base::WaitableEvent::InitialState::NOT_SIGNALED);
    104     ListenerThread()->task_runner()->PostTask(
    105         FROM_HERE,
    106         base::Bind(&Worker::OnListenerThreadShutdown1, base::Unretained(this),
    107                    &listener_done, &ipc_done));
    108     listener_done.Wait();
    109     ipc_done.Wait();
    110     ipc_thread_.Stop();
    111     listener_thread_.Stop();
    112     is_shutdown_ = true;
    113   }
    114   void OverrideThread(base::Thread* overrided_thread) {
    115     DCHECK(overrided_thread_ == NULL);
    116     overrided_thread_ = overrided_thread;
    117   }
    118   bool SendAnswerToLife(bool pump, bool succeed) {
    119     int answer = 0;
    120     SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
    121     if (pump)
    122       msg->EnableMessagePumping();
    123     bool result = Send(msg);
    124     DCHECK_EQ(result, succeed);
    125     DCHECK_EQ(answer, (succeed ? 42 : 0));
    126     return result;
    127   }
    128   bool SendDouble(bool pump, bool succeed) {
    129     int answer = 0;
    130     SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
    131     if (pump)
    132       msg->EnableMessagePumping();
    133     bool result = Send(msg);
    134     DCHECK_EQ(result, succeed);
    135     DCHECK_EQ(answer, (succeed ? 10 : 0));
    136     return result;
    137   }
    138   mojo::MessagePipeHandle TakeChannelHandle() {
    139     DCHECK(channel_handle_.is_valid());
    140     return channel_handle_.release();
    141   }
    142   Channel::Mode mode() { return mode_; }
    143   WaitableEvent* done_event() { return done_.get(); }
    144   WaitableEvent* shutdown_event() { return &shutdown_event_; }
    145   void ResetChannel() { channel_.reset(); }
    146   // Derived classes need to call this when they've completed their part of
    147   // the test.
    148   void Done() { done_->Signal(); }
    149 
    150  protected:
    151   SyncChannel* channel() { return channel_.get(); }
    152   // Functions for derived classes to implement if they wish.
    153   virtual void Run() { }
    154   virtual void OnAnswer(int* answer) { NOTREACHED(); }
    155   virtual void OnAnswerDelay(Message* reply_msg) {
    156     // The message handler map below can only take one entry for
    157     // SyncChannelTestMsg_AnswerToLife, so since some classes want
    158     // the normal version while other want the delayed reply, we
    159     // call the normal version if the derived class didn't override
    160     // this function.
    161     int answer;
    162     OnAnswer(&answer);
    163     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
    164     Send(reply_msg);
    165   }
    166   virtual void OnDouble(int in, int* out) { NOTREACHED(); }
    167   virtual void OnDoubleDelay(int in, Message* reply_msg) {
    168     int result;
    169     OnDouble(in, &result);
    170     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
    171     Send(reply_msg);
    172   }
    173 
    174   virtual void OnNestedTestMsg(Message* reply_msg) {
    175     NOTREACHED();
    176   }
    177 
    178   virtual SyncChannel* CreateChannel() {
    179     std::unique_ptr<SyncChannel> channel = SyncChannel::Create(
    180         TakeChannelHandle(), mode_, this, ipc_thread_.task_runner(),
    181         base::ThreadTaskRunnerHandle::Get(), true, &shutdown_event_);
    182     return channel.release();
    183   }
    184 
    185   base::Thread* ListenerThread() {
    186     return overrided_thread_ ? overrided_thread_ : &listener_thread_;
    187   }
    188 
    189   const base::Thread& ipc_thread() const { return ipc_thread_; }
    190 
    191  private:
    192   // Called on the listener thread to create the sync channel.
    193   void OnStart() {
    194     // Link ipc_thread_, listener_thread_ and channel_ altogether.
    195     StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO);
    196     channel_.reset(CreateChannel());
    197     channel_created_->Signal();
    198     Run();
    199   }
    200 
    201   void OnListenerThreadShutdown1(WaitableEvent* listener_event,
    202                                  WaitableEvent* ipc_event) {
    203     // SyncChannel needs to be destructed on the thread that it was created on.
    204     channel_.reset();
    205 
    206     base::RunLoop().RunUntilIdle();
    207 
    208     ipc_thread_.task_runner()->PostTask(
    209         FROM_HERE,
    210         base::Bind(&Worker::OnIPCThreadShutdown, base::Unretained(this),
    211                    listener_event, ipc_event));
    212   }
    213 
    214   void OnIPCThreadShutdown(WaitableEvent* listener_event,
    215                            WaitableEvent* ipc_event) {
    216     base::RunLoop().RunUntilIdle();
    217     ipc_event->Signal();
    218 
    219     listener_thread_.task_runner()->PostTask(
    220         FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2,
    221                               base::Unretained(this), listener_event));
    222   }
    223 
    224   void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
    225     base::RunLoop().RunUntilIdle();
    226     listener_event->Signal();
    227   }
    228 
    229   bool OnMessageReceived(const Message& message) override {
    230     IPC_BEGIN_MESSAGE_MAP(Worker, message)
    231      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
    232      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
    233                                      OnAnswerDelay)
    234      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String,
    235                                      OnNestedTestMsg)
    236     IPC_END_MESSAGE_MAP()
    237     return true;
    238   }
    239 
    240   void StartThread(base::Thread* thread, base::MessageLoop::Type type) {
    241     base::Thread::Options options;
    242     options.message_loop_type = type;
    243     thread->StartWithOptions(options);
    244   }
    245 
    246   std::unique_ptr<WaitableEvent> done_;
    247   std::unique_ptr<WaitableEvent> channel_created_;
    248   mojo::ScopedMessagePipeHandle channel_handle_;
    249   Channel::Mode mode_;
    250   std::unique_ptr<SyncChannel> channel_;
    251   base::Thread ipc_thread_;
    252   base::Thread listener_thread_;
    253   base::Thread* overrided_thread_;
    254 
    255   base::WaitableEvent shutdown_event_;
    256 
    257   bool is_shutdown_;
    258 
    259   DISALLOW_COPY_AND_ASSIGN(Worker);
    260 };
    261 
    262 
    263 // Starts the test with the given workers.  This function deletes the workers
    264 // when it's done.
    265 void RunTest(std::vector<Worker*> workers) {
    266   // First we create the workers that are channel servers, or else the other
    267   // workers' channel initialization might fail because the pipe isn't created..
    268   for (size_t i = 0; i < workers.size(); ++i) {
    269     if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) {
    270       workers[i]->Start();
    271       workers[i]->WaitForChannelCreation();
    272     }
    273   }
    274 
    275   // now create the clients
    276   for (size_t i = 0; i < workers.size(); ++i) {
    277     if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG)
    278       workers[i]->Start();
    279   }
    280 
    281   // wait for all the workers to finish
    282   for (size_t i = 0; i < workers.size(); ++i)
    283     workers[i]->done_event()->Wait();
    284 
    285   for (size_t i = 0; i < workers.size(); ++i) {
    286     workers[i]->Shutdown();
    287     delete workers[i];
    288   }
    289 }
    290 
    291 class IPCSyncChannelTest : public testing::Test {
    292  private:
    293   base::MessageLoop message_loop_;
    294 };
    295 
    296 //------------------------------------------------------------------------------
    297 
    298 class SimpleServer : public Worker {
    299  public:
    300   SimpleServer(bool pump_during_send,
    301                mojo::ScopedMessagePipeHandle channel_handle)
    302       : Worker(Channel::MODE_SERVER,
    303                "simpler_server",
    304                std::move(channel_handle)),
    305         pump_during_send_(pump_during_send) {}
    306   void Run() override {
    307     SendAnswerToLife(pump_during_send_, true);
    308     Done();
    309   }
    310 
    311   bool pump_during_send_;
    312 };
    313 
    314 class SimpleClient : public Worker {
    315  public:
    316   explicit SimpleClient(mojo::ScopedMessagePipeHandle channel_handle)
    317       : Worker(Channel::MODE_CLIENT,
    318                "simple_client",
    319                std::move(channel_handle)) {}
    320 
    321   void OnAnswer(int* answer) override {
    322     *answer = 42;
    323     Done();
    324   }
    325 };
    326 
    327 void Simple(bool pump_during_send) {
    328   std::vector<Worker*> workers;
    329   mojo::MessagePipe pipe;
    330   workers.push_back(
    331       new SimpleServer(pump_during_send, std::move(pipe.handle0)));
    332   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
    333   RunTest(workers);
    334 }
    335 
    336 #if defined(OS_ANDROID)
    337 #define MAYBE_Simple DISABLED_Simple
    338 #else
    339 #define MAYBE_Simple Simple
    340 #endif
    341 // Tests basic synchronous call
    342 TEST_F(IPCSyncChannelTest, MAYBE_Simple) {
    343   Simple(false);
    344   Simple(true);
    345 }
    346 
    347 //------------------------------------------------------------------------------
    348 
    349 // Worker classes which override how the sync channel is created to use the
    350 // two-step initialization (calling the lightweight constructor and then
    351 // ChannelProxy::Init separately) process.
    352 class TwoStepServer : public Worker {
    353  public:
    354   TwoStepServer(bool create_pipe_now,
    355                 mojo::ScopedMessagePipeHandle channel_handle)
    356       : Worker(Channel::MODE_SERVER,
    357                "simpler_server",
    358                std::move(channel_handle)),
    359         create_pipe_now_(create_pipe_now) {}
    360 
    361   void Run() override {
    362     SendAnswerToLife(false, true);
    363     Done();
    364   }
    365 
    366   SyncChannel* CreateChannel() override {
    367     SyncChannel* channel =
    368         SyncChannel::Create(TakeChannelHandle(), mode(), this,
    369                             ipc_thread().task_runner(),
    370                             base::ThreadTaskRunnerHandle::Get(),
    371                             create_pipe_now_, shutdown_event())
    372             .release();
    373     return channel;
    374   }
    375 
    376   bool create_pipe_now_;
    377 };
    378 
    379 class TwoStepClient : public Worker {
    380  public:
    381   TwoStepClient(bool create_pipe_now,
    382                 mojo::ScopedMessagePipeHandle channel_handle)
    383       : Worker(Channel::MODE_CLIENT,
    384                "simple_client",
    385                std::move(channel_handle)),
    386         create_pipe_now_(create_pipe_now) {}
    387 
    388   void OnAnswer(int* answer) override {
    389     *answer = 42;
    390     Done();
    391   }
    392 
    393   SyncChannel* CreateChannel() override {
    394     SyncChannel* channel =
    395         SyncChannel::Create(TakeChannelHandle(), mode(), this,
    396                             ipc_thread().task_runner(),
    397                             base::ThreadTaskRunnerHandle::Get(),
    398                             create_pipe_now_, shutdown_event())
    399             .release();
    400     return channel;
    401   }
    402 
    403   bool create_pipe_now_;
    404 };
    405 
    406 void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) {
    407   std::vector<Worker*> workers;
    408   mojo::MessagePipe pipe;
    409   workers.push_back(
    410       new TwoStepServer(create_server_pipe_now, std::move(pipe.handle0)));
    411   workers.push_back(
    412       new TwoStepClient(create_client_pipe_now, std::move(pipe.handle1)));
    413   RunTest(workers);
    414 }
    415 
    416 // Tests basic two-step initialization, where you call the lightweight
    417 // constructor then Init.
    418 TEST_F(IPCSyncChannelTest, TwoStepInitialization) {
    419   TwoStep(false, false);
    420   TwoStep(false, true);
    421   TwoStep(true, false);
    422   TwoStep(true, true);
    423 }
    424 
    425 //------------------------------------------------------------------------------
    426 
    427 class DelayClient : public Worker {
    428  public:
    429   explicit DelayClient(mojo::ScopedMessagePipeHandle channel_handle)
    430       : Worker(Channel::MODE_CLIENT,
    431                "delay_client",
    432                std::move(channel_handle)) {}
    433 
    434   void OnAnswerDelay(Message* reply_msg) override {
    435     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
    436     Send(reply_msg);
    437     Done();
    438   }
    439 };
    440 
    441 void DelayReply(bool pump_during_send) {
    442   std::vector<Worker*> workers;
    443   mojo::MessagePipe pipe;
    444   workers.push_back(
    445       new SimpleServer(pump_during_send, std::move(pipe.handle0)));
    446   workers.push_back(new DelayClient(std::move(pipe.handle1)));
    447   RunTest(workers);
    448 }
    449 
    450 // Tests that asynchronous replies work
    451 TEST_F(IPCSyncChannelTest, DelayReply) {
    452   DelayReply(false);
    453   DelayReply(true);
    454 }
    455 
    456 //------------------------------------------------------------------------------
    457 
    458 class NoHangServer : public Worker {
    459  public:
    460   NoHangServer(WaitableEvent* got_first_reply,
    461                bool pump_during_send,
    462                mojo::ScopedMessagePipeHandle channel_handle)
    463       : Worker(Channel::MODE_SERVER,
    464                "no_hang_server",
    465                std::move(channel_handle)),
    466         got_first_reply_(got_first_reply),
    467         pump_during_send_(pump_during_send) {}
    468   void Run() override {
    469     SendAnswerToLife(pump_during_send_, true);
    470     got_first_reply_->Signal();
    471 
    472     SendAnswerToLife(pump_during_send_, false);
    473     Done();
    474   }
    475 
    476   WaitableEvent* got_first_reply_;
    477   bool pump_during_send_;
    478 };
    479 
    480 class NoHangClient : public Worker {
    481  public:
    482   NoHangClient(WaitableEvent* got_first_reply,
    483                mojo::ScopedMessagePipeHandle channel_handle)
    484       : Worker(Channel::MODE_CLIENT,
    485                "no_hang_client",
    486                std::move(channel_handle)),
    487         got_first_reply_(got_first_reply) {}
    488 
    489   void OnAnswerDelay(Message* reply_msg) override {
    490     // Use the DELAY_REPLY macro so that we can force the reply to be sent
    491     // before this function returns (when the channel will be reset).
    492     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
    493     Send(reply_msg);
    494     got_first_reply_->Wait();
    495     CloseChannel();
    496     Done();
    497   }
    498 
    499   WaitableEvent* got_first_reply_;
    500 };
    501 
    502 void NoHang(bool pump_during_send) {
    503   WaitableEvent got_first_reply(
    504       base::WaitableEvent::ResetPolicy::AUTOMATIC,
    505       base::WaitableEvent::InitialState::NOT_SIGNALED);
    506   std::vector<Worker*> workers;
    507   mojo::MessagePipe pipe;
    508   workers.push_back(new NoHangServer(&got_first_reply, pump_during_send,
    509                                      std::move(pipe.handle0)));
    510   workers.push_back(
    511       new NoHangClient(&got_first_reply, std::move(pipe.handle1)));
    512   RunTest(workers);
    513 }
    514 
    515 // Tests that caller doesn't hang if receiver dies
    516 TEST_F(IPCSyncChannelTest, NoHang) {
    517   NoHang(false);
    518   NoHang(true);
    519 }
    520 
    521 //------------------------------------------------------------------------------
    522 
    523 class UnblockServer : public Worker {
    524  public:
    525   UnblockServer(bool pump_during_send,
    526                 bool delete_during_send,
    527                 mojo::ScopedMessagePipeHandle channel_handle)
    528       : Worker(Channel::MODE_SERVER,
    529                "unblock_server",
    530                std::move(channel_handle)),
    531         pump_during_send_(pump_during_send),
    532         delete_during_send_(delete_during_send) {}
    533   void Run() override {
    534     if (delete_during_send_) {
    535       // Use custom code since race conditions mean the answer may or may not be
    536       // available.
    537       int answer = 0;
    538       SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
    539       if (pump_during_send_)
    540         msg->EnableMessagePumping();
    541       Send(msg);
    542     } else {
    543       SendAnswerToLife(pump_during_send_, true);
    544     }
    545     Done();
    546   }
    547 
    548   void OnDoubleDelay(int in, Message* reply_msg) override {
    549     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
    550     Send(reply_msg);
    551     if (delete_during_send_)
    552       ResetChannel();
    553   }
    554 
    555   bool pump_during_send_;
    556   bool delete_during_send_;
    557 };
    558 
    559 class UnblockClient : public Worker {
    560  public:
    561   UnblockClient(bool pump_during_send,
    562                 mojo::ScopedMessagePipeHandle channel_handle)
    563       : Worker(Channel::MODE_CLIENT,
    564                "unblock_client",
    565                std::move(channel_handle)),
    566         pump_during_send_(pump_during_send) {}
    567 
    568   void OnAnswer(int* answer) override {
    569     SendDouble(pump_during_send_, true);
    570     *answer = 42;
    571     Done();
    572   }
    573 
    574   bool pump_during_send_;
    575 };
    576 
    577 void Unblock(bool server_pump, bool client_pump, bool delete_during_send) {
    578   std::vector<Worker*> workers;
    579   mojo::MessagePipe pipe;
    580   workers.push_back(new UnblockServer(server_pump, delete_during_send,
    581                                       std::move(pipe.handle0)));
    582   workers.push_back(new UnblockClient(client_pump, std::move(pipe.handle1)));
    583   RunTest(workers);
    584 }
    585 
    586 // Tests that the caller unblocks to answer a sync message from the receiver.
    587 TEST_F(IPCSyncChannelTest, Unblock) {
    588   Unblock(false, false, false);
    589   Unblock(false, true, false);
    590   Unblock(true, false, false);
    591   Unblock(true, true, false);
    592 }
    593 
    594 //------------------------------------------------------------------------------
    595 
    596 #if defined(OS_ANDROID)
    597 #define MAYBE_ChannelDeleteDuringSend DISABLED_ChannelDeleteDuringSend
    598 #else
    599 #define MAYBE_ChannelDeleteDuringSend ChannelDeleteDuringSend
    600 #endif
    601 // Tests that the the SyncChannel object can be deleted during a Send.
    602 TEST_F(IPCSyncChannelTest, MAYBE_ChannelDeleteDuringSend) {
    603   Unblock(false, false, true);
    604   Unblock(false, true, true);
    605   Unblock(true, false, true);
    606   Unblock(true, true, true);
    607 }
    608 
    609 //------------------------------------------------------------------------------
    610 
    611 class RecursiveServer : public Worker {
    612  public:
    613   RecursiveServer(bool expected_send_result,
    614                   bool pump_first,
    615                   bool pump_second,
    616                   mojo::ScopedMessagePipeHandle channel_handle)
    617       : Worker(Channel::MODE_SERVER,
    618                "recursive_server",
    619                std::move(channel_handle)),
    620         expected_send_result_(expected_send_result),
    621         pump_first_(pump_first),
    622         pump_second_(pump_second) {}
    623   void Run() override {
    624     SendDouble(pump_first_, expected_send_result_);
    625     Done();
    626   }
    627 
    628   void OnDouble(int in, int* out) override {
    629     *out = in * 2;
    630     SendAnswerToLife(pump_second_, expected_send_result_);
    631   }
    632 
    633   bool expected_send_result_, pump_first_, pump_second_;
    634 };
    635 
    636 class RecursiveClient : public Worker {
    637  public:
    638   RecursiveClient(bool pump_during_send,
    639                   bool close_channel,
    640                   mojo::ScopedMessagePipeHandle channel_handle)
    641       : Worker(Channel::MODE_CLIENT,
    642                "recursive_client",
    643                std::move(channel_handle)),
    644         pump_during_send_(pump_during_send),
    645         close_channel_(close_channel) {}
    646 
    647   void OnDoubleDelay(int in, Message* reply_msg) override {
    648     SendDouble(pump_during_send_, !close_channel_);
    649     if (close_channel_) {
    650       delete reply_msg;
    651     } else {
    652       SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
    653       Send(reply_msg);
    654     }
    655     Done();
    656   }
    657 
    658   void OnAnswerDelay(Message* reply_msg) override {
    659     if (close_channel_) {
    660       delete reply_msg;
    661       CloseChannel();
    662     } else {
    663       SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
    664       Send(reply_msg);
    665     }
    666   }
    667 
    668   bool pump_during_send_, close_channel_;
    669 };
    670 
    671 void Recursive(
    672     bool server_pump_first, bool server_pump_second, bool client_pump) {
    673   std::vector<Worker*> workers;
    674   mojo::MessagePipe pipe;
    675   workers.push_back(new RecursiveServer(
    676       true, server_pump_first, server_pump_second, std::move(pipe.handle0)));
    677   workers.push_back(
    678       new RecursiveClient(client_pump, false, std::move(pipe.handle1)));
    679   RunTest(workers);
    680 }
    681 
    682 // Tests a server calling Send while another Send is pending.
    683 TEST_F(IPCSyncChannelTest, Recursive) {
    684   Recursive(false, false, false);
    685   Recursive(false, false, true);
    686   Recursive(false, true, false);
    687   Recursive(false, true, true);
    688   Recursive(true, false, false);
    689   Recursive(true, false, true);
    690   Recursive(true, true, false);
    691   Recursive(true, true, true);
    692 }
    693 
    694 //------------------------------------------------------------------------------
    695 
    696 void RecursiveNoHang(
    697     bool server_pump_first, bool server_pump_second, bool client_pump) {
    698   std::vector<Worker*> workers;
    699   mojo::MessagePipe pipe;
    700   workers.push_back(new RecursiveServer(
    701       false, server_pump_first, server_pump_second, std::move(pipe.handle0)));
    702   workers.push_back(
    703       new RecursiveClient(client_pump, true, std::move(pipe.handle1)));
    704   RunTest(workers);
    705 }
    706 
    707 // Tests that if a caller makes a sync call during an existing sync call and
    708 // the receiver dies, neither of the Send() calls hang.
    709 TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
    710   RecursiveNoHang(false, false, false);
    711   RecursiveNoHang(false, false, true);
    712   RecursiveNoHang(false, true, false);
    713   RecursiveNoHang(false, true, true);
    714   RecursiveNoHang(true, false, false);
    715   RecursiveNoHang(true, false, true);
    716   RecursiveNoHang(true, true, false);
    717   RecursiveNoHang(true, true, true);
    718 }
    719 
    720 //------------------------------------------------------------------------------
    721 
    722 class MultipleServer1 : public Worker {
    723  public:
    724   MultipleServer1(bool pump_during_send,
    725                   mojo::ScopedMessagePipeHandle channel_handle)
    726       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
    727         pump_during_send_(pump_during_send) {}
    728 
    729   void Run() override {
    730     SendDouble(pump_during_send_, true);
    731     Done();
    732   }
    733 
    734   bool pump_during_send_;
    735 };
    736 
    737 class MultipleClient1 : public Worker {
    738  public:
    739   MultipleClient1(WaitableEvent* client1_msg_received,
    740                   WaitableEvent* client1_can_reply,
    741                   mojo::ScopedMessagePipeHandle channel_handle)
    742       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
    743         client1_msg_received_(client1_msg_received),
    744         client1_can_reply_(client1_can_reply) {}
    745 
    746   void OnDouble(int in, int* out) override {
    747     client1_msg_received_->Signal();
    748     *out = in * 2;
    749     client1_can_reply_->Wait();
    750     Done();
    751   }
    752 
    753  private:
    754   WaitableEvent *client1_msg_received_, *client1_can_reply_;
    755 };
    756 
    757 class MultipleServer2 : public Worker {
    758  public:
    759   explicit MultipleServer2(mojo::ScopedMessagePipeHandle channel_handle)
    760       : Worker(std::move(channel_handle), Channel::MODE_SERVER) {}
    761 
    762   void OnAnswer(int* result) override {
    763     *result = 42;
    764     Done();
    765   }
    766 };
    767 
    768 class MultipleClient2 : public Worker {
    769  public:
    770   MultipleClient2(WaitableEvent* client1_msg_received,
    771                   WaitableEvent* client1_can_reply,
    772                   bool pump_during_send,
    773                   mojo::ScopedMessagePipeHandle channel_handle)
    774       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
    775         client1_msg_received_(client1_msg_received),
    776         client1_can_reply_(client1_can_reply),
    777         pump_during_send_(pump_during_send) {}
    778 
    779   void Run() override {
    780     client1_msg_received_->Wait();
    781     SendAnswerToLife(pump_during_send_, true);
    782     client1_can_reply_->Signal();
    783     Done();
    784   }
    785 
    786  private:
    787   WaitableEvent *client1_msg_received_, *client1_can_reply_;
    788   bool pump_during_send_;
    789 };
    790 
    791 void Multiple(bool server_pump, bool client_pump) {
    792   std::vector<Worker*> workers;
    793 
    794   // A shared worker thread so that server1 and server2 run on one thread.
    795   base::Thread worker_thread("Multiple");
    796   ASSERT_TRUE(worker_thread.Start());
    797 
    798   // Server1 sends a sync msg to client1, which blocks the reply until
    799   // server2 (which runs on the same worker thread as server1) responds
    800   // to a sync msg from client2.
    801   WaitableEvent client1_msg_received(
    802       base::WaitableEvent::ResetPolicy::AUTOMATIC,
    803       base::WaitableEvent::InitialState::NOT_SIGNALED);
    804   WaitableEvent client1_can_reply(
    805       base::WaitableEvent::ResetPolicy::AUTOMATIC,
    806       base::WaitableEvent::InitialState::NOT_SIGNALED);
    807 
    808   Worker* worker;
    809 
    810   mojo::MessagePipe pipe1, pipe2;
    811   worker = new MultipleServer2(std::move(pipe2.handle0));
    812   worker->OverrideThread(&worker_thread);
    813   workers.push_back(worker);
    814 
    815   worker = new MultipleClient2(&client1_msg_received, &client1_can_reply,
    816                                client_pump, std::move(pipe2.handle1));
    817   workers.push_back(worker);
    818 
    819   worker = new MultipleServer1(server_pump, std::move(pipe1.handle0));
    820   worker->OverrideThread(&worker_thread);
    821   workers.push_back(worker);
    822 
    823   worker = new MultipleClient1(&client1_msg_received, &client1_can_reply,
    824                                std::move(pipe1.handle1));
    825   workers.push_back(worker);
    826 
    827   RunTest(workers);
    828 }
    829 
    830 // Tests that multiple SyncObjects on the same listener thread can unblock each
    831 // other.
    832 TEST_F(IPCSyncChannelTest, Multiple) {
    833   Multiple(false, false);
    834   Multiple(false, true);
    835   Multiple(true, false);
    836   Multiple(true, true);
    837 }
    838 
    839 //------------------------------------------------------------------------------
    840 
    841 // This class provides server side functionality to test the case where
    842 // multiple sync channels are in use on the same thread on the client and
    843 // nested calls are issued.
    844 class QueuedReplyServer : public Worker {
    845  public:
    846   QueuedReplyServer(base::Thread* listener_thread,
    847                     mojo::ScopedMessagePipeHandle channel_handle,
    848                     const std::string& reply_text)
    849       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
    850         reply_text_(reply_text) {
    851     Worker::OverrideThread(listener_thread);
    852   }
    853 
    854   void OnNestedTestMsg(Message* reply_msg) override {
    855     VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
    856     SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
    857     Send(reply_msg);
    858     Done();
    859   }
    860 
    861  private:
    862   std::string reply_text_;
    863 };
    864 
    865 // The QueuedReplyClient class provides functionality to test the case where
    866 // multiple sync channels are in use on the same thread and they make nested
    867 // sync calls, i.e. while the first channel waits for a response it makes a
    868 // sync call on another channel.
    869 // The callstack should unwind correctly, i.e. the outermost call should
    870 // complete first, and so on.
    871 class QueuedReplyClient : public Worker {
    872  public:
    873   QueuedReplyClient(base::Thread* listener_thread,
    874                     mojo::ScopedMessagePipeHandle channel_handle,
    875                     const std::string& expected_text,
    876                     bool pump_during_send)
    877       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
    878         pump_during_send_(pump_during_send),
    879         expected_text_(expected_text) {
    880     Worker::OverrideThread(listener_thread);
    881   }
    882 
    883   void Run() override {
    884     std::string response;
    885     SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
    886     if (pump_during_send_)
    887       msg->EnableMessagePumping();
    888     bool result = Send(msg);
    889     DCHECK(result);
    890     DCHECK_EQ(response, expected_text_);
    891 
    892     VLOG(1) << __FUNCTION__ << " Received reply: " << response;
    893     Done();
    894   }
    895 
    896  private:
    897   bool pump_during_send_;
    898   std::string expected_text_;
    899 };
    900 
    901 void QueuedReply(bool client_pump) {
    902   std::vector<Worker*> workers;
    903 
    904   // A shared worker thread for servers
    905   base::Thread server_worker_thread("QueuedReply_ServerListener");
    906   ASSERT_TRUE(server_worker_thread.Start());
    907 
    908   base::Thread client_worker_thread("QueuedReply_ClientListener");
    909   ASSERT_TRUE(client_worker_thread.Start());
    910 
    911   Worker* worker;
    912 
    913   mojo::MessagePipe pipe1, pipe2;
    914   worker = new QueuedReplyServer(&server_worker_thread,
    915                                  std::move(pipe1.handle0), "Got first message");
    916   workers.push_back(worker);
    917 
    918   worker = new QueuedReplyServer(
    919       &server_worker_thread, std::move(pipe2.handle0), "Got second message");
    920   workers.push_back(worker);
    921 
    922   worker =
    923       new QueuedReplyClient(&client_worker_thread, std::move(pipe1.handle1),
    924                             "Got first message", client_pump);
    925   workers.push_back(worker);
    926 
    927   worker =
    928       new QueuedReplyClient(&client_worker_thread, std::move(pipe2.handle1),
    929                             "Got second message", client_pump);
    930   workers.push_back(worker);
    931 
    932   RunTest(workers);
    933 }
    934 
    935 // While a blocking send is in progress, the listener thread might answer other
    936 // synchronous messages.  This tests that if during the response to another
    937 // message the reply to the original messages comes, it is queued up correctly
    938 // and the original Send is unblocked later.
    939 // We also test that the send call stacks unwind correctly when the channel
    940 // pumps messages while waiting for a response.
    941 TEST_F(IPCSyncChannelTest, QueuedReply) {
    942   QueuedReply(false);
    943   QueuedReply(true);
    944 }
    945 
    946 //------------------------------------------------------------------------------
    947 
    948 class ChattyClient : public Worker {
    949  public:
    950   explicit ChattyClient(mojo::ScopedMessagePipeHandle channel_handle)
    951       : Worker(Channel::MODE_CLIENT,
    952                "chatty_client",
    953                std::move(channel_handle)) {}
    954 
    955   void OnAnswer(int* answer) override {
    956     // The PostMessage limit is 10k.  Send 20% more than that.
    957     const int kMessageLimit = 10000;
    958     const int kMessagesToSend = kMessageLimit * 120 / 100;
    959     for (int i = 0; i < kMessagesToSend; ++i) {
    960       if (!SendDouble(false, true))
    961         break;
    962     }
    963     *answer = 42;
    964     Done();
    965   }
    966 };
    967 
    968 void ChattyServer(bool pump_during_send) {
    969   std::vector<Worker*> workers;
    970   mojo::MessagePipe pipe;
    971   workers.push_back(
    972       new UnblockServer(pump_during_send, false, std::move(pipe.handle0)));
    973   workers.push_back(new ChattyClient(std::move(pipe.handle1)));
    974   RunTest(workers);
    975 }
    976 
    977 #if defined(OS_ANDROID)
    978 // Times out.
    979 #define MAYBE_ChattyServer DISABLED_ChattyServer
    980 #else
    981 #define MAYBE_ChattyServer ChattyServer
    982 #endif
    983 // Tests http://b/1093251 - that sending lots of sync messages while
    984 // the receiver is waiting for a sync reply does not overflow the PostMessage
    985 // queue.
    986 TEST_F(IPCSyncChannelTest, MAYBE_ChattyServer) {
    987   ChattyServer(false);
    988 }
    989 
    990 #if defined(OS_ANDROID)
    991 // Times out.
    992 #define MAYBE_ChattyServerPumpDuringSend DISABLED_ChattyServerPumpDuringSend
    993 #else
    994 #define MAYBE_ChattyServerPumpDuringSend ChattyServerPumpDuringSend
    995 #endif
    996 TEST_F(IPCSyncChannelTest, MAYBE_ChattyServerPumpDuringSend) {
    997   ChattyServer(true);
    998 }
    999 
   1000 //------------------------------------------------------------------------------
   1001 
   1002 void NestedCallback(Worker* server) {
   1003   // Sleep a bit so that we wake up after the reply has been received.
   1004   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250));
   1005   server->SendAnswerToLife(true, true);
   1006 }
   1007 
   1008 bool timeout_occurred = false;
   1009 
   1010 void TimeoutCallback() {
   1011   timeout_occurred = true;
   1012 }
   1013 
   1014 class DoneEventRaceServer : public Worker {
   1015  public:
   1016   explicit DoneEventRaceServer(mojo::ScopedMessagePipeHandle channel_handle)
   1017       : Worker(Channel::MODE_SERVER,
   1018                "done_event_race_server",
   1019                std::move(channel_handle)) {}
   1020 
   1021   void Run() override {
   1022     base::ThreadTaskRunnerHandle::Get()->PostTask(
   1023         FROM_HERE, base::Bind(&NestedCallback, base::Unretained(this)));
   1024     base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
   1025         FROM_HERE, base::Bind(&TimeoutCallback),
   1026         base::TimeDelta::FromSeconds(9));
   1027     // Even though we have a timeout on the Send, it will succeed since for this
   1028     // bug, the reply message comes back and is deserialized, however the done
   1029     // event wasn't set.  So we indirectly use the timeout task to notice if a
   1030     // timeout occurred.
   1031     SendAnswerToLife(true, true);
   1032     DCHECK(!timeout_occurred);
   1033     Done();
   1034   }
   1035 };
   1036 
   1037 #if defined(OS_ANDROID)
   1038 #define MAYBE_DoneEventRace DISABLED_DoneEventRace
   1039 #else
   1040 #define MAYBE_DoneEventRace DoneEventRace
   1041 #endif
   1042 // Tests http://b/1474092 - that if after the done_event is set but before
   1043 // OnObjectSignaled is called another message is sent out, then after its
   1044 // reply comes back OnObjectSignaled will be called for the first message.
   1045 TEST_F(IPCSyncChannelTest, MAYBE_DoneEventRace) {
   1046   std::vector<Worker*> workers;
   1047   mojo::MessagePipe pipe;
   1048   workers.push_back(new DoneEventRaceServer(std::move(pipe.handle0)));
   1049   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
   1050   RunTest(workers);
   1051 }
   1052 
   1053 //------------------------------------------------------------------------------
   1054 
   1055 class TestSyncMessageFilter : public SyncMessageFilter {
   1056  public:
   1057   TestSyncMessageFilter(
   1058       base::WaitableEvent* shutdown_event,
   1059       Worker* worker,
   1060       scoped_refptr<base::SingleThreadTaskRunner> task_runner)
   1061       : SyncMessageFilter(shutdown_event),
   1062         worker_(worker),
   1063         task_runner_(task_runner) {}
   1064 
   1065   void OnFilterAdded(Channel* channel) override {
   1066     SyncMessageFilter::OnFilterAdded(channel);
   1067     task_runner_->PostTask(
   1068         FROM_HERE,
   1069         base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this));
   1070   }
   1071 
   1072   void SendMessageOnHelperThread() {
   1073     int answer = 0;
   1074     bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
   1075     DCHECK(result);
   1076     DCHECK_EQ(answer, 42);
   1077 
   1078     worker_->Done();
   1079   }
   1080 
   1081  private:
   1082   ~TestSyncMessageFilter() override = default;
   1083 
   1084   Worker* worker_;
   1085   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
   1086 };
   1087 
   1088 class SyncMessageFilterServer : public Worker {
   1089  public:
   1090   explicit SyncMessageFilterServer(mojo::ScopedMessagePipeHandle channel_handle)
   1091       : Worker(Channel::MODE_SERVER,
   1092                "sync_message_filter_server",
   1093                std::move(channel_handle)),
   1094         thread_("helper_thread") {
   1095     base::Thread::Options options;
   1096     options.message_loop_type = base::MessageLoop::TYPE_DEFAULT;
   1097     thread_.StartWithOptions(options);
   1098     filter_ = new TestSyncMessageFilter(shutdown_event(), this,
   1099                                         thread_.task_runner());
   1100   }
   1101 
   1102   void Run() override {
   1103     channel()->AddFilter(filter_.get());
   1104   }
   1105 
   1106   base::Thread thread_;
   1107   scoped_refptr<TestSyncMessageFilter> filter_;
   1108 };
   1109 
   1110 // This class provides functionality to test the case that a Send on the sync
   1111 // channel does not crash after the channel has been closed.
   1112 class ServerSendAfterClose : public Worker {
   1113  public:
   1114   explicit ServerSendAfterClose(mojo::ScopedMessagePipeHandle channel_handle)
   1115       : Worker(Channel::MODE_SERVER,
   1116                "simpler_server",
   1117                std::move(channel_handle)),
   1118         send_result_(true) {}
   1119 
   1120   bool SendDummy() {
   1121     ListenerThread()->task_runner()->PostTask(
   1122         FROM_HERE,
   1123         base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send),
   1124                    base::Unretained(this), new SyncChannelTestMsg_NoArgs));
   1125     return true;
   1126   }
   1127 
   1128   bool send_result() const {
   1129     return send_result_;
   1130   }
   1131 
   1132  private:
   1133   void Run() override {
   1134     CloseChannel();
   1135     Done();
   1136   }
   1137 
   1138   bool Send(Message* msg) override {
   1139     send_result_ = Worker::Send(msg);
   1140     Done();
   1141     return send_result_;
   1142   }
   1143 
   1144   bool send_result_;
   1145 };
   1146 
   1147 // Tests basic synchronous call
   1148 TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
   1149   std::vector<Worker*> workers;
   1150   mojo::MessagePipe pipe;
   1151   workers.push_back(new SyncMessageFilterServer(std::move(pipe.handle0)));
   1152   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
   1153   RunTest(workers);
   1154 }
   1155 
   1156 // Test the case when the channel is closed and a Send is attempted after that.
   1157 TEST_F(IPCSyncChannelTest, SendAfterClose) {
   1158   mojo::MessagePipe pipe;
   1159   ServerSendAfterClose server(std::move(pipe.handle0));
   1160   server.Start();
   1161 
   1162   server.done_event()->Wait();
   1163   server.done_event()->Reset();
   1164 
   1165   server.SendDummy();
   1166   server.done_event()->Wait();
   1167 
   1168   EXPECT_FALSE(server.send_result());
   1169 
   1170   server.Shutdown();
   1171 }
   1172 
   1173 //------------------------------------------------------------------------------
   1174 
   1175 class RestrictedDispatchServer : public Worker {
   1176  public:
   1177   RestrictedDispatchServer(WaitableEvent* sent_ping_event,
   1178                            WaitableEvent* wait_event,
   1179                            mojo::ScopedMessagePipeHandle channel_handle)
   1180       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
   1181         sent_ping_event_(sent_ping_event),
   1182         wait_event_(wait_event) {}
   1183 
   1184   void OnDoPing(int ping) {
   1185     // Send an asynchronous message that unblocks the caller.
   1186     Message* msg = new SyncChannelTestMsg_Ping(ping);
   1187     msg->set_unblock(true);
   1188     Send(msg);
   1189     // Signal the event after the message has been sent on the channel, on the
   1190     // IPC thread.
   1191     ipc_thread().task_runner()->PostTask(
   1192         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent,
   1193                               base::Unretained(this)));
   1194   }
   1195 
   1196   void OnPingTTL(int ping, int* out) {
   1197     *out = ping;
   1198     wait_event_->Wait();
   1199   }
   1200 
   1201   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1202 
   1203  private:
   1204   bool OnMessageReceived(const Message& message) override {
   1205     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message)
   1206      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1207      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
   1208      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
   1209     IPC_END_MESSAGE_MAP()
   1210     return true;
   1211   }
   1212 
   1213   void OnPingSent() {
   1214     sent_ping_event_->Signal();
   1215   }
   1216 
   1217   void OnNoArgs() { }
   1218   WaitableEvent* sent_ping_event_;
   1219   WaitableEvent* wait_event_;
   1220 };
   1221 
   1222 class NonRestrictedDispatchServer : public Worker {
   1223  public:
   1224   NonRestrictedDispatchServer(WaitableEvent* signal_event,
   1225                               mojo::ScopedMessagePipeHandle channel_handle)
   1226       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
   1227         signal_event_(signal_event) {}
   1228 
   1229   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1230 
   1231   void OnDoPingTTL(int ping) {
   1232     int value = 0;
   1233     Send(new SyncChannelTestMsg_PingTTL(ping, &value));
   1234     signal_event_->Signal();
   1235   }
   1236 
   1237  private:
   1238   bool OnMessageReceived(const Message& message) override {
   1239     IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message)
   1240      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1241      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
   1242     IPC_END_MESSAGE_MAP()
   1243     return true;
   1244   }
   1245 
   1246   void OnNoArgs() { }
   1247   WaitableEvent* signal_event_;
   1248 };
   1249 
   1250 class RestrictedDispatchClient : public Worker {
   1251  public:
   1252   RestrictedDispatchClient(
   1253       WaitableEvent* sent_ping_event,
   1254       RestrictedDispatchServer* server,
   1255       NonRestrictedDispatchServer* server2,
   1256       int* success,
   1257       mojo::ScopedMessagePipeHandle restricted_channel_handle,
   1258       mojo::ScopedMessagePipeHandle non_restricted_channel_handle)
   1259       : Worker(std::move(restricted_channel_handle), Channel::MODE_CLIENT),
   1260         ping_(0),
   1261         server_(server),
   1262         server2_(server2),
   1263         success_(success),
   1264         sent_ping_event_(sent_ping_event),
   1265         non_restricted_channel_handle_(
   1266             std::move(non_restricted_channel_handle)) {}
   1267 
   1268   void Run() override {
   1269     // Incoming messages from our channel should only be dispatched when we
   1270     // send a message on that same channel.
   1271     channel()->SetRestrictDispatchChannelGroup(1);
   1272 
   1273     server_->ListenerThread()->task_runner()->PostTask(
   1274         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing,
   1275                               base::Unretained(server_), 1));
   1276     sent_ping_event_->Wait();
   1277     Send(new SyncChannelTestMsg_NoArgs);
   1278     if (ping_ == 1)
   1279       ++*success_;
   1280     else
   1281       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
   1282 
   1283     non_restricted_channel_ = SyncChannel::Create(
   1284         non_restricted_channel_handle_.release(), IPC::Channel::MODE_CLIENT,
   1285         this, ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(),
   1286         true, shutdown_event());
   1287 
   1288     server_->ListenerThread()->task_runner()->PostTask(
   1289         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing,
   1290                               base::Unretained(server_), 2));
   1291     sent_ping_event_->Wait();
   1292     // Check that the incoming message is *not* dispatched when sending on the
   1293     // non restricted channel.
   1294     // TODO(piman): there is a possibility of a false positive race condition
   1295     // here, if the message that was posted on the server-side end of the pipe
   1296     // is not visible yet on the client side, but I don't know how to solve this
   1297     // without hooking into the internals of SyncChannel. I haven't seen it in
   1298     // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause
   1299     // the following to fail).
   1300     non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs);
   1301     if (ping_ == 1)
   1302       ++*success_;
   1303     else
   1304       LOG(ERROR) << "Send dispatched message from restricted channel";
   1305 
   1306     Send(new SyncChannelTestMsg_NoArgs);
   1307     if (ping_ == 2)
   1308       ++*success_;
   1309     else
   1310       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
   1311 
   1312     // Check that the incoming message on the non-restricted channel is
   1313     // dispatched when sending on the restricted channel.
   1314     server2_->ListenerThread()->task_runner()->PostTask(
   1315         FROM_HERE, base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL,
   1316                               base::Unretained(server2_), 3));
   1317     int value = 0;
   1318     Send(new SyncChannelTestMsg_PingTTL(4, &value));
   1319     if (ping_ == 3 && value == 4)
   1320       ++*success_;
   1321     else
   1322       LOG(ERROR) << "Send failed to dispatch message from unrestricted channel";
   1323 
   1324     non_restricted_channel_->Send(new SyncChannelTestMsg_Done);
   1325     non_restricted_channel_.reset();
   1326     Send(new SyncChannelTestMsg_Done);
   1327     Done();
   1328   }
   1329 
   1330  private:
   1331   bool OnMessageReceived(const Message& message) override {
   1332     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message)
   1333      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing)
   1334      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL)
   1335     IPC_END_MESSAGE_MAP()
   1336     return true;
   1337   }
   1338 
   1339   void OnPing(int ping) {
   1340     ping_ = ping;
   1341   }
   1342 
   1343   void OnPingTTL(int ping, IPC::Message* reply) {
   1344     ping_ = ping;
   1345     // This message comes from the NonRestrictedDispatchServer, we have to send
   1346     // the reply back manually.
   1347     SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping);
   1348     non_restricted_channel_->Send(reply);
   1349   }
   1350 
   1351   int ping_;
   1352   RestrictedDispatchServer* server_;
   1353   NonRestrictedDispatchServer* server2_;
   1354   int* success_;
   1355   WaitableEvent* sent_ping_event_;
   1356   std::unique_ptr<SyncChannel> non_restricted_channel_;
   1357   mojo::ScopedMessagePipeHandle non_restricted_channel_handle_;
   1358 };
   1359 
   1360 TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
   1361   WaitableEvent sent_ping_event(
   1362       base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1363       base::WaitableEvent::InitialState::NOT_SIGNALED);
   1364   WaitableEvent wait_event(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1365                            base::WaitableEvent::InitialState::NOT_SIGNALED);
   1366   mojo::MessagePipe restricted_pipe, non_restricted_pipe;
   1367   RestrictedDispatchServer* server = new RestrictedDispatchServer(
   1368       &sent_ping_event, &wait_event, std::move(restricted_pipe.handle0));
   1369   NonRestrictedDispatchServer* server2 = new NonRestrictedDispatchServer(
   1370       &wait_event, std::move(non_restricted_pipe.handle0));
   1371 
   1372   int success = 0;
   1373   std::vector<Worker*> workers;
   1374   workers.push_back(server);
   1375   workers.push_back(server2);
   1376   workers.push_back(
   1377       new RestrictedDispatchClient(&sent_ping_event, server, server2, &success,
   1378                                    std::move(restricted_pipe.handle1),
   1379                                    std::move(non_restricted_pipe.handle1)));
   1380   RunTest(workers);
   1381   EXPECT_EQ(4, success);
   1382 }
   1383 
   1384 //------------------------------------------------------------------------------
   1385 
   1386 // This test case inspired by crbug.com/108491
   1387 // We create two servers that use the same ListenerThread but have
   1388 // SetRestrictDispatchToSameChannel set to true.
   1389 // We create clients, then use some specific WaitableEvent wait/signalling to
   1390 // ensure that messages get dispatched in a way that causes a deadlock due to
   1391 // a nested dispatch and an eligible message in a higher-level dispatch's
   1392 // delayed_queue. Specifically, we start with client1 about so send an
   1393 // unblocking message to server1, while the shared listener thread for the
   1394 // servers server1 and server2 is about to send a non-unblocking message to
   1395 // client1. At the same time, client2 will be about to send an unblocking
   1396 // message to server2. Server1 will handle the client1->server1 message by
   1397 // telling server2 to send a non-unblocking message to client2.
   1398 // What should happen is that the send to server2 should find the pending,
   1399 // same-context client2->server2 message to dispatch, causing client2 to
   1400 // unblock then handle the server2->client2 message, so that the shared
   1401 // servers' listener thread can then respond to the client1->server1 message.
   1402 // Then client1 can handle the non-unblocking server1->client1 message.
   1403 // The old code would end up in a state where the server2->client2 message is
   1404 // sent, but the client2->server2 message (which is eligible for dispatch, and
   1405 // which is what client2 is waiting for) is stashed in a local delayed_queue
   1406 // that has server1's channel context, causing a deadlock.
   1407 // WaitableEvents in the events array are used to:
   1408 //   event 0: indicate to client1 that server listener is in OnDoServerTask
   1409 //   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
   1410 //   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
   1411 //   event 3: indicate to client2 that server listener is in OnDoServerTask
   1412 
   1413 class RestrictedDispatchDeadlockServer : public Worker {
   1414  public:
   1415   RestrictedDispatchDeadlockServer(int server_num,
   1416                                    WaitableEvent* server_ready_event,
   1417                                    WaitableEvent** events,
   1418                                    RestrictedDispatchDeadlockServer* peer,
   1419                                    mojo::ScopedMessagePipeHandle channel_handle)
   1420       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
   1421         server_num_(server_num),
   1422         server_ready_event_(server_ready_event),
   1423         events_(events),
   1424         peer_(peer) {}
   1425 
   1426   void OnDoServerTask() {
   1427     events_[3]->Signal();
   1428     events_[2]->Wait();
   1429     events_[0]->Signal();
   1430     SendMessageToClient();
   1431   }
   1432 
   1433   void Run() override {
   1434     channel()->SetRestrictDispatchChannelGroup(1);
   1435     server_ready_event_->Signal();
   1436   }
   1437 
   1438   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1439 
   1440  private:
   1441   bool OnMessageReceived(const Message& message) override {
   1442     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
   1443      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1444      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
   1445     IPC_END_MESSAGE_MAP()
   1446     return true;
   1447   }
   1448 
   1449   void OnNoArgs() {
   1450     if (server_num_ == 1) {
   1451       DCHECK(peer_ != NULL);
   1452       peer_->SendMessageToClient();
   1453     }
   1454   }
   1455 
   1456   void SendMessageToClient() {
   1457     Message* msg = new SyncChannelTestMsg_NoArgs;
   1458     msg->set_unblock(false);
   1459     DCHECK(!msg->should_unblock());
   1460     Send(msg);
   1461   }
   1462 
   1463   int server_num_;
   1464   WaitableEvent* server_ready_event_;
   1465   WaitableEvent** events_;
   1466   RestrictedDispatchDeadlockServer* peer_;
   1467 };
   1468 
   1469 class RestrictedDispatchDeadlockClient2 : public Worker {
   1470  public:
   1471   RestrictedDispatchDeadlockClient2(
   1472       RestrictedDispatchDeadlockServer* server,
   1473       WaitableEvent* server_ready_event,
   1474       WaitableEvent** events,
   1475       mojo::ScopedMessagePipeHandle channel_handle)
   1476       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
   1477         server_ready_event_(server_ready_event),
   1478         events_(events),
   1479         received_msg_(false),
   1480         received_noarg_reply_(false),
   1481         done_issued_(false) {}
   1482 
   1483   void Run() override {
   1484     server_ready_event_->Wait();
   1485   }
   1486 
   1487   void OnDoClient2Task() {
   1488     events_[3]->Wait();
   1489     events_[1]->Signal();
   1490     events_[2]->Signal();
   1491     DCHECK(received_msg_ == false);
   1492 
   1493     Message* message = new SyncChannelTestMsg_NoArgs;
   1494     message->set_unblock(true);
   1495     Send(message);
   1496     received_noarg_reply_ = true;
   1497   }
   1498 
   1499   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1500  private:
   1501   bool OnMessageReceived(const Message& message) override {
   1502     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
   1503      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1504     IPC_END_MESSAGE_MAP()
   1505     return true;
   1506   }
   1507 
   1508   void OnNoArgs() {
   1509     received_msg_ = true;
   1510     PossiblyDone();
   1511   }
   1512 
   1513   void PossiblyDone() {
   1514     if (received_noarg_reply_ && received_msg_) {
   1515       DCHECK(done_issued_ == false);
   1516       done_issued_ = true;
   1517       Send(new SyncChannelTestMsg_Done);
   1518       Done();
   1519     }
   1520   }
   1521 
   1522   WaitableEvent* server_ready_event_;
   1523   WaitableEvent** events_;
   1524   bool received_msg_;
   1525   bool received_noarg_reply_;
   1526   bool done_issued_;
   1527 };
   1528 
   1529 class RestrictedDispatchDeadlockClient1 : public Worker {
          // Client-mode worker used by the RestrictedDispatchDeadlock test. Its
          // Run() posts one task to |server| and one to |peer|, waits on the first
          // two shared events, and then sends SyncChannelTestMsg_NoArgs with the
          // unblock flag set. The worker finishes (SyncChannelTestMsg_Done followed
          // by Done()) once that Send() has returned and it has itself received a
          // SyncChannelTestMsg_NoArgs.
   1530  public:
          // |server|, |peer|, |server_ready_event| and |events| are stored as raw
          // pointers; this class never deletes them. |channel_handle| is moved
          // into the Worker base, which is created in client mode.
   1531   RestrictedDispatchDeadlockClient1(
   1532       RestrictedDispatchDeadlockServer* server,
   1533       RestrictedDispatchDeadlockClient2* peer,
   1534       WaitableEvent* server_ready_event,
   1535       WaitableEvent** events,
   1536       mojo::ScopedMessagePipeHandle channel_handle)
   1537       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
   1538         server_(server),
   1539         peer_(peer),
   1540         server_ready_event_(server_ready_event),
   1541         events_(events),
   1542         received_msg_(false),
   1543         received_noarg_reply_(false),
   1544         done_issued_(false) {}
   1545
          // Drives this worker's part of the test scenario.
   1546   void Run() override {
            // Block until |server_ready_event_| has been signaled.
   1547     server_ready_event_->Wait();
            // Queue OnDoServerTask on the server's listener thread and
            // OnDoClient2Task on the peer's listener thread. base::Unretained
            // means the callbacks hold plain pointers, so |server_| and |peer_|
            // must outlive the posted tasks.
   1548     server_->ListenerThread()->task_runner()->PostTask(
   1549         FROM_HERE, base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask,
   1550                               base::Unretained(server_)));
   1551     peer_->ListenerThread()->task_runner()->PostTask(
   1552         FROM_HERE,
   1553         base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task,
   1554                    base::Unretained(peer_)));
            // Wait for the first two shared events.
            // NOTE(review): presumably these are signaled as a result of the two
            // tasks posted above - confirm against the server / client2 code.
   1555     events_[0]->Wait();
   1556     events_[1]->Wait();
            // No SyncChannelTestMsg_NoArgs may have been dispatched to this worker
            // up to this point.
   1557     DCHECK(received_msg_ == false);
   1558
            // Send SyncChannelTestMsg_NoArgs with the unblock flag set.
   1559     Message* message = new SyncChannelTestMsg_NoArgs;
   1560     message->set_unblock(true);
   1561     Send(message);
            // NOTE(review): the flag name suggests Send() only returns once the
            // reply has arrived (i.e. the message is synchronous) - confirm.
   1562     received_noarg_reply_ = true;
   1563     PossiblyDone();
   1564   }
   1565
   1566  private:
          // Routes incoming SyncChannelTestMsg_NoArgs to OnNoArgs(). Always
          // returns true, whether or not the message matched a handler.
   1567   bool OnMessageReceived(const Message& message) override {
   1568     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
   1569      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1570     IPC_END_MESSAGE_MAP()
   1571     return true;
   1572   }
   1573
          // Records that a SyncChannelTestMsg_NoArgs was received and checks
          // whether the worker can finish.
   1574   void OnNoArgs() {
   1575     received_msg_ = true;
   1576     PossiblyDone();
   1577   }
   1578
          // Once both the Send() in Run() has returned and a NoArgs message has
          // been received, sends SyncChannelTestMsg_Done and calls Done(). The
          // DCHECK asserts that this sequence runs at most once.
   1579   void PossiblyDone() {
   1580     if (received_noarg_reply_ && received_msg_) {
   1581       DCHECK(done_issued_ == false);
   1582       done_issued_ = true;
   1583       Send(new SyncChannelTestMsg_Done);
   1584       Done();
   1585     }
   1586   }
   1587
          // Not owned: target of the OnDoServerTask task posted in Run().
   1588   RestrictedDispatchDeadlockServer* server_;
          // Not owned: target of the OnDoClient2Task task posted in Run().
   1589   RestrictedDispatchDeadlockClient2* peer_;
          // Not owned: Run() waits on this before doing anything else.
   1590   WaitableEvent* server_ready_event_;
          // Not owned: shared event array; Run() waits on entries 0 and 1.
   1591   WaitableEvent** events_;
          // Set when a SyncChannelTestMsg_NoArgs has been received.
   1592   bool received_msg_;
          // Set after the Send() of SyncChannelTestMsg_NoArgs in Run() returns.
   1593   bool received_noarg_reply_;
          // Set when the completion sequence in PossiblyDone() has run.
   1594   bool done_issued_;
   1595 };
   1596 
   1597 TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
          // Wires up two server/client pairs (server1 <-> client1 over pipe1,
          // server2 <-> client2 over pipe2) and runs them together. Both servers
          // are moved onto one shared thread; all four workers share the same
          // |events| array.
   1598   std::vector<Worker*> workers;
   1599
   1600   // A shared worker thread so that server1 and server2 run on one thread.
   1601   base::Thread worker_thread("RestrictedDispatchDeadlock");
   1602   ASSERT_TRUE(worker_thread.Start());
   1603
          // One readiness event per server; each is also handed to the client
          // that talks to that server.
   1604   WaitableEvent server1_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1605                               base::WaitableEvent::InitialState::NOT_SIGNALED);
   1606   WaitableEvent server2_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1607                               base::WaitableEvent::InitialState::NOT_SIGNALED);
   1608
          // Four auto-reset events, initially unsignaled, passed to every worker
          // through the |events| array.
   1609   WaitableEvent event0(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1610                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1611   WaitableEvent event1(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1612                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1613   WaitableEvent event2(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1614                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1615   WaitableEvent event3(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1616                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1617   WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
   1618
   1619   RestrictedDispatchDeadlockServer* server1;
   1620   RestrictedDispatchDeadlockServer* server2;
   1621   RestrictedDispatchDeadlockClient1* client1;
   1622   RestrictedDispatchDeadlockClient2* client2;
   1623
          // Construction order is server2, client2, server1, client1 so that each
          // object can be given pointers to the ones it references: server1
          // receives server2 (server2 itself gets NULL in that position), and
          // client1 receives both server1 and client2.
   1624   mojo::MessagePipe pipe1, pipe2;
   1625   server2 = new RestrictedDispatchDeadlockServer(
   1626       2, &server2_ready, events, NULL, std::move(pipe2.handle0));
   1627   server2->OverrideThread(&worker_thread);
   1628   workers.push_back(server2);
   1629
   1630   client2 = new RestrictedDispatchDeadlockClient2(
   1631       server2, &server2_ready, events, std::move(pipe2.handle1));
   1632   workers.push_back(client2);
   1633
   1634   server1 = new RestrictedDispatchDeadlockServer(
   1635       1, &server1_ready, events, server2, std::move(pipe1.handle0));
   1636   server1->OverrideThread(&worker_thread);
   1637   workers.push_back(server1);
   1638
   1639   client1 = new RestrictedDispatchDeadlockClient1(
   1640       server1, client2, &server1_ready, events, std::move(pipe1.handle1));
   1641   workers.push_back(client1);
   1642
          // NOTE(review): the workers are allocated with new and never deleted
          // here; presumably RunTest() takes ownership - confirm.
   1643   RunTest(workers);
   1644 }
   1645 
   1646 //------------------------------------------------------------------------------
   1647 
   1648 // This test case inspired by crbug.com/120530
   1649 // We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a
   1650 // message that recurses through 3, 4 or 5 steps to make sure, say, W1 can
   1651 // re-enter when called from W4 while it's sending a message to W2.
   1652 // The first worker drives the whole test so it must be treated specially.
   1653 
   1654 class RestrictedDispatchPipeWorker : public Worker {
          // One node in a ring of workers. Each node is a server on its own
          // channel (|channel_handle1|) and, once Run() executes, a client of
          // another node through |other_channel_| (built from |channel_handle2|).
          // The node constructed with a non-null |success| pointer is the "first"
          // worker: it drives the ping sequence and records the results.
   1655  public:
          // |event1| is signaled by this worker and |event2| is waited on by it
          // (see Run()). |group| is applied as the restrict-dispatch group to both
          // channels. |success| is null for every worker except the first.
   1656   RestrictedDispatchPipeWorker(mojo::ScopedMessagePipeHandle channel_handle1,
   1657                                WaitableEvent* event1,
   1658                                mojo::ScopedMessagePipeHandle channel_handle2,
   1659                                WaitableEvent* event2,
   1660                                int group,
   1661                                int* success)
   1662       : Worker(std::move(channel_handle1), Channel::MODE_SERVER),
   1663         event1_(event1),
   1664         event2_(event2),
   1665         other_channel_handle_(std::move(channel_handle2)),
   1666         group_(group),
   1667         success_(success) {}
   1668
          // Forwards a ping with a time-to-live of |ping| around the ring.
          // On return, |*ret| is 0 when |ping| is 0; otherwise it is one more than
          // the value the next worker stored for |ping - 1|.
   1669   void OnPingTTL(int ping, int* ret) {
   1670     *ret = 0;
   1671     if (!ping)
   1672       return;
            // The next worker writes its result into |*ret|; this hop is then
            // added on top of it.
   1673     other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret));
   1674     ++*ret;
   1675   }
   1676
          // Propagates shutdown around the ring: every worker except the first
          // forwards SyncChannelTestMsg_Done, drops its client channel and marks
          // itself done. The first worker ignores the message, which stops the
          // propagation.
   1677   void OnDone() {
   1678     if (is_first())
   1679       return;
   1680     other_channel_->Send(new SyncChannelTestMsg_Done);
   1681     other_channel_.reset();
   1682     Done();
   1683   }
   1684
          // Sets up the client channel in step with the other workers and, on the
          // first worker only, runs the ping sequence.
   1685   void Run() override {
   1686     channel()->SetRestrictDispatchChannelGroup(group_);
            // Event handshake: the first worker signals |event1_| before waiting
            // on |event2_|; every other worker waits on |event2_| first and only
            // signals |event1_| after its client channel has been created.
   1687     if (is_first())
   1688       event1_->Signal();
   1689     event2_->Wait();
            // Create the client-mode channel to the other worker, with this object
            // as the listener, and put it in the same restrict-dispatch group as
            // the server channel.
   1690     other_channel_ = SyncChannel::Create(
   1691         other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this,
   1692         ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), true,
   1693         shutdown_event());
   1694     other_channel_->SetRestrictDispatchChannelGroup(group_);
   1695     if (!is_first()) {
   1696       event1_->Signal();
   1697       return;
   1698     }
            // First worker only: send pings with TTL 3, 4 and 5 and count how many
            // come back with a result equal to their TTL.
   1699     *success_ = 0;
   1700     int value = 0;
   1701     OnPingTTL(3, &value);
   1702     *success_ += (value == 3);
   1703     OnPingTTL(4, &value);
   1704     *success_ += (value == 4);
   1705     OnPingTTL(5, &value);
   1706     *success_ += (value == 5);
            // Start the shutdown chain, then tear down this worker's own client
            // channel and finish.
   1707     other_channel_->Send(new SyncChannelTestMsg_Done);
   1708     other_channel_.reset();
   1709     Done();
   1710   }
   1711
          // True for the worker that was given a non-null |success| pointer.
   1712   bool is_first() { return !!success_; }
   1713
   1714  private:
          // Dispatches PingTTL and Done messages to the handlers above. Always
          // returns true, whether or not the message matched a handler.
   1715   bool OnMessageReceived(const Message& message) override {
   1716     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message)
   1717      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
   1718      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone)
   1719     IPC_END_MESSAGE_MAP()
   1720     return true;
   1721   }
   1722
          // Client channel to the other worker; created in Run(), reset on
          // shutdown.
   1723   std::unique_ptr<SyncChannel> other_channel_;
          // Not owned. Signaled by this worker in Run().
   1724   WaitableEvent* event1_;
          // Not owned. Waited on by this worker in Run().
   1725   WaitableEvent* event2_;
          // Pipe end held until Run() releases it into |other_channel_|.
   1726   mojo::ScopedMessagePipeHandle other_channel_handle_;
          // Restrict-dispatch group applied to both channels.
   1727   int group_;
          // Not owned. Result counter; non-null only on the first worker.
   1728   int* success_;
   1729 };
   1730 
   1731 #if defined(OS_ANDROID)
        // On Android the test is registered under a DISABLED_-prefixed name;
        // everywhere else it keeps its normal name.
   1732 #define MAYBE_RestrictedDispatch4WayDeadlock \
   1733   DISABLED_RestrictedDispatch4WayDeadlock
   1734 #else
   1735 #define MAYBE_RestrictedDispatch4WayDeadlock RestrictedDispatch4WayDeadlock
   1736 #endif
   1737 TEST_F(IPCSyncChannelTest, MAYBE_RestrictedDispatch4WayDeadlock) {
          // Builds a ring of four RestrictedDispatchPipeWorkers and expects all
          // three pings issued by the first worker to succeed.
   1738   int success = 0;
   1739   std::vector<Worker*> workers;
          // Manual-reset events, initially unsignaled, used by the workers to
          // sequence their channel setup.
   1740   WaitableEvent event0(base::WaitableEvent::ResetPolicy::MANUAL,
   1741                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1742   WaitableEvent event1(base::WaitableEvent::ResetPolicy::MANUAL,
   1743                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1744   WaitableEvent event2(base::WaitableEvent::ResetPolicy::MANUAL,
   1745                        base::WaitableEvent::InitialState::NOT_SIGNALED);
   1746   WaitableEvent event3(base::WaitableEvent::ResetPolicy::MANUAL,
   1747                        base::WaitableEvent::InitialState::NOT_SIGNALED);
          // Ring wiring: the worker built from pipeN.handle0 (its server end)
          // gets handle1 of the next pipe as its client end, and the last worker
          // wraps around to pipe0.handle1. Each worker signals eventN and waits
          // on the next event; the restrict-dispatch groups are 1 through 4.
          // Only the first worker receives &success, which makes it the driver.
   1748   mojo::MessagePipe pipe0, pipe1, pipe2, pipe3;
   1749   workers.push_back(new RestrictedDispatchPipeWorker(
   1750       std::move(pipe0.handle0), &event0, std::move(pipe1.handle1), &event1, 1,
   1751       &success));
   1752   workers.push_back(new RestrictedDispatchPipeWorker(
   1753       std::move(pipe1.handle0), &event1, std::move(pipe2.handle1), &event2, 2,
   1754       NULL));
   1755   workers.push_back(new RestrictedDispatchPipeWorker(
   1756       std::move(pipe2.handle0), &event2, std::move(pipe3.handle1), &event3, 3,
   1757       NULL));
   1758   workers.push_back(new RestrictedDispatchPipeWorker(
   1759       std::move(pipe3.handle0), &event3, std::move(pipe0.handle1), &event0, 4,
   1760       NULL));
   1761   RunTest(workers);
          // The first worker adds one to |success| for each of its three pings
          // (TTL 3, 4 and 5) that returned the expected value.
   1762   EXPECT_EQ(3, success);
   1763 }
   1764 
   1765 //------------------------------------------------------------------------------
   1766 
   1767 // This test case inspired by crbug.com/122443
   1768 // We want to make sure a reply message with the unblock flag set correctly
   1769 // behaves as a reply, not a regular message.
   1770 // We have 3 workers. Server1 will send a message to Server2 (which will block),
   1771 // during which it will dispatch a message coming from Client, at which point
   1772 // it will send another message to Server2. While sending that second message it
   1773 // will receive a reply from Server2 with the unblock flag.
   1774 
   1775 class ReentrantReplyServer1 : public Worker {
          // Server-mode worker that also opens a second, client-mode channel
          // (|server2_channel_|) in Run(). It sends SyncChannelTestMsg_Reentrant1
          // on that channel and, when it receives SyncChannelTestMsg_Reentrant2 on
          // its own channel, sends SyncChannelTestMsg_Reentrant3 on the second
          // channel as well. A reply that reaches OnReply() is treated as a fatal
          // test failure.
   1776  public:
          // |server_ready| is not owned; it is signaled from Run() once the second
          // channel exists. |channel_handle1| backs the Worker's own server
          // channel; |channel_handle2| is kept until Run() turns it into
          // |server2_channel_|.
   1777   ReentrantReplyServer1(WaitableEvent* server_ready,
   1778                         mojo::ScopedMessagePipeHandle channel_handle1,
   1779                         mojo::ScopedMessagePipeHandle channel_handle2)
   1780       : Worker(std::move(channel_handle1), Channel::MODE_SERVER),
   1781         server_ready_(server_ready),
   1782         other_channel_handle_(std::move(channel_handle2)) {}
   1783
   1784   void Run() override {
            // Create the client-mode channel with this object as its listener.
   1785     server2_channel_ = SyncChannel::Create(
   1786         other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this,
   1787         ipc_thread().task_runner(), base::ThreadTaskRunnerHandle::Get(), true,
   1788         shutdown_event());
            // Signal readiness only after the second channel exists, then send
            // the first message on it.
   1789     server_ready_->Signal();
   1790     Message* msg = new SyncChannelTestMsg_Reentrant1();
   1791     server2_channel_->Send(msg);
            // Send() has returned: drop the second channel and finish.
   1792     server2_channel_.reset();
   1793     Done();
   1794   }
   1795
   1796  private:
          // Routes SyncChannelTestMsg_Reentrant2 to OnReentrant2() and reply
          // messages to OnReply(). Always returns true.
   1797   bool OnMessageReceived(const Message& message) override {
   1798     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message)
   1799      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2)
   1800      IPC_REPLY_HANDLER(OnReply)
   1801     IPC_END_MESSAGE_MAP()
   1802     return true;
   1803   }
   1804
          // On receiving Reentrant2, sends Reentrant3 on the second channel.
   1805   void OnReentrant2() {
   1806     Message* msg = new SyncChannelTestMsg_Reentrant3();
   1807     server2_channel_->Send(msg);
   1808   }
   1809
          // Must never run; |message| is unused.
   1810   void OnReply(const Message& message) {
   1811     // If we get here, the Send() will never receive the reply (thus would
   1812     // hang), so abort instead.
   1813     LOG(FATAL) << "Reply message was dispatched";
   1814   }
   1815
          // Not owned. Signaled in Run() after |server2_channel_| is created.
   1816   WaitableEvent* server_ready_;
          // Client-mode channel created in Run() and reset at the end of Run().
   1817   std::unique_ptr<SyncChannel> server2_channel_;
          // Pipe end held until Run() releases it into |server2_channel_|.
   1818   mojo::ScopedMessagePipeHandle other_channel_handle_;
   1819 };
   1820 
   1821 class ReentrantReplyServer2 : public Worker {
          // Server-mode worker that withholds its reply to
          // SyncChannelTestMsg_Reentrant1 until SyncChannelTestMsg_Reentrant3
          // arrives, and then sends that reply with the unblock flag set.
   1822  public:
   1823   ReentrantReplyServer2(mojo::ScopedMessagePipeHandle channel_handle)
   1824       : Worker(std::move(channel_handle), Channel::MODE_SERVER), reply_(NULL) {}
   1825
   1826  private:
          // Reentrant1 uses the delayed-reply handler form, so OnReentrant1() is
          // handed the reply message instead of having it sent on return.
          // Always returns true.
   1827   bool OnMessageReceived(const Message& message) override {
   1828     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message)
   1829      IPC_MESSAGE_HANDLER_DELAY_REPLY(
   1830          SyncChannelTestMsg_Reentrant1, OnReentrant1)
   1831      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3)
   1832     IPC_END_MESSAGE_MAP()
   1833     return true;
   1834   }
   1835
          // Stashes the pending reply; the DCHECK asserts none is already held.
   1836   void OnReentrant1(Message* reply) {
   1837     DCHECK(!reply_);
   1838     reply_ = reply;
   1839   }
   1840
          // Releases the reply stashed by OnReentrant1(), marked with the unblock
          // flag, and finishes this worker. The DCHECK asserts that Reentrant1
          // was received first.
   1841   void OnReentrant3() {
   1842     DCHECK(reply_);
   1843     Message* reply = reply_;
   1844     reply_ = NULL;
   1845     reply->set_unblock(true);
            // NOTE(review): presumably Send() takes ownership of |reply| - confirm.
   1846     Send(reply);
   1847     Done();
   1848   }
   1849
          // Reply to Reentrant1 held between OnReentrant1() and OnReentrant3();
          // NULL otherwise.
   1850   Message* reply_;
   1851 };
   1852 
   1853 class ReentrantReplyClient : public Worker {
          // Client-mode worker that waits for |server_ready| and then sends a
          // single SyncChannelTestMsg_Reentrant2 on its channel.
   1854  public:
          // |server_ready| is not owned by this class.
   1855   ReentrantReplyClient(WaitableEvent* server_ready,
   1856                        mojo::ScopedMessagePipeHandle channel_handle)
   1857       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
   1858         server_ready_(server_ready) {}
   1859
   1860   void Run() override {
            // Do not send until the readiness event has been signaled.
   1861     server_ready_->Wait();
   1862     Send(new SyncChannelTestMsg_Reentrant2());
   1863     Done();
   1864   }
   1865
   1866  private:
          // Not owned. Waited on at the start of Run().
   1867   WaitableEvent* server_ready_;
   1868 };
   1869 
   1870 TEST_F(IPCSyncChannelTest, ReentrantReply) {
          // Topology: pipe1 connects ReentrantReplyServer1 (handle0) with
          // ReentrantReplyClient (handle1); pipe2 connects ReentrantReplyServer2
          // (handle0) with the second channel of ReentrantReplyServer1 (handle1).
          // Server1 and the client share |server_ready|.
   1871   std::vector<Worker*> workers;
   1872   WaitableEvent server_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
   1873                              base::WaitableEvent::InitialState::NOT_SIGNALED);
   1874   mojo::MessagePipe pipe1, pipe2;
   1875   workers.push_back(new ReentrantReplyServer2(std::move(pipe2.handle0)));
   1876   workers.push_back(new ReentrantReplyServer1(
   1877       &server_ready, std::move(pipe1.handle0), std::move(pipe2.handle1)));
   1878   workers.push_back(
   1879       new ReentrantReplyClient(&server_ready, std::move(pipe1.handle1)));
          // NOTE(review): the workers are allocated with new and never deleted
          // here; presumably RunTest() takes ownership - confirm.
   1880   RunTest(workers);
   1881 }
   1882 
   1883 }  // namespace
   1884 }  // namespace IPC
   1885