Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 // TODO(vtl): Enable this on non-POSIX once we have a non-POSIX implementation.
      6 #include "build/build_config.h"
      7 #if defined(OS_POSIX)
      8 
      9 #include <stdint.h>
     10 
     11 #include <string>
     12 
     13 #include "base/basictypes.h"
     14 #include "base/bind.h"
     15 #include "base/callback.h"
     16 #include "base/location.h"
     17 #include "base/logging.h"
     18 #include "base/message_loop/message_loop.h"
     19 #include "base/threading/thread.h"
     20 #include "mojo/common/test/multiprocess_test_base.h"
     21 #include "mojo/system/channel.h"
     22 #include "mojo/system/local_message_pipe_endpoint.h"
     23 #include "mojo/system/message_pipe.h"
     24 #include "mojo/system/platform_channel.h"
     25 #include "mojo/system/proxy_message_pipe_endpoint.h"
     26 #include "mojo/system/test_utils.h"
     27 #include "mojo/system/waiter.h"
     28 
     29 namespace mojo {
     30 namespace system {
     31 namespace {
     32 
     33 class IOThreadWrapper {
     34  public:
     35   IOThreadWrapper() : io_thread_("io_thread") {}
     36   ~IOThreadWrapper() {
     37     CHECK(!channel_.get());
     38     CHECK(!io_thread_.IsRunning());
     39   }
     40 
     41   void PostTask(const tracked_objects::Location& from_here,
     42                 const base::Closure& task) {
     43     task_runner()->PostTask(from_here, task);
     44   }
     45 
     46   void PostTaskAndWait(const tracked_objects::Location& from_here,
     47                        const base::Closure& task) {
     48     test::PostTaskAndWait(task_runner(), from_here, task);
     49   }
     50 
     51   void Init(PlatformChannel* platform_channel, scoped_refptr<MessagePipe> mp) {
     52     io_thread_.StartWithOptions(
     53         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
     54     PostTask(FROM_HERE,
     55              base::Bind(&IOThreadWrapper::InitOnIOThread,
     56                         base::Unretained(this),
     57                         platform_channel, mp));
     58   }
     59 
     60   void Shutdown() {
     61     PostTaskAndWait(FROM_HERE,
     62                     base::Bind(&IOThreadWrapper::ShutdownOnIOThread,
     63                                base::Unretained(this)));
     64     io_thread_.Stop();
     65   }
     66 
     67   bool is_initialized() const { return !!channel_.get(); }
     68 
     69   base::MessageLoop* message_loop() {
     70     return io_thread_.message_loop();
     71   }
     72 
     73   scoped_refptr<base::TaskRunner> task_runner() {
     74     return message_loop()->message_loop_proxy();
     75   }
     76 
     77  private:
     78   void InitOnIOThread(PlatformChannel* platform_channel,
     79                       scoped_refptr<MessagePipe> mp) {
     80     CHECK_EQ(base::MessageLoop::current(), message_loop());
     81     CHECK(platform_channel);
     82     CHECK(platform_channel->is_valid());
     83 
     84     // Create and initialize |Channel|.
     85     channel_ = new Channel();
     86     CHECK(channel_->Init(platform_channel->PassHandle()));
     87 
     88     // Attach the message pipe endpoint.
     89     // Note: On the "server" (parent process) side, we need not attach the
     90     // message pipe endpoint immediately. However, on the "client" (child
     91     // process) side, this *must* be done here -- otherwise, the |Channel| may
     92     // receive/process messages (which it can do as soon as it's hooked up to
     93     // the IO thread message loop, and that message loop runs) before the
     94     // message pipe endpoint is attached.
     95     CHECK_EQ(channel_->AttachMessagePipeEndpoint(mp, 1),
     96              Channel::kBootstrapEndpointId);
     97     channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
     98                                      Channel::kBootstrapEndpointId);
     99   }
    100 
    101   void ShutdownOnIOThread() {
    102     CHECK(channel_.get());
    103     channel_->Shutdown();
    104     channel_ = NULL;
    105   }
    106 
    107   base::Thread io_thread_;
    108   scoped_refptr<Channel> channel_;
    109 
    110   DISALLOW_COPY_AND_ASSIGN(IOThreadWrapper);
    111 };
    112 
    113 class MultiprocessMessagePipeTest : public mojo::test::MultiprocessTestBase {
    114  public:
    115   MultiprocessMessagePipeTest() {}
    116   virtual ~MultiprocessMessagePipeTest() {}
    117 
    118   virtual void TearDown() OVERRIDE {
    119     if (io_thread_wrapper_.is_initialized())
    120       io_thread_wrapper_.Shutdown();
    121     mojo::test::MultiprocessTestBase::TearDown();
    122   }
    123 
    124   void Init(scoped_refptr<MessagePipe> mp) {
    125     io_thread_wrapper_.Init(platform_server_channel.get(), mp);
    126   }
    127 
    128  private:
    129   IOThreadWrapper io_thread_wrapper_;
    130 
    131   DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest);
    132 };
    133 
    134 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, MojoWaitFlags flags) {
    135   Waiter waiter;
    136   waiter.Init();
    137 
    138   MojoResult add_result = mp->AddWaiter(0, &waiter, flags, MOJO_RESULT_OK);
    139   if (add_result != MOJO_RESULT_OK) {
    140     return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK :
    141                                                         add_result;
    142   }
    143 
    144   MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE);
    145   mp->RemoveWaiter(0, &waiter);
    146   return wait_result;
    147 }
    148 
    149 // For each message received, sends a reply message with the same contents
    150 // repeated twice, until the other end is closed or it receives "quitquitquit"
    151 // (which it doesn't reply to). It'll return the number of messages received,
    152 // not including any "quitquitquit" message, modulo 100.
    153 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
    154   IOThreadWrapper io_thread_wrapper;
    155   PlatformClientChannel* const platform_client_channel =
    156       MultiprocessMessagePipeTest::platform_client_channel.get();
    157   CHECK(platform_client_channel);
    158   CHECK(platform_client_channel->is_valid());
    159   scoped_refptr<MessagePipe> mp(new MessagePipe(
    160       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    161       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    162   io_thread_wrapper.Init(platform_client_channel, mp);
    163 
    164   const std::string quitquitquit("quitquitquit");
    165   int rv = 0;
    166   for (;; rv = (rv + 1) % 100) {
    167     // Wait for our end of the message pipe to be readable.
    168     MojoResult result = WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE);
    169     if (result != MOJO_RESULT_OK) {
    170       // It was closed, probably.
    171       CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION);
    172       break;
    173     }
    174 
    175     std::string read_buffer(1000, '\0');
    176     uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
    177     CHECK_EQ(mp->ReadMessage(0,
    178                              &read_buffer[0], &read_buffer_size,
    179                              NULL, NULL,
    180                              MOJO_READ_MESSAGE_FLAG_NONE),
    181              MOJO_RESULT_OK);
    182     read_buffer.resize(read_buffer_size);
    183     VLOG(2) << "Child got: " << read_buffer;
    184 
    185     if (read_buffer == quitquitquit) {
    186       VLOG(2) << "Child quitting.";
    187       break;
    188     }
    189 
    190     std::string write_buffer = read_buffer + read_buffer;
    191     CHECK_EQ(mp->WriteMessage(0,
    192                               write_buffer.data(),
    193                               static_cast<uint32_t>(write_buffer.size()),
    194                               NULL,
    195                               MOJO_WRITE_MESSAGE_FLAG_NONE),
    196              MOJO_RESULT_OK);
    197   }
    198 
    199 
    200   mp->Close(0);
    201   io_thread_wrapper.Shutdown();
    202   return rv;
    203 }
    204 
    205 // Sends "hello" to child, and expects "hellohello" back.
    206 TEST_F(MultiprocessMessagePipeTest, Basic) {
    207   StartChild("EchoEcho");
    208 
    209   scoped_refptr<MessagePipe> mp(new MessagePipe(
    210       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    211       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    212   Init(mp);
    213 
    214   std::string hello("hello");
    215   EXPECT_EQ(MOJO_RESULT_OK,
    216             mp->WriteMessage(0,
    217                              hello.data(), static_cast<uint32_t>(hello.size()),
    218                              NULL,
    219                              MOJO_WRITE_MESSAGE_FLAG_NONE));
    220 
    221   EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
    222 
    223   std::string read_buffer(1000, '\0');
    224   uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
    225   CHECK_EQ(mp->ReadMessage(0,
    226                            &read_buffer[0], &read_buffer_size,
    227                            NULL, NULL,
    228                            MOJO_READ_MESSAGE_FLAG_NONE),
    229            MOJO_RESULT_OK);
    230   read_buffer.resize(read_buffer_size);
    231   VLOG(2) << "Parent got: " << read_buffer;
    232   EXPECT_EQ(hello + hello, read_buffer);
    233 
    234   mp->Close(0);
    235 
    236   // We sent one message.
    237   EXPECT_EQ(1 % 100, WaitForChildShutdown());
    238 }
    239 
    240 // Sends a bunch of messages to the child. Expects them "repeated" back. Waits
    241 // for the child to close its end before quitting.
    242 TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
    243   StartChild("EchoEcho");
    244 
    245   scoped_refptr<MessagePipe> mp(new MessagePipe(
    246       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    247       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    248   Init(mp);
    249 
    250   static const size_t kNumMessages = 1001;
    251   for (size_t i = 0; i < kNumMessages; i++) {
    252     std::string write_buffer(i, 'A' + (i % 26));
    253     EXPECT_EQ(MOJO_RESULT_OK,
    254               mp->WriteMessage(0,
    255                                write_buffer.data(),
    256                                static_cast<uint32_t>(write_buffer.size()),
    257                                NULL,
    258                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    259   }
    260 
    261   const std::string quitquitquit("quitquitquit");
    262   EXPECT_EQ(MOJO_RESULT_OK,
    263             mp->WriteMessage(0,
    264                              quitquitquit.data(),
    265                              static_cast<uint32_t>(quitquitquit.size()),
    266                              NULL,
    267                              MOJO_WRITE_MESSAGE_FLAG_NONE));
    268 
    269   for (size_t i = 0; i < kNumMessages; i++) {
    270     EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
    271 
    272     std::string read_buffer(kNumMessages * 2, '\0');
    273     uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
    274     CHECK_EQ(mp->ReadMessage(0,
    275                              &read_buffer[0], &read_buffer_size,
    276                              NULL, NULL,
    277                              MOJO_READ_MESSAGE_FLAG_NONE),
    278              MOJO_RESULT_OK);
    279     read_buffer.resize(read_buffer_size);
    280 
    281     EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer);
    282   }
    283 
    284   // Wait for it to become readable, which should fail (since we sent
    285   // "quitquitquit").
    286   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    287             WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
    288 
    289   mp->Close(0);
    290 
    291   EXPECT_EQ(static_cast<int>(kNumMessages % 100), WaitForChildShutdown());
    292 }
    293 
    294 }  // namespace
    295 }  // namespace system
    296 }  // namespace mojo
    297 
    298 #endif  // defined(OS_POSIX)
    299