Home | History | Annotate | Download | only in system
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <stdint.h>
      6 #include <stdio.h>
      7 #include <string.h>
      8 
      9 #include <vector>
     10 
     11 #include "base/basictypes.h"
     12 #include "base/bind.h"
     13 #include "base/file_util.h"
     14 #include "base/files/file_path.h"
     15 #include "base/files/scoped_file.h"
     16 #include "base/location.h"
     17 #include "base/logging.h"
     18 #include "base/message_loop/message_loop.h"
     19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
     20 #include "build/build_config.h"  // TODO(vtl): Remove this.
     21 #include "mojo/common/test/test_utils.h"
     22 #include "mojo/embedder/platform_channel_pair.h"
     23 #include "mojo/embedder/scoped_platform_handle.h"
     24 #include "mojo/system/channel.h"
     25 #include "mojo/system/local_message_pipe_endpoint.h"
     26 #include "mojo/system/message_pipe.h"
     27 #include "mojo/system/message_pipe_dispatcher.h"
     28 #include "mojo/system/platform_handle_dispatcher.h"
     29 #include "mojo/system/proxy_message_pipe_endpoint.h"
     30 #include "mojo/system/raw_channel.h"
     31 #include "mojo/system/shared_buffer_dispatcher.h"
     32 #include "mojo/system/test_utils.h"
     33 #include "mojo/system/waiter.h"
     34 #include "testing/gtest/include/gtest/gtest.h"
     35 
     36 namespace mojo {
     37 namespace system {
     38 namespace {
     39 
     40 class RemoteMessagePipeTest : public testing::Test {
     41  public:
     42   RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
     43   virtual ~RemoteMessagePipeTest() {}
     44 
     45   virtual void SetUp() OVERRIDE {
     46     io_thread_.PostTaskAndWait(
     47         FROM_HERE,
     48         base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
     49                    base::Unretained(this)));
     50   }
     51 
     52   virtual void TearDown() OVERRIDE {
     53     io_thread_.PostTaskAndWait(
     54         FROM_HERE,
     55         base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
     56                    base::Unretained(this)));
     57   }
     58 
     59  protected:
     60   // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
     61   // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
     62   // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
     63   void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
     64                            scoped_refptr<MessagePipe> mp1) {
     65     io_thread_.PostTaskAndWait(
     66         FROM_HERE,
     67         base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
     68                    base::Unretained(this), mp0, mp1));
     69   }
     70 
     71   // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
     72   // It assumes/requires that this is the bootstrap case, i.e., that the
     73   // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
     74   // returns *without* waiting for it to finish connecting.
     75   void BootstrapMessagePipeNoWait(unsigned channel_index,
     76                                   scoped_refptr<MessagePipe> mp) {
     77     io_thread_.PostTask(
     78         FROM_HERE,
     79         base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
     80                    base::Unretained(this), channel_index, mp));
     81   }
     82 
     83   void RestoreInitialState() {
     84     io_thread_.PostTaskAndWait(
     85         FROM_HERE,
     86         base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
     87                    base::Unretained(this)));
     88   }
     89 
     90   test::TestIOThread* io_thread() { return &io_thread_; }
     91 
     92  private:
     93   void SetUpOnIOThread() {
     94     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
     95 
     96     embedder::PlatformChannelPair channel_pair;
     97     platform_handles_[0] = channel_pair.PassServerHandle();
     98     platform_handles_[1] = channel_pair.PassClientHandle();
     99   }
    100 
    101   void TearDownOnIOThread() {
    102     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    103 
    104     if (channels_[0]) {
    105       channels_[0]->Shutdown();
    106       channels_[0] = NULL;
    107     }
    108     if (channels_[1]) {
    109       channels_[1]->Shutdown();
    110       channels_[1] = NULL;
    111     }
    112   }
    113 
    114   void CreateAndInitChannel(unsigned channel_index) {
    115     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    116     CHECK(channel_index == 0 || channel_index == 1);
    117     CHECK(!channels_[channel_index]);
    118 
    119     channels_[channel_index] = new Channel();
    120     CHECK(channels_[channel_index]->Init(
    121         RawChannel::Create(platform_handles_[channel_index].Pass())));
    122   }
    123 
    124   void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
    125                                      scoped_refptr<MessagePipe> mp1) {
    126     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    127 
    128     if (!channels_[0])
    129       CreateAndInitChannel(0);
    130     if (!channels_[1])
    131       CreateAndInitChannel(1);
    132 
    133     MessageInTransit::EndpointId local_id0 =
    134         channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
    135     MessageInTransit::EndpointId local_id1 =
    136         channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
    137 
    138     CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
    139     CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
    140   }
    141 
    142   void BootstrapMessagePipeOnIOThread(unsigned channel_index,
    143                                       scoped_refptr<MessagePipe> mp) {
    144     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    145     CHECK(channel_index == 0 || channel_index == 1);
    146 
    147     unsigned port = channel_index ^ 1u;
    148 
    149     CreateAndInitChannel(channel_index);
    150     MessageInTransit::EndpointId endpoint_id =
    151         channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
    152     if (endpoint_id == MessageInTransit::kInvalidEndpointId)
    153       return;
    154 
    155     CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
    156     CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
    157         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
    158   }
    159 
    160   void RestoreInitialStateOnIOThread() {
    161     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    162 
    163     TearDownOnIOThread();
    164     SetUpOnIOThread();
    165   }
    166 
    167   test::TestIOThread io_thread_;
    168   embedder::ScopedPlatformHandle platform_handles_[2];
    169   scoped_refptr<Channel> channels_[2];
    170 
    171   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
    172 };
    173 
    174 TEST_F(RemoteMessagePipeTest, Basic) {
    175   static const char kHello[] = "hello";
    176   static const char kWorld[] = "world!!!1!!!1!";
    177   char buffer[100] = { 0 };
    178   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    179   Waiter waiter;
    180   uint32_t context = 0;
    181 
    182   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
    183   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
    184   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
    185 
    186   scoped_refptr<MessagePipe> mp0(new MessagePipe(
    187       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    188       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    189   scoped_refptr<MessagePipe> mp1(new MessagePipe(
    190       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    191       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    192   ConnectMessagePipes(mp0, mp1);
    193 
    194   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
    195 
    196   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    197   // it later, it might already be readable.)
    198   waiter.Init();
    199   EXPECT_EQ(MOJO_RESULT_OK,
    200             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
    201 
    202   // Write to MP 0, port 0.
    203   EXPECT_EQ(MOJO_RESULT_OK,
    204             mp0->WriteMessage(0,
    205                               kHello, sizeof(kHello),
    206                               NULL,
    207                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    208 
    209   // Wait.
    210   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    211   EXPECT_EQ(123u, context);
    212   mp1->RemoveWaiter(1, &waiter);
    213 
    214   // Read from MP 1, port 1.
    215   EXPECT_EQ(MOJO_RESULT_OK,
    216             mp1->ReadMessage(1,
    217                              buffer, &buffer_size,
    218                              NULL, NULL,
    219                              MOJO_READ_MESSAGE_FLAG_NONE));
    220   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
    221   EXPECT_STREQ(kHello, buffer);
    222 
    223   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
    224 
    225   waiter.Init();
    226   EXPECT_EQ(MOJO_RESULT_OK,
    227             mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
    228 
    229   EXPECT_EQ(MOJO_RESULT_OK,
    230             mp1->WriteMessage(1,
    231                               kWorld, sizeof(kWorld),
    232                               NULL,
    233                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    234 
    235   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    236   EXPECT_EQ(456u, context);
    237   mp0->RemoveWaiter(0, &waiter);
    238 
    239   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    240   EXPECT_EQ(MOJO_RESULT_OK,
    241             mp0->ReadMessage(0,
    242                              buffer, &buffer_size,
    243                              NULL, NULL,
    244                              MOJO_READ_MESSAGE_FLAG_NONE));
    245   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
    246   EXPECT_STREQ(kWorld, buffer);
    247 
    248   // Close MP 0, port 0.
    249   mp0->Close(0);
    250 
    251   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
    252   // when it realizes that MP 0, port 0 has been closed. (It may also fail
    253   // immediately.)
    254   waiter.Init();
    255   MojoResult result =
    256       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789);
    257   if (result == MOJO_RESULT_OK) {
    258     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    259               waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    260     EXPECT_EQ(789u, context);
    261     mp1->RemoveWaiter(1, &waiter);
    262   } else {
    263     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
    264   }
    265 
    266   // And MP 1, port 1.
    267   mp1->Close(1);
    268 }
    269 
    270 TEST_F(RemoteMessagePipeTest, Multiplex) {
    271   static const char kHello[] = "hello";
    272   static const char kWorld[] = "world!!!1!!!1!";
    273   char buffer[100] = { 0 };
    274   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    275   Waiter waiter;
    276   uint32_t context = 0;
    277 
    278   // Connect message pipes as in the |Basic| test.
    279 
    280   scoped_refptr<MessagePipe> mp0(new MessagePipe(
    281       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    282       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    283   scoped_refptr<MessagePipe> mp1(new MessagePipe(
    284       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    285       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    286   ConnectMessagePipes(mp0, mp1);
    287 
    288   // Now put another message pipe on the channel.
    289 
    290   scoped_refptr<MessagePipe> mp2(new MessagePipe(
    291       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    292       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    293   scoped_refptr<MessagePipe> mp3(new MessagePipe(
    294       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    295       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    296   ConnectMessagePipes(mp2, mp3);
    297 
    298   // Write: MP 2, port 0 -> MP 3, port 1.
    299 
    300   waiter.Init();
    301   EXPECT_EQ(MOJO_RESULT_OK,
    302             mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
    303 
    304   EXPECT_EQ(MOJO_RESULT_OK,
    305             mp2->WriteMessage(0,
    306                               kHello, sizeof(kHello),
    307                               NULL,
    308                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    309 
    310   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    311   EXPECT_EQ(789u, context);
    312   mp3->RemoveWaiter(1, &waiter);
    313 
    314   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
    315   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    316   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    317             mp0->ReadMessage(0,
    318                              buffer, &buffer_size,
    319                              NULL, NULL,
    320                              MOJO_READ_MESSAGE_FLAG_NONE));
    321   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    322   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    323             mp1->ReadMessage(1,
    324                              buffer, &buffer_size,
    325                              NULL, NULL,
    326                              MOJO_READ_MESSAGE_FLAG_NONE));
    327   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    328   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    329             mp2->ReadMessage(0,
    330                              buffer, &buffer_size,
    331                              NULL, NULL,
    332                              MOJO_READ_MESSAGE_FLAG_NONE));
    333 
    334   // Read from MP 3, port 1.
    335   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    336   EXPECT_EQ(MOJO_RESULT_OK,
    337             mp3->ReadMessage(1,
    338                              buffer, &buffer_size,
    339                              NULL, NULL,
    340                              MOJO_READ_MESSAGE_FLAG_NONE));
    341   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
    342   EXPECT_STREQ(kHello, buffer);
    343 
    344   // Write: MP 0, port 0 -> MP 1, port 1 again.
    345 
    346   waiter.Init();
    347   EXPECT_EQ(MOJO_RESULT_OK,
    348             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
    349 
    350   EXPECT_EQ(MOJO_RESULT_OK,
    351             mp0->WriteMessage(0,
    352                               kWorld, sizeof(kWorld),
    353                               NULL,
    354                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    355 
    356   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    357   EXPECT_EQ(123u, context);
    358   mp1->RemoveWaiter(1, &waiter);
    359 
    360   // Make sure there's nothing on the other ports.
    361   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    362   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    363             mp0->ReadMessage(0,
    364                              buffer, &buffer_size,
    365                              NULL, NULL,
    366                              MOJO_READ_MESSAGE_FLAG_NONE));
    367   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    368   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    369             mp2->ReadMessage(0,
    370                              buffer, &buffer_size,
    371                              NULL, NULL,
    372                              MOJO_READ_MESSAGE_FLAG_NONE));
    373   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    374   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    375             mp3->ReadMessage(1,
    376                              buffer, &buffer_size,
    377                              NULL, NULL,
    378                              MOJO_READ_MESSAGE_FLAG_NONE));
    379 
    380   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    381   EXPECT_EQ(MOJO_RESULT_OK,
    382             mp1->ReadMessage(1,
    383                              buffer, &buffer_size,
    384                              NULL, NULL,
    385                              MOJO_READ_MESSAGE_FLAG_NONE));
    386   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
    387   EXPECT_STREQ(kWorld, buffer);
    388 
    389   mp0->Close(0);
    390   mp1->Close(1);
    391   mp2->Close(0);
    392   mp3->Close(1);
    393 }
    394 
    395 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
    396   static const char kHello[] = "hello";
    397   char buffer[100] = { 0 };
    398   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    399   Waiter waiter;
    400   uint32_t context = 0;
    401 
    402   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
    403   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
    404   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
    405 
    406   scoped_refptr<MessagePipe> mp0(new MessagePipe(
    407       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    408       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    409 
    410   // Write to MP 0, port 0.
    411   EXPECT_EQ(MOJO_RESULT_OK,
    412             mp0->WriteMessage(0,
    413                               kHello, sizeof(kHello),
    414                               NULL,
    415                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    416 
    417   BootstrapMessagePipeNoWait(0, mp0);
    418 
    419 
    420   // Close MP 0, port 0 before channel 1 is even connected.
    421   mp0->Close(0);
    422 
    423   scoped_refptr<MessagePipe> mp1(new MessagePipe(
    424       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    425       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    426 
    427   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    428   // it later, it might already be readable.)
    429   waiter.Init();
    430   EXPECT_EQ(MOJO_RESULT_OK,
    431             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
    432 
    433   BootstrapMessagePipeNoWait(1, mp1);
    434 
    435   // Wait.
    436   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    437   EXPECT_EQ(123u, context);
    438   mp1->RemoveWaiter(1, &waiter);
    439 
    440   // Read from MP 1, port 1.
    441   EXPECT_EQ(MOJO_RESULT_OK,
    442             mp1->ReadMessage(1,
    443                              buffer, &buffer_size,
    444                              NULL, NULL,
    445                              MOJO_READ_MESSAGE_FLAG_NONE));
    446   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
    447   EXPECT_STREQ(kHello, buffer);
    448 
    449   // And MP 1, port 1.
    450   mp1->Close(1);
    451 }
    452 
    453 TEST_F(RemoteMessagePipeTest, HandlePassing) {
    454   static const char kHello[] = "hello";
    455   Waiter waiter;
    456   uint32_t context = 0;
    457 
    458   scoped_refptr<MessagePipe> mp0(new MessagePipe(
    459       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    460       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    461   scoped_refptr<MessagePipe> mp1(new MessagePipe(
    462       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    463       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    464   ConnectMessagePipes(mp0, mp1);
    465 
    466   // We'll try to pass this dispatcher.
    467   scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
    468       MessagePipeDispatcher::kDefaultCreateOptions));
    469   scoped_refptr<MessagePipe> local_mp(new MessagePipe());
    470   dispatcher->Init(local_mp, 0);
    471 
    472   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    473   // it later, it might already be readable.)
    474   waiter.Init();
    475   EXPECT_EQ(MOJO_RESULT_OK,
    476             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
    477 
    478   // Write to MP 0, port 0.
    479   {
    480     DispatcherTransport
    481         transport(test::DispatcherTryStartTransport(dispatcher.get()));
    482     EXPECT_TRUE(transport.is_valid());
    483 
    484     std::vector<DispatcherTransport> transports;
    485     transports.push_back(transport);
    486     EXPECT_EQ(MOJO_RESULT_OK,
    487               mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
    488                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    489     transport.End();
    490 
    491     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    492     // |dispatcher| is destroyed.
    493     EXPECT_TRUE(dispatcher->HasOneRef());
    494     dispatcher = NULL;
    495   }
    496 
    497   // Wait.
    498   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    499   EXPECT_EQ(123u, context);
    500   mp1->RemoveWaiter(1, &waiter);
    501 
    502   // Read from MP 1, port 1.
    503   char read_buffer[100] = { 0 };
    504   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    505   DispatcherVector read_dispatchers;
    506   uint32_t read_num_dispatchers = 10;  // Maximum to get.
    507   EXPECT_EQ(MOJO_RESULT_OK,
    508             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
    509                              &read_dispatchers, &read_num_dispatchers,
    510                              MOJO_READ_MESSAGE_FLAG_NONE));
    511   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    512   EXPECT_STREQ(kHello, read_buffer);
    513   EXPECT_EQ(1u, read_dispatchers.size());
    514   EXPECT_EQ(1u, read_num_dispatchers);
    515   ASSERT_TRUE(read_dispatchers[0]);
    516   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
    517 
    518   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
    519   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
    520 
    521   // Write to "local_mp", port 1.
    522   EXPECT_EQ(MOJO_RESULT_OK,
    523             local_mp->WriteMessage(1, kHello, sizeof(kHello), NULL,
    524                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
    525 
    526   // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
    527   // here. (We don't crash if I sleep and then close.)
    528 
    529   // Wait for the dispatcher to become readable.
    530   waiter.Init();
    531   EXPECT_EQ(MOJO_RESULT_OK,
    532             dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
    533   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    534   EXPECT_EQ(456u, context);
    535   dispatcher->RemoveWaiter(&waiter);
    536 
    537   // Read from the dispatcher.
    538   memset(read_buffer, 0, sizeof(read_buffer));
    539   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    540   EXPECT_EQ(MOJO_RESULT_OK,
    541             dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
    542                                     MOJO_READ_MESSAGE_FLAG_NONE));
    543   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    544   EXPECT_STREQ(kHello, read_buffer);
    545 
    546   // Prepare to wait on "local_mp", port 1.
    547   waiter.Init();
    548   EXPECT_EQ(MOJO_RESULT_OK,
    549             local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
    550 
    551   // Write to the dispatcher.
    552   EXPECT_EQ(MOJO_RESULT_OK,
    553             dispatcher->WriteMessage(kHello, sizeof(kHello), NULL,
    554                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
    555 
    556   // Wait.
    557   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    558   EXPECT_EQ(789u, context);
    559   local_mp->RemoveWaiter(1, &waiter);
    560 
    561   // Read from "local_mp", port 1.
    562   memset(read_buffer, 0, sizeof(read_buffer));
    563   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    564   EXPECT_EQ(MOJO_RESULT_OK,
    565             local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
    566                                   MOJO_READ_MESSAGE_FLAG_NONE));
    567   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    568   EXPECT_STREQ(kHello, read_buffer);
    569 
    570   // TODO(vtl): Also test that messages queued up before the handle was sent are
    571   // delivered properly.
    572 
    573   // Close everything that belongs to us.
    574   mp0->Close(0);
    575   mp1->Close(1);
    576   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
    577   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
    578   local_mp->Close(1);
    579 }
    580 
    581 #if defined(OS_POSIX)
    582 #define MAYBE_SharedBufferPassing SharedBufferPassing
    583 #else
    584 // Not yet implemented (on Windows).
    585 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
    586 #endif
    587 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
    588   static const char kHello[] = "hello";
    589   Waiter waiter;
    590   uint32_t context = 0;
    591 
    592   scoped_refptr<MessagePipe> mp0(new MessagePipe(
    593       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    594       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    595   scoped_refptr<MessagePipe> mp1(new MessagePipe(
    596       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    597       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    598   ConnectMessagePipes(mp0, mp1);
    599 
    600   // We'll try to pass this dispatcher.
    601   scoped_refptr<SharedBufferDispatcher> dispatcher;
    602   EXPECT_EQ(MOJO_RESULT_OK,
    603             SharedBufferDispatcher::Create(
    604                 SharedBufferDispatcher::kDefaultCreateOptions, 100,
    605                 &dispatcher));
    606   ASSERT_TRUE(dispatcher);
    607 
    608   // Make a mapping.
    609   scoped_ptr<RawSharedBufferMapping> mapping0;
    610   EXPECT_EQ(MOJO_RESULT_OK,
    611             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
    612                                   &mapping0));
    613   ASSERT_TRUE(mapping0);
    614   ASSERT_TRUE(mapping0->base());
    615   ASSERT_EQ(100u, mapping0->length());
    616   static_cast<char*>(mapping0->base())[0] = 'A';
    617   static_cast<char*>(mapping0->base())[50] = 'B';
    618   static_cast<char*>(mapping0->base())[99] = 'C';
    619 
    620   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    621   // it later, it might already be readable.)
    622   waiter.Init();
    623   EXPECT_EQ(MOJO_RESULT_OK,
    624             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
    625 
    626   // Write to MP 0, port 0.
    627   {
    628     DispatcherTransport
    629         transport(test::DispatcherTryStartTransport(dispatcher.get()));
    630     EXPECT_TRUE(transport.is_valid());
    631 
    632     std::vector<DispatcherTransport> transports;
    633     transports.push_back(transport);
    634     EXPECT_EQ(MOJO_RESULT_OK,
    635               mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
    636                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    637     transport.End();
    638 
    639     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    640     // |dispatcher| is destroyed.
    641     EXPECT_TRUE(dispatcher->HasOneRef());
    642     dispatcher = NULL;
    643   }
    644 
    645   // Wait.
    646   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    647   EXPECT_EQ(123u, context);
    648   mp1->RemoveWaiter(1, &waiter);
    649 
    650   // Read from MP 1, port 1.
    651   char read_buffer[100] = { 0 };
    652   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    653   DispatcherVector read_dispatchers;
    654   uint32_t read_num_dispatchers = 10;  // Maximum to get.
    655   EXPECT_EQ(MOJO_RESULT_OK,
    656             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
    657                              &read_dispatchers, &read_num_dispatchers,
    658                              MOJO_READ_MESSAGE_FLAG_NONE));
    659   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    660   EXPECT_STREQ(kHello, read_buffer);
    661   EXPECT_EQ(1u, read_dispatchers.size());
    662   EXPECT_EQ(1u, read_num_dispatchers);
    663   ASSERT_TRUE(read_dispatchers[0]);
    664   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
    665 
    666   EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
    667   dispatcher =
    668       static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
    669 
    670   // Make another mapping.
    671   scoped_ptr<RawSharedBufferMapping> mapping1;
    672   EXPECT_EQ(MOJO_RESULT_OK,
    673             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
    674                                   &mapping1));
    675   ASSERT_TRUE(mapping1);
    676   ASSERT_TRUE(mapping1->base());
    677   ASSERT_EQ(100u, mapping1->length());
    678   EXPECT_NE(mapping1->base(), mapping0->base());
    679   EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
    680   EXPECT_EQ('B', static_cast<char*>(mapping1->base())[50]);
    681   EXPECT_EQ('C', static_cast<char*>(mapping1->base())[99]);
    682 
    683   // Write stuff either way.
    684   static_cast<char*>(mapping1->base())[1] = 'x';
    685   EXPECT_EQ('x', static_cast<char*>(mapping0->base())[1]);
    686   static_cast<char*>(mapping0->base())[2] = 'y';
    687   EXPECT_EQ('y', static_cast<char*>(mapping1->base())[2]);
    688 
    689   // Kill the first mapping; the second should still be valid.
    690   mapping0.reset();
    691   EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
    692 
    693   // Close everything that belongs to us.
    694   mp0->Close(0);
    695   mp1->Close(1);
    696   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
    697 
    698   // The second mapping should still be good.
    699   EXPECT_EQ('x', static_cast<char*>(mapping1->base())[1]);
    700 }
    701 
    702 #if defined(OS_POSIX)
    703 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
    704 #else
    705 // Not yet implemented (on Windows).
    706 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
    707 #endif
    708 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
    709   static const char kHello[] = "hello";
    710   static const char kWorld[] = "world";
    711   Waiter waiter;
    712   uint32_t context = 0;
    713 
    714   scoped_refptr<MessagePipe> mp0(new MessagePipe(
    715       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    716       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    717   scoped_refptr<MessagePipe> mp1(new MessagePipe(
    718       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    719       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    720   ConnectMessagePipes(mp0, mp1);
    721 
    722   base::FilePath unused;
    723   base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
    724   EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
    725   // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
    726   // be passed.
    727   scoped_refptr<PlatformHandleDispatcher> dispatcher(
    728       new PlatformHandleDispatcher(
    729           mojo::test::PlatformHandleFromFILE(fp.Pass())));
    730 
    731   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    732   // it later, it might already be readable.)
    733   waiter.Init();
    734   EXPECT_EQ(MOJO_RESULT_OK,
    735             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
    736 
    737   // Write to MP 0, port 0.
    738   {
    739     DispatcherTransport
    740         transport(test::DispatcherTryStartTransport(dispatcher.get()));
    741     EXPECT_TRUE(transport.is_valid());
    742 
    743     std::vector<DispatcherTransport> transports;
    744     transports.push_back(transport);
    745     EXPECT_EQ(MOJO_RESULT_OK,
    746               mp0->WriteMessage(0, kWorld, sizeof(kWorld), &transports,
    747                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    748     transport.End();
    749 
    750     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    751     // |dispatcher| is destroyed.
    752     EXPECT_TRUE(dispatcher->HasOneRef());
    753     dispatcher = NULL;
    754   }
    755 
    756   // Wait.
    757   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    758   EXPECT_EQ(123u, context);
    759   mp1->RemoveWaiter(1, &waiter);
    760 
    761   // Read from MP 1, port 1.
    762   char read_buffer[100] = { 0 };
    763   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    764   DispatcherVector read_dispatchers;
    765   uint32_t read_num_dispatchers = 10;  // Maximum to get.
    766   EXPECT_EQ(MOJO_RESULT_OK,
    767             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
    768                              &read_dispatchers, &read_num_dispatchers,
    769                              MOJO_READ_MESSAGE_FLAG_NONE));
    770   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
    771   EXPECT_STREQ(kWorld, read_buffer);
    772   EXPECT_EQ(1u, read_dispatchers.size());
    773   EXPECT_EQ(1u, read_num_dispatchers);
    774   ASSERT_TRUE(read_dispatchers[0]);
    775   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
    776 
    777   EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
    778   dispatcher =
    779       static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
    780 
    781   embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
    782   EXPECT_TRUE(h.is_valid());
    783 
    784   fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
    785   EXPECT_FALSE(h.is_valid());
    786   EXPECT_TRUE(fp);
    787 
    788   rewind(fp.get());
    789   memset(read_buffer, 0, sizeof(read_buffer));
    790   EXPECT_EQ(sizeof(kHello),
    791             fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
    792   EXPECT_STREQ(kHello, read_buffer);
    793 
    794   // Close everything that belongs to us.
    795   mp0->Close(0);
    796   mp1->Close(1);
    797   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
    798 }
    799 
    800 // Test racing closes (on each end).
    801 // Note: A flaky failure would almost certainly indicate a problem in the code
    802 // itself (not in the test). Also, any logged warnings/errors would also
    803 // probably be indicative of bugs.
    804 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
    805   base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
    806 
    807   for (unsigned i = 0; i < 256; i++) {
    808     DVLOG(2) << "---------------------------------------- " << i;
    809     scoped_refptr<MessagePipe> mp0(new MessagePipe(
    810         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
    811         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
    812     BootstrapMessagePipeNoWait(0, mp0);
    813 
    814     scoped_refptr<MessagePipe> mp1(new MessagePipe(
    815         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
    816         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
    817     BootstrapMessagePipeNoWait(1, mp1);
    818 
    819     if (i & 1u) {
    820       io_thread()->task_runner()->PostTask(
    821           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
    822     }
    823     if (i & 2u)
    824       base::PlatformThread::Sleep(delay);
    825 
    826     mp0->Close(0);
    827 
    828     if (i & 4u) {
    829       io_thread()->task_runner()->PostTask(
    830           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
    831     }
    832     if (i & 8u)
    833       base::PlatformThread::Sleep(delay);
    834 
    835     mp1->Close(1);
    836 
    837     RestoreInitialState();
    838   }
    839 }
    840 
    841 }  // namespace
    842 }  // namespace system
    843 }  // namespace mojo
    844