Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_sync_channel.h"
      6 
      7 #include <string>
      8 #include <vector>
      9 
     10 #include "base/basictypes.h"
     11 #include "base/bind.h"
     12 #include "base/logging.h"
     13 #include "base/memory/scoped_ptr.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/process/process_handle.h"
     16 #include "base/run_loop.h"
     17 #include "base/strings/string_util.h"
     18 #include "base/synchronization/waitable_event.h"
     19 #include "base/threading/platform_thread.h"
     20 #include "base/threading/thread.h"
     21 #include "ipc/ipc_listener.h"
     22 #include "ipc/ipc_message.h"
     23 #include "ipc/ipc_sender.h"
     24 #include "ipc/ipc_sync_message_filter.h"
     25 #include "ipc/ipc_sync_message_unittest.h"
     26 #include "testing/gtest/include/gtest/gtest.h"
     27 
     28 using base::WaitableEvent;
     29 
     30 namespace IPC {
     31 namespace {
     32 
     33 // Base class for a "process" with listener and IPC threads.
     34 class Worker : public Listener, public Sender {
     35  public:
     36   // Will create a channel without a name.
     37   Worker(Channel::Mode mode, const std::string& thread_name)
     38       : done_(new WaitableEvent(false, false)),
     39         channel_created_(new WaitableEvent(false, false)),
     40         mode_(mode),
     41         ipc_thread_((thread_name + "_ipc").c_str()),
     42         listener_thread_((thread_name + "_listener").c_str()),
     43         overrided_thread_(NULL),
     44         shutdown_event_(true, false),
     45         is_shutdown_(false) {
     46   }
     47 
     48   // Will create a named channel and use this name for the threads' name.
     49   Worker(const std::string& channel_name, Channel::Mode mode)
     50       : done_(new WaitableEvent(false, false)),
     51         channel_created_(new WaitableEvent(false, false)),
     52         channel_name_(channel_name),
     53         mode_(mode),
     54         ipc_thread_((channel_name + "_ipc").c_str()),
     55         listener_thread_((channel_name + "_listener").c_str()),
     56         overrided_thread_(NULL),
     57         shutdown_event_(true, false),
     58         is_shutdown_(false) {
     59   }
     60 
     61   virtual ~Worker() {
     62     // Shutdown() must be called before destruction.
     63     CHECK(is_shutdown_);
     64   }
     65   void AddRef() { }
     66   void Release() { }
     67   virtual bool Send(Message* msg) OVERRIDE { return channel_->Send(msg); }
     68   void WaitForChannelCreation() { channel_created_->Wait(); }
     69   void CloseChannel() {
     70     DCHECK(base::MessageLoop::current() == ListenerThread()->message_loop());
     71     channel_->Close();
     72   }
     73   void Start() {
     74     StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT);
     75     ListenerThread()->message_loop()->PostTask(
     76         FROM_HERE, base::Bind(&Worker::OnStart, this));
     77   }
     78   void Shutdown() {
     79     // The IPC thread needs to outlive SyncChannel. We can't do this in
     80     // ~Worker(), since that'll reset the vtable pointer (to Worker's), which
     81     // may result in a race conditions. See http://crbug.com/25841.
     82     WaitableEvent listener_done(false, false), ipc_done(false, false);
     83     ListenerThread()->message_loop()->PostTask(
     84         FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown1, this,
     85                               &listener_done, &ipc_done));
     86     listener_done.Wait();
     87     ipc_done.Wait();
     88     ipc_thread_.Stop();
     89     listener_thread_.Stop();
     90     is_shutdown_ = true;
     91   }
     92   void OverrideThread(base::Thread* overrided_thread) {
     93     DCHECK(overrided_thread_ == NULL);
     94     overrided_thread_ = overrided_thread;
     95   }
     96   bool SendAnswerToLife(bool pump, bool succeed) {
     97     int answer = 0;
     98     SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
     99     if (pump)
    100       msg->EnableMessagePumping();
    101     bool result = Send(msg);
    102     DCHECK_EQ(result, succeed);
    103     DCHECK_EQ(answer, (succeed ? 42 : 0));
    104     return result;
    105   }
    106   bool SendDouble(bool pump, bool succeed) {
    107     int answer = 0;
    108     SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
    109     if (pump)
    110       msg->EnableMessagePumping();
    111     bool result = Send(msg);
    112     DCHECK_EQ(result, succeed);
    113     DCHECK_EQ(answer, (succeed ? 10 : 0));
    114     return result;
    115   }
    116   const std::string& channel_name() { return channel_name_; }
    117   Channel::Mode mode() { return mode_; }
    118   WaitableEvent* done_event() { return done_.get(); }
    119   WaitableEvent* shutdown_event() { return &shutdown_event_; }
    120   void ResetChannel() { channel_.reset(); }
    121   // Derived classes need to call this when they've completed their part of
    122   // the test.
    123   void Done() { done_->Signal(); }
    124 
    125  protected:
    126   SyncChannel* channel() { return channel_.get(); }
    127   // Functions for dervied classes to implement if they wish.
    128   virtual void Run() { }
    129   virtual void OnAnswer(int* answer) { NOTREACHED(); }
    130   virtual void OnAnswerDelay(Message* reply_msg) {
    131     // The message handler map below can only take one entry for
    132     // SyncChannelTestMsg_AnswerToLife, so since some classes want
    133     // the normal version while other want the delayed reply, we
    134     // call the normal version if the derived class didn't override
    135     // this function.
    136     int answer;
    137     OnAnswer(&answer);
    138     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
    139     Send(reply_msg);
    140   }
    141   virtual void OnDouble(int in, int* out) { NOTREACHED(); }
    142   virtual void OnDoubleDelay(int in, Message* reply_msg) {
    143     int result;
    144     OnDouble(in, &result);
    145     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
    146     Send(reply_msg);
    147   }
    148 
    149   virtual void OnNestedTestMsg(Message* reply_msg) {
    150     NOTREACHED();
    151   }
    152 
    153   virtual SyncChannel* CreateChannel() {
    154     scoped_ptr<SyncChannel> channel = SyncChannel::Create(
    155         channel_name_, mode_, this, ipc_thread_.message_loop_proxy().get(),
    156         true, &shutdown_event_);
    157     return channel.release();
    158   }
    159 
    160   base::Thread* ListenerThread() {
    161     return overrided_thread_ ? overrided_thread_ : &listener_thread_;
    162   }
    163 
    164   const base::Thread& ipc_thread() const { return ipc_thread_; }
    165 
    166  private:
    167   // Called on the listener thread to create the sync channel.
    168   void OnStart() {
    169     // Link ipc_thread_, listener_thread_ and channel_ altogether.
    170     StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO);
    171     channel_.reset(CreateChannel());
    172     channel_created_->Signal();
    173     Run();
    174   }
    175 
    176   void OnListenerThreadShutdown1(WaitableEvent* listener_event,
    177                                  WaitableEvent* ipc_event) {
    178     // SyncChannel needs to be destructed on the thread that it was created on.
    179     channel_.reset();
    180 
    181     base::RunLoop().RunUntilIdle();
    182 
    183     ipc_thread_.message_loop()->PostTask(
    184         FROM_HERE, base::Bind(&Worker::OnIPCThreadShutdown, this,
    185                               listener_event, ipc_event));
    186   }
    187 
    188   void OnIPCThreadShutdown(WaitableEvent* listener_event,
    189                            WaitableEvent* ipc_event) {
    190     base::RunLoop().RunUntilIdle();
    191     ipc_event->Signal();
    192 
    193     listener_thread_.message_loop()->PostTask(
    194         FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, this,
    195                               listener_event));
    196   }
    197 
    198   void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
    199     base::RunLoop().RunUntilIdle();
    200     listener_event->Signal();
    201   }
    202 
    203   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
    204     IPC_BEGIN_MESSAGE_MAP(Worker, message)
    205      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
    206      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
    207                                      OnAnswerDelay)
    208      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String,
    209                                      OnNestedTestMsg)
    210     IPC_END_MESSAGE_MAP()
    211     return true;
    212   }
    213 
    214   void StartThread(base::Thread* thread, base::MessageLoop::Type type) {
    215     base::Thread::Options options;
    216     options.message_loop_type = type;
    217     thread->StartWithOptions(options);
    218   }
    219 
    220   scoped_ptr<WaitableEvent> done_;
    221   scoped_ptr<WaitableEvent> channel_created_;
    222   std::string channel_name_;
    223   Channel::Mode mode_;
    224   scoped_ptr<SyncChannel> channel_;
    225   base::Thread ipc_thread_;
    226   base::Thread listener_thread_;
    227   base::Thread* overrided_thread_;
    228 
    229   base::WaitableEvent shutdown_event_;
    230 
    231   bool is_shutdown_;
    232 
    233   DISALLOW_COPY_AND_ASSIGN(Worker);
    234 };
    235 
    236 
    237 // Starts the test with the given workers.  This function deletes the workers
    238 // when it's done.
    239 void RunTest(std::vector<Worker*> workers) {
    240   // First we create the workers that are channel servers, or else the other
    241   // workers' channel initialization might fail because the pipe isn't created..
    242   for (size_t i = 0; i < workers.size(); ++i) {
    243     if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) {
    244       workers[i]->Start();
    245       workers[i]->WaitForChannelCreation();
    246     }
    247   }
    248 
    249   // now create the clients
    250   for (size_t i = 0; i < workers.size(); ++i) {
    251     if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG)
    252       workers[i]->Start();
    253   }
    254 
    255   // wait for all the workers to finish
    256   for (size_t i = 0; i < workers.size(); ++i)
    257     workers[i]->done_event()->Wait();
    258 
    259   for (size_t i = 0; i < workers.size(); ++i) {
    260     workers[i]->Shutdown();
    261     delete workers[i];
    262   }
    263 }
    264 
    265 class IPCSyncChannelTest : public testing::Test {
    266  private:
    267   base::MessageLoop message_loop_;
    268 };
    269 
    270 //------------------------------------------------------------------------------
    271 
    272 class SimpleServer : public Worker {
    273  public:
    274   explicit SimpleServer(bool pump_during_send)
    275       : Worker(Channel::MODE_SERVER, "simpler_server"),
    276         pump_during_send_(pump_during_send) { }
    277   virtual void Run() OVERRIDE {
    278     SendAnswerToLife(pump_during_send_, true);
    279     Done();
    280   }
    281 
    282   bool pump_during_send_;
    283 };
    284 
    285 class SimpleClient : public Worker {
    286  public:
    287   SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { }
    288 
    289   virtual void OnAnswer(int* answer) OVERRIDE {
    290     *answer = 42;
    291     Done();
    292   }
    293 };
    294 
    295 void Simple(bool pump_during_send) {
    296   std::vector<Worker*> workers;
    297   workers.push_back(new SimpleServer(pump_during_send));
    298   workers.push_back(new SimpleClient());
    299   RunTest(workers);
    300 }
    301 
    302 // Tests basic synchronous call
    303 TEST_F(IPCSyncChannelTest, Simple) {
    304   Simple(false);
    305   Simple(true);
    306 }
    307 
    308 //------------------------------------------------------------------------------
    309 
    310 // Worker classes which override how the sync channel is created to use the
    311 // two-step initialization (calling the lightweight constructor and then
    312 // ChannelProxy::Init separately) process.
    313 class TwoStepServer : public Worker {
    314  public:
    315   explicit TwoStepServer(bool create_pipe_now)
    316       : Worker(Channel::MODE_SERVER, "simpler_server"),
    317         create_pipe_now_(create_pipe_now) { }
    318 
    319   virtual void Run() OVERRIDE {
    320     SendAnswerToLife(false, true);
    321     Done();
    322   }
    323 
    324   virtual SyncChannel* CreateChannel() OVERRIDE {
    325     SyncChannel* channel =
    326         SyncChannel::Create(channel_name(), mode(), this,
    327                             ipc_thread().message_loop_proxy().get(),
    328                             create_pipe_now_,
    329                             shutdown_event()).release();
    330     return channel;
    331   }
    332 
    333   bool create_pipe_now_;
    334 };
    335 
    336 class TwoStepClient : public Worker {
    337  public:
    338   TwoStepClient(bool create_pipe_now)
    339       : Worker(Channel::MODE_CLIENT, "simple_client"),
    340         create_pipe_now_(create_pipe_now) { }
    341 
    342   virtual void OnAnswer(int* answer) OVERRIDE {
    343     *answer = 42;
    344     Done();
    345   }
    346 
    347   virtual SyncChannel* CreateChannel() OVERRIDE {
    348     SyncChannel* channel =
    349         SyncChannel::Create(channel_name(), mode(), this,
    350                             ipc_thread().message_loop_proxy().get(),
    351                             create_pipe_now_,
    352                             shutdown_event()).release();
    353     return channel;
    354   }
    355 
    356   bool create_pipe_now_;
    357 };
    358 
    359 void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) {
    360   std::vector<Worker*> workers;
    361   workers.push_back(new TwoStepServer(create_server_pipe_now));
    362   workers.push_back(new TwoStepClient(create_client_pipe_now));
    363   RunTest(workers);
    364 }
    365 
    366 // Tests basic two-step initialization, where you call the lightweight
    367 // constructor then Init.
    368 TEST_F(IPCSyncChannelTest, TwoStepInitialization) {
    369   TwoStep(false, false);
    370   TwoStep(false, true);
    371   TwoStep(true, false);
    372   TwoStep(true, true);
    373 }
    374 
    375 //------------------------------------------------------------------------------
    376 
    377 class DelayClient : public Worker {
    378  public:
    379   DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { }
    380 
    381   virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
    382     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
    383     Send(reply_msg);
    384     Done();
    385   }
    386 };
    387 
    388 void DelayReply(bool pump_during_send) {
    389   std::vector<Worker*> workers;
    390   workers.push_back(new SimpleServer(pump_during_send));
    391   workers.push_back(new DelayClient());
    392   RunTest(workers);
    393 }
    394 
    395 // Tests that asynchronous replies work
    396 TEST_F(IPCSyncChannelTest, DelayReply) {
    397   DelayReply(false);
    398   DelayReply(true);
    399 }
    400 
    401 //------------------------------------------------------------------------------
    402 
    403 class NoHangServer : public Worker {
    404  public:
    405   NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send)
    406       : Worker(Channel::MODE_SERVER, "no_hang_server"),
    407         got_first_reply_(got_first_reply),
    408         pump_during_send_(pump_during_send) { }
    409   virtual void Run() OVERRIDE {
    410     SendAnswerToLife(pump_during_send_, true);
    411     got_first_reply_->Signal();
    412 
    413     SendAnswerToLife(pump_during_send_, false);
    414     Done();
    415   }
    416 
    417   WaitableEvent* got_first_reply_;
    418   bool pump_during_send_;
    419 };
    420 
    421 class NoHangClient : public Worker {
    422  public:
    423   explicit NoHangClient(WaitableEvent* got_first_reply)
    424     : Worker(Channel::MODE_CLIENT, "no_hang_client"),
    425       got_first_reply_(got_first_reply) { }
    426 
    427   virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
    428     // Use the DELAY_REPLY macro so that we can force the reply to be sent
    429     // before this function returns (when the channel will be reset).
    430     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
    431     Send(reply_msg);
    432     got_first_reply_->Wait();
    433     CloseChannel();
    434     Done();
    435   }
    436 
    437   WaitableEvent* got_first_reply_;
    438 };
    439 
    440 void NoHang(bool pump_during_send) {
    441   WaitableEvent got_first_reply(false, false);
    442   std::vector<Worker*> workers;
    443   workers.push_back(new NoHangServer(&got_first_reply, pump_during_send));
    444   workers.push_back(new NoHangClient(&got_first_reply));
    445   RunTest(workers);
    446 }
    447 
    448 // Tests that caller doesn't hang if receiver dies
    449 TEST_F(IPCSyncChannelTest, NoHang) {
    450   NoHang(false);
    451   NoHang(true);
    452 }
    453 
    454 //------------------------------------------------------------------------------
    455 
    456 class UnblockServer : public Worker {
    457  public:
    458   UnblockServer(bool pump_during_send, bool delete_during_send)
    459     : Worker(Channel::MODE_SERVER, "unblock_server"),
    460       pump_during_send_(pump_during_send),
    461       delete_during_send_(delete_during_send) { }
    462   virtual void Run() OVERRIDE {
    463     if (delete_during_send_) {
    464       // Use custom code since race conditions mean the answer may or may not be
    465       // available.
    466       int answer = 0;
    467       SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
    468       if (pump_during_send_)
    469         msg->EnableMessagePumping();
    470       Send(msg);
    471     } else {
    472       SendAnswerToLife(pump_during_send_, true);
    473     }
    474     Done();
    475   }
    476 
    477   virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE {
    478     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
    479     Send(reply_msg);
    480     if (delete_during_send_)
    481       ResetChannel();
    482   }
    483 
    484   bool pump_during_send_;
    485   bool delete_during_send_;
    486 };
    487 
    488 class UnblockClient : public Worker {
    489  public:
    490   explicit UnblockClient(bool pump_during_send)
    491     : Worker(Channel::MODE_CLIENT, "unblock_client"),
    492       pump_during_send_(pump_during_send) { }
    493 
    494   virtual void OnAnswer(int* answer) OVERRIDE {
    495     SendDouble(pump_during_send_, true);
    496     *answer = 42;
    497     Done();
    498   }
    499 
    500   bool pump_during_send_;
    501 };
    502 
    503 void Unblock(bool server_pump, bool client_pump, bool delete_during_send) {
    504   std::vector<Worker*> workers;
    505   workers.push_back(new UnblockServer(server_pump, delete_during_send));
    506   workers.push_back(new UnblockClient(client_pump));
    507   RunTest(workers);
    508 }
    509 
    510 // Tests that the caller unblocks to answer a sync message from the receiver.
    511 TEST_F(IPCSyncChannelTest, Unblock) {
    512   Unblock(false, false, false);
    513   Unblock(false, true, false);
    514   Unblock(true, false, false);
    515   Unblock(true, true, false);
    516 }
    517 
    518 //------------------------------------------------------------------------------
    519 
    520 // Tests that the the SyncChannel object can be deleted during a Send.
    521 TEST_F(IPCSyncChannelTest, ChannelDeleteDuringSend) {
    522   Unblock(false, false, true);
    523   Unblock(false, true, true);
    524   Unblock(true, false, true);
    525   Unblock(true, true, true);
    526 }
    527 
    528 //------------------------------------------------------------------------------
    529 
    530 class RecursiveServer : public Worker {
    531  public:
    532   RecursiveServer(bool expected_send_result, bool pump_first, bool pump_second)
    533       : Worker(Channel::MODE_SERVER, "recursive_server"),
    534         expected_send_result_(expected_send_result),
    535         pump_first_(pump_first), pump_second_(pump_second) {}
    536   virtual void Run() OVERRIDE {
    537     SendDouble(pump_first_, expected_send_result_);
    538     Done();
    539   }
    540 
    541   virtual void OnDouble(int in, int* out) OVERRIDE {
    542     *out = in * 2;
    543     SendAnswerToLife(pump_second_, expected_send_result_);
    544   }
    545 
    546   bool expected_send_result_, pump_first_, pump_second_;
    547 };
    548 
    549 class RecursiveClient : public Worker {
    550  public:
    551   RecursiveClient(bool pump_during_send, bool close_channel)
    552       : Worker(Channel::MODE_CLIENT, "recursive_client"),
    553         pump_during_send_(pump_during_send), close_channel_(close_channel) {}
    554 
    555   virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE {
    556     SendDouble(pump_during_send_, !close_channel_);
    557     if (close_channel_) {
    558       delete reply_msg;
    559     } else {
    560       SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
    561       Send(reply_msg);
    562     }
    563     Done();
    564   }
    565 
    566   virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
    567     if (close_channel_) {
    568       delete reply_msg;
    569       CloseChannel();
    570     } else {
    571       SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
    572       Send(reply_msg);
    573     }
    574   }
    575 
    576   bool pump_during_send_, close_channel_;
    577 };
    578 
    579 void Recursive(
    580     bool server_pump_first, bool server_pump_second, bool client_pump) {
    581   std::vector<Worker*> workers;
    582   workers.push_back(
    583       new RecursiveServer(true, server_pump_first, server_pump_second));
    584   workers.push_back(new RecursiveClient(client_pump, false));
    585   RunTest(workers);
    586 }
    587 
    588 // Tests a server calling Send while another Send is pending.
    589 TEST_F(IPCSyncChannelTest, Recursive) {
    590   Recursive(false, false, false);
    591   Recursive(false, false, true);
    592   Recursive(false, true, false);
    593   Recursive(false, true, true);
    594   Recursive(true, false, false);
    595   Recursive(true, false, true);
    596   Recursive(true, true, false);
    597   Recursive(true, true, true);
    598 }
    599 
    600 //------------------------------------------------------------------------------
    601 
    602 void RecursiveNoHang(
    603     bool server_pump_first, bool server_pump_second, bool client_pump) {
    604   std::vector<Worker*> workers;
    605   workers.push_back(
    606       new RecursiveServer(false, server_pump_first, server_pump_second));
    607   workers.push_back(new RecursiveClient(client_pump, true));
    608   RunTest(workers);
    609 }
    610 
    611 // Tests that if a caller makes a sync call during an existing sync call and
    612 // the receiver dies, neither of the Send() calls hang.
    613 TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
    614   RecursiveNoHang(false, false, false);
    615   RecursiveNoHang(false, false, true);
    616   RecursiveNoHang(false, true, false);
    617   RecursiveNoHang(false, true, true);
    618   RecursiveNoHang(true, false, false);
    619   RecursiveNoHang(true, false, true);
    620   RecursiveNoHang(true, true, false);
    621   RecursiveNoHang(true, true, true);
    622 }
    623 
    624 //------------------------------------------------------------------------------
    625 
    626 class MultipleServer1 : public Worker {
    627  public:
    628   explicit MultipleServer1(bool pump_during_send)
    629     : Worker("test_channel1", Channel::MODE_SERVER),
    630       pump_during_send_(pump_during_send) { }
    631 
    632   virtual void Run() OVERRIDE {
    633     SendDouble(pump_during_send_, true);
    634     Done();
    635   }
    636 
    637   bool pump_during_send_;
    638 };
    639 
    640 class MultipleClient1 : public Worker {
    641  public:
    642   MultipleClient1(WaitableEvent* client1_msg_received,
    643                   WaitableEvent* client1_can_reply) :
    644       Worker("test_channel1", Channel::MODE_CLIENT),
    645       client1_msg_received_(client1_msg_received),
    646       client1_can_reply_(client1_can_reply) { }
    647 
    648   virtual void OnDouble(int in, int* out) OVERRIDE {
    649     client1_msg_received_->Signal();
    650     *out = in * 2;
    651     client1_can_reply_->Wait();
    652     Done();
    653   }
    654 
    655  private:
    656   WaitableEvent *client1_msg_received_, *client1_can_reply_;
    657 };
    658 
    659 class MultipleServer2 : public Worker {
    660  public:
    661   MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { }
    662 
    663   virtual void OnAnswer(int* result) OVERRIDE {
    664     *result = 42;
    665     Done();
    666   }
    667 };
    668 
    669 class MultipleClient2 : public Worker {
    670  public:
    671   MultipleClient2(
    672     WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply,
    673     bool pump_during_send)
    674     : Worker("test_channel2", Channel::MODE_CLIENT),
    675       client1_msg_received_(client1_msg_received),
    676       client1_can_reply_(client1_can_reply),
    677       pump_during_send_(pump_during_send) { }
    678 
    679   virtual void Run() OVERRIDE {
    680     client1_msg_received_->Wait();
    681     SendAnswerToLife(pump_during_send_, true);
    682     client1_can_reply_->Signal();
    683     Done();
    684   }
    685 
    686  private:
    687   WaitableEvent *client1_msg_received_, *client1_can_reply_;
    688   bool pump_during_send_;
    689 };
    690 
    691 void Multiple(bool server_pump, bool client_pump) {
    692   std::vector<Worker*> workers;
    693 
    694   // A shared worker thread so that server1 and server2 run on one thread.
    695   base::Thread worker_thread("Multiple");
    696   ASSERT_TRUE(worker_thread.Start());
    697 
    698   // Server1 sends a sync msg to client1, which blocks the reply until
    699   // server2 (which runs on the same worker thread as server1) responds
    700   // to a sync msg from client2.
    701   WaitableEvent client1_msg_received(false, false);
    702   WaitableEvent client1_can_reply(false, false);
    703 
    704   Worker* worker;
    705 
    706   worker = new MultipleServer2();
    707   worker->OverrideThread(&worker_thread);
    708   workers.push_back(worker);
    709 
    710   worker = new MultipleClient2(
    711       &client1_msg_received, &client1_can_reply, client_pump);
    712   workers.push_back(worker);
    713 
    714   worker = new MultipleServer1(server_pump);
    715   worker->OverrideThread(&worker_thread);
    716   workers.push_back(worker);
    717 
    718   worker = new MultipleClient1(
    719       &client1_msg_received, &client1_can_reply);
    720   workers.push_back(worker);
    721 
    722   RunTest(workers);
    723 }
    724 
    725 // Tests that multiple SyncObjects on the same listener thread can unblock each
    726 // other.
    727 TEST_F(IPCSyncChannelTest, Multiple) {
    728   Multiple(false, false);
    729   Multiple(false, true);
    730   Multiple(true, false);
    731   Multiple(true, true);
    732 }
    733 
    734 //------------------------------------------------------------------------------
    735 
    736 // This class provides server side functionality to test the case where
    737 // multiple sync channels are in use on the same thread on the client and
    738 // nested calls are issued.
    739 class QueuedReplyServer : public Worker {
    740  public:
    741   QueuedReplyServer(base::Thread* listener_thread,
    742                     const std::string& channel_name,
    743                     const std::string& reply_text)
    744       : Worker(channel_name, Channel::MODE_SERVER),
    745         reply_text_(reply_text) {
    746     Worker::OverrideThread(listener_thread);
    747   }
    748 
    749   virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE {
    750     VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
    751     SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
    752     Send(reply_msg);
    753     Done();
    754   }
    755 
    756  private:
    757   std::string reply_text_;
    758 };
    759 
    760 // The QueuedReplyClient class provides functionality to test the case where
    761 // multiple sync channels are in use on the same thread and they make nested
    762 // sync calls, i.e. while the first channel waits for a response it makes a
    763 // sync call on another channel.
    764 // The callstack should unwind correctly, i.e. the outermost call should
    765 // complete first, and so on.
    766 class QueuedReplyClient : public Worker {
    767  public:
    768   QueuedReplyClient(base::Thread* listener_thread,
    769                     const std::string& channel_name,
    770                     const std::string& expected_text,
    771                     bool pump_during_send)
    772       : Worker(channel_name, Channel::MODE_CLIENT),
    773         pump_during_send_(pump_during_send),
    774         expected_text_(expected_text) {
    775     Worker::OverrideThread(listener_thread);
    776   }
    777 
    778   virtual void Run() OVERRIDE {
    779     std::string response;
    780     SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
    781     if (pump_during_send_)
    782       msg->EnableMessagePumping();
    783     bool result = Send(msg);
    784     DCHECK(result);
    785     DCHECK_EQ(response, expected_text_);
    786 
    787     VLOG(1) << __FUNCTION__ << " Received reply: " << response;
    788     Done();
    789   }
    790 
    791  private:
    792   bool pump_during_send_;
    793   std::string expected_text_;
    794 };
    795 
    796 void QueuedReply(bool client_pump) {
    797   std::vector<Worker*> workers;
    798 
    799   // A shared worker thread for servers
    800   base::Thread server_worker_thread("QueuedReply_ServerListener");
    801   ASSERT_TRUE(server_worker_thread.Start());
    802 
    803   base::Thread client_worker_thread("QueuedReply_ClientListener");
    804   ASSERT_TRUE(client_worker_thread.Start());
    805 
    806   Worker* worker;
    807 
    808   worker = new QueuedReplyServer(&server_worker_thread,
    809                                  "QueuedReply_Server1",
    810                                  "Got first message");
    811   workers.push_back(worker);
    812 
    813   worker = new QueuedReplyServer(&server_worker_thread,
    814                                  "QueuedReply_Server2",
    815                                  "Got second message");
    816   workers.push_back(worker);
    817 
    818   worker = new QueuedReplyClient(&client_worker_thread,
    819                                  "QueuedReply_Server1",
    820                                  "Got first message",
    821                                  client_pump);
    822   workers.push_back(worker);
    823 
    824   worker = new QueuedReplyClient(&client_worker_thread,
    825                                  "QueuedReply_Server2",
    826                                  "Got second message",
    827                                  client_pump);
    828   workers.push_back(worker);
    829 
    830   RunTest(workers);
    831 }
    832 
    833 // While a blocking send is in progress, the listener thread might answer other
    834 // synchronous messages.  This tests that if during the response to another
    835 // message the reply to the original messages comes, it is queued up correctly
    836 // and the original Send is unblocked later.
    837 // We also test that the send call stacks unwind correctly when the channel
    838 // pumps messages while waiting for a response.
    839 TEST_F(IPCSyncChannelTest, QueuedReply) {
    840   QueuedReply(false);
    841   QueuedReply(true);
    842 }
    843 
    844 //------------------------------------------------------------------------------
    845 
    846 class ChattyClient : public Worker {
    847  public:
    848   ChattyClient() :
    849       Worker(Channel::MODE_CLIENT, "chatty_client") { }
    850 
    851   virtual void OnAnswer(int* answer) OVERRIDE {
    852     // The PostMessage limit is 10k.  Send 20% more than that.
    853     const int kMessageLimit = 10000;
    854     const int kMessagesToSend = kMessageLimit * 120 / 100;
    855     for (int i = 0; i < kMessagesToSend; ++i) {
    856       if (!SendDouble(false, true))
    857         break;
    858     }
    859     *answer = 42;
    860     Done();
    861   }
    862 };
    863 
    864 void ChattyServer(bool pump_during_send) {
    865   std::vector<Worker*> workers;
    866   workers.push_back(new UnblockServer(pump_during_send, false));
    867   workers.push_back(new ChattyClient());
    868   RunTest(workers);
    869 }
    870 
    871 // Tests http://b/1093251 - that sending lots of sync messages while
    872 // the receiver is waiting for a sync reply does not overflow the PostMessage
    873 // queue.
    874 TEST_F(IPCSyncChannelTest, ChattyServer) {
    875   ChattyServer(false);
    876   ChattyServer(true);
    877 }
    878 
    879 //------------------------------------------------------------------------------
    880 
    881 void NestedCallback(Worker* server) {
    882   // Sleep a bit so that we wake up after the reply has been received.
    883   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250));
    884   server->SendAnswerToLife(true, true);
    885 }
    886 
    // Set by TimeoutCallback(); DoneEventRaceServer::Run() checks that it is
    // still false once its Send has returned.
    bool timeout_occurred = false;

    // Delayed task body: records that the timeout task got to run.
    void TimeoutCallback() { timeout_occurred = true; }
    892 
    893 class DoneEventRaceServer : public Worker {
    894  public:
    895   DoneEventRaceServer()
    896       : Worker(Channel::MODE_SERVER, "done_event_race_server") { }
    897 
    898   virtual void Run() OVERRIDE {
    899     base::MessageLoop::current()->PostTask(FROM_HERE,
    900                                            base::Bind(&NestedCallback, this));
    901     base::MessageLoop::current()->PostDelayedTask(
    902         FROM_HERE,
    903         base::Bind(&TimeoutCallback),
    904         base::TimeDelta::FromSeconds(9));
    905     // Even though we have a timeout on the Send, it will succeed since for this
    906     // bug, the reply message comes back and is deserialized, however the done
    907     // event wasn't set.  So we indirectly use the timeout task to notice if a
    908     // timeout occurred.
    909     SendAnswerToLife(true, true);
    910     DCHECK(!timeout_occurred);
    911     Done();
    912   }
    913 };
    914 
    915 // Tests http://b/1474092 - that if after the done_event is set but before
    916 // OnObjectSignaled is called another message is sent out, then after its
    917 // reply comes back OnObjectSignaled will be called for the first message.
    918 TEST_F(IPCSyncChannelTest, DoneEventRace) {
    919   std::vector<Worker*> workers;
    920   workers.push_back(new DoneEventRaceServer());
    921   workers.push_back(new SimpleClient());
    922   RunTest(workers);
    923 }
    924 
    925 //------------------------------------------------------------------------------
    926 
    927 class TestSyncMessageFilter : public SyncMessageFilter {
    928  public:
    929   TestSyncMessageFilter(base::WaitableEvent* shutdown_event,
    930                         Worker* worker,
    931                         scoped_refptr<base::MessageLoopProxy> message_loop)
    932       : SyncMessageFilter(shutdown_event),
    933         worker_(worker),
    934         message_loop_(message_loop) {
    935   }
    936 
    937   virtual void OnFilterAdded(Sender* sender) OVERRIDE {
    938     SyncMessageFilter::OnFilterAdded(sender);
    939     message_loop_->PostTask(
    940         FROM_HERE,
    941         base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this));
    942   }
    943 
    944   void SendMessageOnHelperThread() {
    945     int answer = 0;
    946     bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
    947     DCHECK(result);
    948     DCHECK_EQ(answer, 42);
    949 
    950     worker_->Done();
    951   }
    952 
    953  private:
    954   virtual ~TestSyncMessageFilter() {}
    955 
    956   Worker* worker_;
    957   scoped_refptr<base::MessageLoopProxy> message_loop_;
    958 };
    959 
    960 class SyncMessageFilterServer : public Worker {
    961  public:
    962   SyncMessageFilterServer()
    963       : Worker(Channel::MODE_SERVER, "sync_message_filter_server"),
    964         thread_("helper_thread") {
    965     base::Thread::Options options;
    966     options.message_loop_type = base::MessageLoop::TYPE_DEFAULT;
    967     thread_.StartWithOptions(options);
    968     filter_ = new TestSyncMessageFilter(shutdown_event(), this,
    969                                         thread_.message_loop_proxy());
    970   }
    971 
    972   virtual void Run() OVERRIDE {
    973     channel()->AddFilter(filter_.get());
    974   }
    975 
    976   base::Thread thread_;
    977   scoped_refptr<TestSyncMessageFilter> filter_;
    978 };
    979 
    980 // This class provides functionality to test the case that a Send on the sync
    981 // channel does not crash after the channel has been closed.
    982 class ServerSendAfterClose : public Worker {
    983  public:
    984   ServerSendAfterClose()
    985      : Worker(Channel::MODE_SERVER, "simpler_server"),
    986        send_result_(true) {
    987   }
    988 
    989   bool SendDummy() {
    990     ListenerThread()->message_loop()->PostTask(
    991         FROM_HERE, base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send),
    992                               this, new SyncChannelTestMsg_NoArgs));
    993     return true;
    994   }
    995 
    996   bool send_result() const {
    997     return send_result_;
    998   }
    999 
   1000  private:
   1001   virtual void Run() OVERRIDE {
   1002     CloseChannel();
   1003     Done();
   1004   }
   1005 
   1006   virtual bool Send(Message* msg) OVERRIDE {
   1007     send_result_ = Worker::Send(msg);
   1008     Done();
   1009     return send_result_;
   1010   }
   1011 
   1012   bool send_result_;
   1013 };
   1014 
   1015 // Tests basic synchronous call
   1016 TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
   1017   std::vector<Worker*> workers;
   1018   workers.push_back(new SyncMessageFilterServer());
   1019   workers.push_back(new SimpleClient());
   1020   RunTest(workers);
   1021 }
   1022 
   1023 // Test the case when the channel is closed and a Send is attempted after that.
   1024 TEST_F(IPCSyncChannelTest, SendAfterClose) {
   1025   ServerSendAfterClose server;
   1026   server.Start();
   1027 
   1028   server.done_event()->Wait();
   1029   server.done_event()->Reset();
   1030 
   1031   server.SendDummy();
   1032   server.done_event()->Wait();
   1033 
   1034   EXPECT_FALSE(server.send_result());
   1035 
   1036   server.Shutdown();
   1037 }
   1038 
   1039 //------------------------------------------------------------------------------
   1040 
   1041 class RestrictedDispatchServer : public Worker {
   1042  public:
   1043   RestrictedDispatchServer(WaitableEvent* sent_ping_event,
   1044                            WaitableEvent* wait_event)
   1045       : Worker("restricted_channel", Channel::MODE_SERVER),
   1046         sent_ping_event_(sent_ping_event),
   1047         wait_event_(wait_event) { }
   1048 
   1049   void OnDoPing(int ping) {
   1050     // Send an asynchronous message that unblocks the caller.
   1051     Message* msg = new SyncChannelTestMsg_Ping(ping);
   1052     msg->set_unblock(true);
   1053     Send(msg);
   1054     // Signal the event after the message has been sent on the channel, on the
   1055     // IPC thread.
   1056     ipc_thread().message_loop()->PostTask(
   1057         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, this));
   1058   }
   1059 
   1060   void OnPingTTL(int ping, int* out) {
   1061     *out = ping;
   1062     wait_event_->Wait();
   1063   }
   1064 
   1065   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1066 
   1067  private:
   1068   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1069     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message)
   1070      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1071      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
   1072      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
   1073     IPC_END_MESSAGE_MAP()
   1074     return true;
   1075   }
   1076 
   1077   void OnPingSent() {
   1078     sent_ping_event_->Signal();
   1079   }
   1080 
   1081   void OnNoArgs() { }
   1082   WaitableEvent* sent_ping_event_;
   1083   WaitableEvent* wait_event_;
   1084 };
   1085 
   1086 class NonRestrictedDispatchServer : public Worker {
   1087  public:
   1088   NonRestrictedDispatchServer(WaitableEvent* signal_event)
   1089       : Worker("non_restricted_channel", Channel::MODE_SERVER),
   1090         signal_event_(signal_event) {}
   1091 
   1092   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1093 
   1094   void OnDoPingTTL(int ping) {
   1095     int value = 0;
   1096     Send(new SyncChannelTestMsg_PingTTL(ping, &value));
   1097     signal_event_->Signal();
   1098   }
   1099 
   1100  private:
   1101   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1102     IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message)
   1103      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1104      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
   1105     IPC_END_MESSAGE_MAP()
   1106     return true;
   1107   }
   1108 
   1109   void OnNoArgs() { }
   1110   WaitableEvent* signal_event_;
   1111 };
   1112 
   1113 class RestrictedDispatchClient : public Worker {
   1114  public:
   1115   RestrictedDispatchClient(WaitableEvent* sent_ping_event,
   1116                            RestrictedDispatchServer* server,
   1117                            NonRestrictedDispatchServer* server2,
   1118                            int* success)
   1119       : Worker("restricted_channel", Channel::MODE_CLIENT),
   1120         ping_(0),
   1121         server_(server),
   1122         server2_(server2),
   1123         success_(success),
   1124         sent_ping_event_(sent_ping_event) {}
   1125 
   1126   virtual void Run() OVERRIDE {
   1127     // Incoming messages from our channel should only be dispatched when we
   1128     // send a message on that same channel.
   1129     channel()->SetRestrictDispatchChannelGroup(1);
   1130 
   1131     server_->ListenerThread()->message_loop()->PostTask(
   1132         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 1));
   1133     sent_ping_event_->Wait();
   1134     Send(new SyncChannelTestMsg_NoArgs);
   1135     if (ping_ == 1)
   1136       ++*success_;
   1137     else
   1138       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
   1139 
   1140     non_restricted_channel_ =
   1141         SyncChannel::Create("non_restricted_channel",
   1142                             IPC::Channel::MODE_CLIENT,
   1143                             this,
   1144                             ipc_thread().message_loop_proxy().get(),
   1145                             true,
   1146                             shutdown_event());
   1147 
   1148     server_->ListenerThread()->message_loop()->PostTask(
   1149         FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 2));
   1150     sent_ping_event_->Wait();
   1151     // Check that the incoming message is *not* dispatched when sending on the
   1152     // non restricted channel.
   1153     // TODO(piman): there is a possibility of a false positive race condition
   1154     // here, if the message that was posted on the server-side end of the pipe
   1155     // is not visible yet on the client side, but I don't know how to solve this
   1156     // without hooking into the internals of SyncChannel. I haven't seen it in
   1157     // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause
   1158     // the following to fail).
   1159     non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs);
   1160     if (ping_ == 1)
   1161       ++*success_;
   1162     else
   1163       LOG(ERROR) << "Send dispatched message from restricted channel";
   1164 
   1165     Send(new SyncChannelTestMsg_NoArgs);
   1166     if (ping_ == 2)
   1167       ++*success_;
   1168     else
   1169       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
   1170 
   1171     // Check that the incoming message on the non-restricted channel is
   1172     // dispatched when sending on the restricted channel.
   1173     server2_->ListenerThread()->message_loop()->PostTask(
   1174         FROM_HERE,
   1175         base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, server2_, 3));
   1176     int value = 0;
   1177     Send(new SyncChannelTestMsg_PingTTL(4, &value));
   1178     if (ping_ == 3 && value == 4)
   1179       ++*success_;
   1180     else
   1181       LOG(ERROR) << "Send failed to dispatch message from unrestricted channel";
   1182 
   1183     non_restricted_channel_->Send(new SyncChannelTestMsg_Done);
   1184     non_restricted_channel_.reset();
   1185     Send(new SyncChannelTestMsg_Done);
   1186     Done();
   1187   }
   1188 
   1189  private:
   1190   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1191     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message)
   1192      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing)
   1193      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL)
   1194     IPC_END_MESSAGE_MAP()
   1195     return true;
   1196   }
   1197 
   1198   void OnPing(int ping) {
   1199     ping_ = ping;
   1200   }
   1201 
   1202   void OnPingTTL(int ping, IPC::Message* reply) {
   1203     ping_ = ping;
   1204     // This message comes from the NonRestrictedDispatchServer, we have to send
   1205     // the reply back manually.
   1206     SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping);
   1207     non_restricted_channel_->Send(reply);
   1208   }
   1209 
   1210   int ping_;
   1211   RestrictedDispatchServer* server_;
   1212   NonRestrictedDispatchServer* server2_;
   1213   int* success_;
   1214   WaitableEvent* sent_ping_event_;
   1215   scoped_ptr<SyncChannel> non_restricted_channel_;
   1216 };
   1217 
   1218 TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
   1219   WaitableEvent sent_ping_event(false, false);
   1220   WaitableEvent wait_event(false, false);
   1221   RestrictedDispatchServer* server =
   1222       new RestrictedDispatchServer(&sent_ping_event, &wait_event);
   1223   NonRestrictedDispatchServer* server2 =
   1224       new NonRestrictedDispatchServer(&wait_event);
   1225 
   1226   int success = 0;
   1227   std::vector<Worker*> workers;
   1228   workers.push_back(server);
   1229   workers.push_back(server2);
   1230   workers.push_back(new RestrictedDispatchClient(
   1231       &sent_ping_event, server, server2, &success));
   1232   RunTest(workers);
   1233   EXPECT_EQ(4, success);
   1234 }
   1235 
   1236 //------------------------------------------------------------------------------
   1237 
   1238 // This test case inspired by crbug.com/108491
   1239 // We create two servers that use the same ListenerThread but both restrict
   1240 // dispatch on their channel via SetRestrictDispatchChannelGroup(1).
   1241 // We create clients, then use some specific WaitableEvent wait/signalling to
   1242 // ensure that messages get dispatched in a way that causes a deadlock due to
   1243 // a nested dispatch and an eligible message in a higher-level dispatch's
   1244 // delayed_queue. Specifically, we start with client1 about to send an
   1245 // unblocking message to server1, while the shared listener thread for the
   1246 // servers server1 and server2 is about to send a non-unblocking message to
   1247 // client1. At the same time, client2 will be about to send an unblocking
   1248 // message to server2. Server1 will handle the client1->server1 message by
   1249 // telling server2 to send a non-unblocking message to client2.
   1250 // What should happen is that the send to server2 should find the pending,
   1251 // same-context client2->server2 message to dispatch, causing client2 to
   1252 // unblock then handle the server2->client2 message, so that the shared
   1253 // servers' listener thread can then respond to the client1->server1 message.
   1254 // Then client1 can handle the non-unblocking server1->client1 message.
   1255 // The old code would end up in a state where the server2->client2 message is
   1256 // sent, but the client2->server2 message (which is eligible for dispatch, and
   1257 // which is what client2 is waiting for) is stashed in a local delayed_queue
   1258 // that has server1's channel context, causing a deadlock.
   1259 // WaitableEvents in the events array are used to:
   1260 //   event 0: indicate to client1 that server listener is in OnDoServerTask
   1261 //   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
   1262 //   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
   1263 //   event 3: indicate to client2 that server listener is in OnDoServerTask
   1264 
   1265 class RestrictedDispatchDeadlockServer : public Worker {
   1266  public:
   1267   RestrictedDispatchDeadlockServer(int server_num,
   1268                                    WaitableEvent* server_ready_event,
   1269                                    WaitableEvent** events,
   1270                                    RestrictedDispatchDeadlockServer* peer)
   1271       : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER),
   1272         server_num_(server_num),
   1273         server_ready_event_(server_ready_event),
   1274         events_(events),
   1275         peer_(peer) { }
   1276 
   1277   void OnDoServerTask() {
   1278     events_[3]->Signal();
   1279     events_[2]->Wait();
   1280     events_[0]->Signal();
   1281     SendMessageToClient();
   1282   }
   1283 
   1284   virtual void Run() OVERRIDE {
   1285     channel()->SetRestrictDispatchChannelGroup(1);
   1286     server_ready_event_->Signal();
   1287   }
   1288 
   1289   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1290 
   1291  private:
   1292   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1293     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
   1294      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1295      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
   1296     IPC_END_MESSAGE_MAP()
   1297     return true;
   1298   }
   1299 
   1300   void OnNoArgs() {
   1301     if (server_num_ == 1) {
   1302       DCHECK(peer_ != NULL);
   1303       peer_->SendMessageToClient();
   1304     }
   1305   }
   1306 
   1307   void SendMessageToClient() {
   1308     Message* msg = new SyncChannelTestMsg_NoArgs;
   1309     msg->set_unblock(false);
   1310     DCHECK(!msg->should_unblock());
   1311     Send(msg);
   1312   }
   1313 
   1314   int server_num_;
   1315   WaitableEvent* server_ready_event_;
   1316   WaitableEvent** events_;
   1317   RestrictedDispatchDeadlockServer* peer_;
   1318 };
   1319 
   1320 class RestrictedDispatchDeadlockClient2 : public Worker {
   1321  public:
   1322   RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server,
   1323                                     WaitableEvent* server_ready_event,
   1324                                     WaitableEvent** events)
   1325       : Worker("channel2", Channel::MODE_CLIENT),
   1326         server_ready_event_(server_ready_event),
   1327         events_(events),
   1328         received_msg_(false),
   1329         received_noarg_reply_(false),
   1330         done_issued_(false) {}
   1331 
   1332   virtual void Run() OVERRIDE {
   1333     server_ready_event_->Wait();
   1334   }
   1335 
   1336   void OnDoClient2Task() {
   1337     events_[3]->Wait();
   1338     events_[1]->Signal();
   1339     events_[2]->Signal();
   1340     DCHECK(received_msg_ == false);
   1341 
   1342     Message* message = new SyncChannelTestMsg_NoArgs;
   1343     message->set_unblock(true);
   1344     Send(message);
   1345     received_noarg_reply_ = true;
   1346   }
   1347 
   1348   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
   1349  private:
   1350   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1351     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
   1352      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1353     IPC_END_MESSAGE_MAP()
   1354     return true;
   1355   }
   1356 
   1357   void OnNoArgs() {
   1358     received_msg_ = true;
   1359     PossiblyDone();
   1360   }
   1361 
   1362   void PossiblyDone() {
   1363     if (received_noarg_reply_ && received_msg_) {
   1364       DCHECK(done_issued_ == false);
   1365       done_issued_ = true;
   1366       Send(new SyncChannelTestMsg_Done);
   1367       Done();
   1368     }
   1369   }
   1370 
   1371   WaitableEvent* server_ready_event_;
   1372   WaitableEvent** events_;
   1373   bool received_msg_;
   1374   bool received_noarg_reply_;
   1375   bool done_issued_;
   1376 };
   1377 
   1378 class RestrictedDispatchDeadlockClient1 : public Worker {
   1379  public:
   1380   RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server,
   1381                                     RestrictedDispatchDeadlockClient2* peer,
   1382                                     WaitableEvent* server_ready_event,
   1383                                     WaitableEvent** events)
   1384       : Worker("channel1", Channel::MODE_CLIENT),
   1385         server_(server),
   1386         peer_(peer),
   1387         server_ready_event_(server_ready_event),
   1388         events_(events),
   1389         received_msg_(false),
   1390         received_noarg_reply_(false),
   1391         done_issued_(false) {}
   1392 
   1393   virtual void Run() OVERRIDE {
   1394     server_ready_event_->Wait();
   1395     server_->ListenerThread()->message_loop()->PostTask(
   1396         FROM_HERE,
   1397         base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_));
   1398     peer_->ListenerThread()->message_loop()->PostTask(
   1399         FROM_HERE,
   1400         base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_));
   1401     events_[0]->Wait();
   1402     events_[1]->Wait();
   1403     DCHECK(received_msg_ == false);
   1404 
   1405     Message* message = new SyncChannelTestMsg_NoArgs;
   1406     message->set_unblock(true);
   1407     Send(message);
   1408     received_noarg_reply_ = true;
   1409     PossiblyDone();
   1410   }
   1411 
   1412  private:
   1413   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1414     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
   1415      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
   1416     IPC_END_MESSAGE_MAP()
   1417     return true;
   1418   }
   1419 
   1420   void OnNoArgs() {
   1421     received_msg_ = true;
   1422     PossiblyDone();
   1423   }
   1424 
   1425   void PossiblyDone() {
   1426     if (received_noarg_reply_ && received_msg_) {
   1427       DCHECK(done_issued_ == false);
   1428       done_issued_ = true;
   1429       Send(new SyncChannelTestMsg_Done);
   1430       Done();
   1431     }
   1432   }
   1433 
   1434   RestrictedDispatchDeadlockServer* server_;
   1435   RestrictedDispatchDeadlockClient2* peer_;
   1436   WaitableEvent* server_ready_event_;
   1437   WaitableEvent** events_;
   1438   bool received_msg_;
   1439   bool received_noarg_reply_;
   1440   bool done_issued_;
   1441 };
   1442 
   1443 TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
   1444   std::vector<Worker*> workers;
   1445 
   1446   // A shared worker thread so that server1 and server2 run on one thread.
   1447   base::Thread worker_thread("RestrictedDispatchDeadlock");
   1448   ASSERT_TRUE(worker_thread.Start());
   1449 
   1450   WaitableEvent server1_ready(false, false);
   1451   WaitableEvent server2_ready(false, false);
   1452 
   1453   WaitableEvent event0(false, false);
   1454   WaitableEvent event1(false, false);
   1455   WaitableEvent event2(false, false);
   1456   WaitableEvent event3(false, false);
   1457   WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
   1458 
   1459   RestrictedDispatchDeadlockServer* server1;
   1460   RestrictedDispatchDeadlockServer* server2;
   1461   RestrictedDispatchDeadlockClient1* client1;
   1462   RestrictedDispatchDeadlockClient2* client2;
   1463 
   1464   server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events,
   1465                                                  NULL);
   1466   server2->OverrideThread(&worker_thread);
   1467   workers.push_back(server2);
   1468 
   1469   client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready,
   1470                                                   events);
   1471   workers.push_back(client2);
   1472 
   1473   server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events,
   1474                                                  server2);
   1475   server1->OverrideThread(&worker_thread);
   1476   workers.push_back(server1);
   1477 
   1478   client1 = new RestrictedDispatchDeadlockClient1(server1, client2,
   1479                                                   &server1_ready, events);
   1480   workers.push_back(client1);
   1481 
   1482   RunTest(workers);
   1483 }
   1484 
   1485 //------------------------------------------------------------------------------
   1486 
   1487 // This test case inspired by crbug.com/120530
   1488 // We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a
   1489 // message that recurses through 3, 4 or 5 steps to make sure, say, W1 can
   1490 // re-enter when called from W4 while it's sending a message to W2.
   1491 // The first worker drives the whole test so it must be treated specially.
   1492 
   1493 class RestrictedDispatchPipeWorker : public Worker {
   1494  public:
   1495   RestrictedDispatchPipeWorker(
   1496       const std::string &channel1,
   1497       WaitableEvent* event1,
   1498       const std::string &channel2,
   1499       WaitableEvent* event2,
   1500       int group,
   1501       int* success)
   1502       : Worker(channel1, Channel::MODE_SERVER),
   1503         event1_(event1),
   1504         event2_(event2),
   1505         other_channel_name_(channel2),
   1506         group_(group),
   1507         success_(success) {
   1508   }
   1509 
   1510   void OnPingTTL(int ping, int* ret) {
   1511     *ret = 0;
   1512     if (!ping)
   1513       return;
   1514     other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret));
   1515     ++*ret;
   1516   }
   1517 
   1518   void OnDone() {
   1519     if (is_first())
   1520       return;
   1521     other_channel_->Send(new SyncChannelTestMsg_Done);
   1522     other_channel_.reset();
   1523     Done();
   1524   }
   1525 
   1526   virtual void Run() OVERRIDE {
   1527     channel()->SetRestrictDispatchChannelGroup(group_);
   1528     if (is_first())
   1529       event1_->Signal();
   1530     event2_->Wait();
   1531     other_channel_ =
   1532         SyncChannel::Create(other_channel_name_,
   1533                             IPC::Channel::MODE_CLIENT,
   1534                             this,
   1535                             ipc_thread().message_loop_proxy().get(),
   1536                             true,
   1537                             shutdown_event());
   1538     other_channel_->SetRestrictDispatchChannelGroup(group_);
   1539     if (!is_first()) {
   1540       event1_->Signal();
   1541       return;
   1542     }
   1543     *success_ = 0;
   1544     int value = 0;
   1545     OnPingTTL(3, &value);
   1546     *success_ += (value == 3);
   1547     OnPingTTL(4, &value);
   1548     *success_ += (value == 4);
   1549     OnPingTTL(5, &value);
   1550     *success_ += (value == 5);
   1551     other_channel_->Send(new SyncChannelTestMsg_Done);
   1552     other_channel_.reset();
   1553     Done();
   1554   }
   1555 
   1556   bool is_first() { return !!success_; }
   1557 
   1558  private:
   1559   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1560     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message)
   1561      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
   1562      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone)
   1563     IPC_END_MESSAGE_MAP()
   1564     return true;
   1565   }
   1566 
   1567   scoped_ptr<SyncChannel> other_channel_;
   1568   WaitableEvent* event1_;
   1569   WaitableEvent* event2_;
   1570   std::string other_channel_name_;
   1571   int group_;
   1572   int* success_;
   1573 };
   1574 
   1575 TEST_F(IPCSyncChannelTest, RestrictedDispatch4WayDeadlock) {
   1576   int success = 0;
   1577   std::vector<Worker*> workers;
   1578   WaitableEvent event0(true, false);
   1579   WaitableEvent event1(true, false);
   1580   WaitableEvent event2(true, false);
   1581   WaitableEvent event3(true, false);
   1582   workers.push_back(new RestrictedDispatchPipeWorker(
   1583         "channel0", &event0, "channel1", &event1, 1, &success));
   1584   workers.push_back(new RestrictedDispatchPipeWorker(
   1585         "channel1", &event1, "channel2", &event2, 2, NULL));
   1586   workers.push_back(new RestrictedDispatchPipeWorker(
   1587         "channel2", &event2, "channel3", &event3, 3, NULL));
   1588   workers.push_back(new RestrictedDispatchPipeWorker(
   1589         "channel3", &event3, "channel0", &event0, 4, NULL));
   1590   RunTest(workers);
   1591   EXPECT_EQ(3, success);
   1592 }
   1593 
   1594 //------------------------------------------------------------------------------
   1595 
   1596 // This test case inspired by crbug.com/122443
   1597 // We want to make sure a reply message with the unblock flag set correctly
   1598 // behaves as a reply, not a regular message.
   1599 // We have 3 workers. Server1 will send a message to Server2 (which will block),
   1600 // during which it will dispatch a message comming from Client, at which point
   1601 // it will send another message to Server2. While sending that second message it
   1602 // will receive a reply from Server1 with the unblock flag.
   1603 
   1604 class ReentrantReplyServer1 : public Worker {
   1605  public:
   1606   ReentrantReplyServer1(WaitableEvent* server_ready)
   1607       : Worker("reentrant_reply1", Channel::MODE_SERVER),
   1608         server_ready_(server_ready) { }
   1609 
   1610   virtual void Run() OVERRIDE {
   1611     server2_channel_ =
   1612         SyncChannel::Create("reentrant_reply2",
   1613                             IPC::Channel::MODE_CLIENT,
   1614                             this,
   1615                             ipc_thread().message_loop_proxy().get(),
   1616                             true,
   1617                             shutdown_event());
   1618     server_ready_->Signal();
   1619     Message* msg = new SyncChannelTestMsg_Reentrant1();
   1620     server2_channel_->Send(msg);
   1621     server2_channel_.reset();
   1622     Done();
   1623   }
   1624 
   1625  private:
   1626   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1627     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message)
   1628      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2)
   1629      IPC_REPLY_HANDLER(OnReply)
   1630     IPC_END_MESSAGE_MAP()
   1631     return true;
   1632   }
   1633 
   1634   void OnReentrant2() {
   1635     Message* msg = new SyncChannelTestMsg_Reentrant3();
   1636     server2_channel_->Send(msg);
   1637   }
   1638 
   1639   void OnReply(const Message& message) {
   1640     // If we get here, the Send() will never receive the reply (thus would
   1641     // hang), so abort instead.
   1642     LOG(FATAL) << "Reply message was dispatched";
   1643   }
   1644 
   1645   WaitableEvent* server_ready_;
   1646   scoped_ptr<SyncChannel> server2_channel_;
   1647 };
   1648 
   1649 class ReentrantReplyServer2 : public Worker {
   1650  public:
   1651   ReentrantReplyServer2()
   1652       : Worker("reentrant_reply2", Channel::MODE_SERVER),
   1653         reply_(NULL) { }
   1654 
   1655  private:
   1656   virtual bool OnMessageReceived(const Message& message) OVERRIDE {
   1657     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message)
   1658      IPC_MESSAGE_HANDLER_DELAY_REPLY(
   1659          SyncChannelTestMsg_Reentrant1, OnReentrant1)
   1660      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3)
   1661     IPC_END_MESSAGE_MAP()
   1662     return true;
   1663   }
   1664 
   1665   void OnReentrant1(Message* reply) {
   1666     DCHECK(!reply_);
   1667     reply_ = reply;
   1668   }
   1669 
   1670   void OnReentrant3() {
   1671     DCHECK(reply_);
   1672     Message* reply = reply_;
   1673     reply_ = NULL;
   1674     reply->set_unblock(true);
   1675     Send(reply);
   1676     Done();
   1677   }
   1678 
   1679   Message* reply_;
   1680 };
   1681 
   1682 class ReentrantReplyClient : public Worker {
   1683  public:
   1684   ReentrantReplyClient(WaitableEvent* server_ready)
   1685       : Worker("reentrant_reply1", Channel::MODE_CLIENT),
   1686         server_ready_(server_ready) { }
   1687 
   1688   virtual void Run() OVERRIDE {
   1689     server_ready_->Wait();
   1690     Send(new SyncChannelTestMsg_Reentrant2());
   1691     Done();
   1692   }
   1693 
   1694  private:
   1695   WaitableEvent* server_ready_;
   1696 };
   1697 
   1698 TEST_F(IPCSyncChannelTest, ReentrantReply) {
   1699   std::vector<Worker*> workers;
   1700   WaitableEvent server_ready(false, false);
   1701   workers.push_back(new ReentrantReplyServer2());
   1702   workers.push_back(new ReentrantReplyServer1(&server_ready));
   1703   workers.push_back(new ReentrantReplyClient(&server_ready));
   1704   RunTest(workers);
   1705 }
   1706 
   1707 //------------------------------------------------------------------------------
   1708 
   1709 // Generate a validated channel ID using Channel::GenerateVerifiedChannelID().
   1710 
   1711 class VerifiedServer : public Worker {
   1712  public:
   1713   VerifiedServer(base::Thread* listener_thread,
   1714                  const std::string& channel_name,
   1715                  const std::string& reply_text)
   1716       : Worker(channel_name, Channel::MODE_SERVER),
   1717         reply_text_(reply_text) {
   1718     Worker::OverrideThread(listener_thread);
   1719   }
   1720 
   1721   virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE {
   1722     VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
   1723     SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
   1724     Send(reply_msg);
   1725     ASSERT_EQ(channel()->GetPeerPID(), base::GetCurrentProcId());
   1726     Done();
   1727   }
   1728 
   1729  private:
   1730   std::string reply_text_;
   1731 };
   1732 
   1733 class VerifiedClient : public Worker {
   1734  public:
   1735   VerifiedClient(base::Thread* listener_thread,
   1736                  const std::string& channel_name,
   1737                  const std::string& expected_text)
   1738       : Worker(channel_name, Channel::MODE_CLIENT),
   1739         expected_text_(expected_text) {
   1740     Worker::OverrideThread(listener_thread);
   1741   }
   1742 
   1743   virtual void Run() OVERRIDE {
   1744     std::string response;
   1745     SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
   1746     bool result = Send(msg);
   1747     DCHECK(result);
   1748     DCHECK_EQ(response, expected_text_);
   1749     // expected_text_ is only used in the above DCHECK. This line suppresses the
   1750     // "unused private field" warning in release builds.
   1751     (void)expected_text_;
   1752 
   1753     VLOG(1) << __FUNCTION__ << " Received reply: " << response;
   1754     ASSERT_EQ(channel()->GetPeerPID(), base::GetCurrentProcId());
   1755     Done();
   1756   }
   1757 
   1758  private:
   1759   std::string expected_text_;
   1760 };
   1761 
   1762 void Verified() {
   1763   std::vector<Worker*> workers;
   1764 
   1765   // A shared worker thread for servers
   1766   base::Thread server_worker_thread("Verified_ServerListener");
   1767   ASSERT_TRUE(server_worker_thread.Start());
   1768 
   1769   base::Thread client_worker_thread("Verified_ClientListener");
   1770   ASSERT_TRUE(client_worker_thread.Start());
   1771 
   1772   std::string channel_id = Channel::GenerateVerifiedChannelID("Verified");
   1773   Worker* worker;
   1774 
   1775   worker = new VerifiedServer(&server_worker_thread,
   1776                               channel_id,
   1777                               "Got first message");
   1778   workers.push_back(worker);
   1779 
   1780   worker = new VerifiedClient(&client_worker_thread,
   1781                               channel_id,
   1782                               "Got first message");
   1783   workers.push_back(worker);
   1784 
   1785   RunTest(workers);
   1786 }
   1787 
   1788 // Windows needs to send an out-of-band secret to verify the client end of the
   1789 // channel. Test that we still connect correctly in that case.
   1790 TEST_F(IPCSyncChannelTest, Verified) {
   1791   Verified();
   1792 }
   1793 
   1794 }  // namespace
   1795 }  // namespace IPC
   1796