Home | History | Annotate | Download | only in system
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <stdint.h>
      6 #include <stdio.h>
      7 #include <string.h>
      8 
      9 #include <vector>
     10 
     11 #include "base/bind.h"
     12 #include "base/files/file_path.h"
     13 #include "base/files/file_util.h"
     14 #include "base/files/scoped_file.h"
     15 #include "base/files/scoped_temp_dir.h"
     16 #include "base/location.h"
     17 #include "base/logging.h"
     18 #include "base/macros.h"
     19 #include "base/message_loop/message_loop.h"
     20 #include "base/test/test_io_thread.h"
     21 #include "base/threading/platform_thread.h"  // For |Sleep()|.
     22 #include "build/build_config.h"              // TODO(vtl): Remove this.
     23 #include "mojo/common/test/test_utils.h"
     24 #include "mojo/embedder/platform_channel_pair.h"
     25 #include "mojo/embedder/platform_shared_buffer.h"
     26 #include "mojo/embedder/scoped_platform_handle.h"
     27 #include "mojo/embedder/simple_platform_support.h"
     28 #include "mojo/system/channel.h"
     29 #include "mojo/system/channel_endpoint.h"
     30 #include "mojo/system/message_pipe.h"
     31 #include "mojo/system/message_pipe_dispatcher.h"
     32 #include "mojo/system/platform_handle_dispatcher.h"
     33 #include "mojo/system/raw_channel.h"
     34 #include "mojo/system/shared_buffer_dispatcher.h"
     35 #include "mojo/system/test_utils.h"
     36 #include "mojo/system/waiter.h"
     37 #include "testing/gtest/include/gtest/gtest.h"
     38 
     39 namespace mojo {
     40 namespace system {
     41 namespace {
     42 
     43 class RemoteMessagePipeTest : public testing::Test {
     44  public:
     45   RemoteMessagePipeTest() : io_thread_(base::TestIOThread::kAutoStart) {}
     46   virtual ~RemoteMessagePipeTest() {}
     47 
     48   virtual void SetUp() OVERRIDE {
     49     io_thread_.PostTaskAndWait(
     50         FROM_HERE,
     51         base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
     52                    base::Unretained(this)));
     53   }
     54 
     55   virtual void TearDown() OVERRIDE {
     56     io_thread_.PostTaskAndWait(
     57         FROM_HERE,
     58         base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
     59                    base::Unretained(this)));
     60   }
     61 
     62  protected:
     63   // This connects the two given |ChannelEndpoint|s.
     64   void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
     65                                scoped_refptr<ChannelEndpoint> ep1) {
     66     io_thread_.PostTaskAndWait(
     67         FROM_HERE,
     68         base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
     69                    base::Unretained(this),
     70                    ep0,
     71                    ep1));
     72   }
     73 
     74   // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
     75   // that this is the bootstrap case, i.e., that the endpoint IDs are both/will
     76   // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
     77   // it to finish connecting.
     78   void BootstrapChannelEndpointNoWait(unsigned channel_index,
     79                                       scoped_refptr<ChannelEndpoint> ep) {
     80     io_thread_.PostTask(
     81         FROM_HERE,
     82         base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
     83                    base::Unretained(this),
     84                    channel_index,
     85                    ep));
     86   }
     87 
     88   void RestoreInitialState() {
     89     io_thread_.PostTaskAndWait(
     90         FROM_HERE,
     91         base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
     92                    base::Unretained(this)));
     93   }
     94 
     95   embedder::PlatformSupport* platform_support() { return &platform_support_; }
     96   base::TestIOThread* io_thread() { return &io_thread_; }
     97 
     98  private:
     99   void SetUpOnIOThread() {
    100     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    101 
    102     embedder::PlatformChannelPair channel_pair;
    103     platform_handles_[0] = channel_pair.PassServerHandle();
    104     platform_handles_[1] = channel_pair.PassClientHandle();
    105   }
    106 
    107   void TearDownOnIOThread() {
    108     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    109 
    110     if (channels_[0].get()) {
    111       channels_[0]->Shutdown();
    112       channels_[0] = nullptr;
    113     }
    114     if (channels_[1].get()) {
    115       channels_[1]->Shutdown();
    116       channels_[1] = nullptr;
    117     }
    118   }
    119 
    120   void CreateAndInitChannel(unsigned channel_index) {
    121     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    122     CHECK(channel_index == 0 || channel_index == 1);
    123     CHECK(!channels_[channel_index].get());
    124 
    125     channels_[channel_index] = new Channel(&platform_support_);
    126     CHECK(channels_[channel_index]->Init(
    127         RawChannel::Create(platform_handles_[channel_index].Pass())));
    128   }
    129 
    130   void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
    131                                          scoped_refptr<ChannelEndpoint> ep1) {
    132     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    133 
    134     if (!channels_[0].get())
    135       CreateAndInitChannel(0);
    136     if (!channels_[1].get())
    137       CreateAndInitChannel(1);
    138 
    139     MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
    140     MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);
    141 
    142     CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
    143     CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
    144   }
    145 
    146   void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
    147                                           scoped_refptr<ChannelEndpoint> ep) {
    148     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    149     CHECK(channel_index == 0 || channel_index == 1);
    150 
    151     CreateAndInitChannel(channel_index);
    152     MessageInTransit::EndpointId endpoint_id =
    153         channels_[channel_index]->AttachEndpoint(ep);
    154     if (endpoint_id == MessageInTransit::kInvalidEndpointId)
    155       return;
    156 
    157     CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
    158     CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
    159         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
    160   }
    161 
    162   void RestoreInitialStateOnIOThread() {
    163     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    164 
    165     TearDownOnIOThread();
    166     SetUpOnIOThread();
    167   }
    168 
    169   embedder::SimplePlatformSupport platform_support_;
    170   base::TestIOThread io_thread_;
    171   embedder::ScopedPlatformHandle platform_handles_[2];
    172   scoped_refptr<Channel> channels_[2];
    173 
    174   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
    175 };
    176 
    177 TEST_F(RemoteMessagePipeTest, Basic) {
    178   static const char kHello[] = "hello";
    179   static const char kWorld[] = "world!!!1!!!1!";
    180   char buffer[100] = {0};
    181   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    182   Waiter waiter;
    183   HandleSignalsState hss;
    184   uint32_t context = 0;
    185 
    186   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
    187   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
    188   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
    189 
    190   scoped_refptr<ChannelEndpoint> ep0;
    191   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    192   scoped_refptr<ChannelEndpoint> ep1;
    193   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    194   ConnectChannelEndpoints(ep0, ep1);
    195 
    196   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
    197 
    198   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    199   // it later, it might already be readable.)
    200   waiter.Init();
    201   ASSERT_EQ(
    202       MOJO_RESULT_OK,
    203       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    204 
    205   // Write to MP 0, port 0.
    206   EXPECT_EQ(MOJO_RESULT_OK,
    207             mp0->WriteMessage(0,
    208                               UserPointer<const void>(kHello),
    209                               sizeof(kHello),
    210                               nullptr,
    211                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    212 
    213   // Wait.
    214   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    215   EXPECT_EQ(123u, context);
    216   hss = HandleSignalsState();
    217   mp1->RemoveWaiter(1, &waiter, &hss);
    218   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    219             hss.satisfied_signals);
    220   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    221             hss.satisfiable_signals);
    222 
    223   // Read from MP 1, port 1.
    224   EXPECT_EQ(MOJO_RESULT_OK,
    225             mp1->ReadMessage(1,
    226                              UserPointer<void>(buffer),
    227                              MakeUserPointer(&buffer_size),
    228                              nullptr,
    229                              nullptr,
    230                              MOJO_READ_MESSAGE_FLAG_NONE));
    231   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
    232   EXPECT_STREQ(kHello, buffer);
    233 
    234   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
    235 
    236   waiter.Init();
    237   ASSERT_EQ(
    238       MOJO_RESULT_OK,
    239       mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
    240 
    241   EXPECT_EQ(MOJO_RESULT_OK,
    242             mp1->WriteMessage(1,
    243                               UserPointer<const void>(kWorld),
    244                               sizeof(kWorld),
    245                               nullptr,
    246                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    247 
    248   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    249   EXPECT_EQ(456u, context);
    250   hss = HandleSignalsState();
    251   mp0->RemoveWaiter(0, &waiter, &hss);
    252   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    253             hss.satisfied_signals);
    254   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    255             hss.satisfiable_signals);
    256 
    257   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    258   EXPECT_EQ(MOJO_RESULT_OK,
    259             mp0->ReadMessage(0,
    260                              UserPointer<void>(buffer),
    261                              MakeUserPointer(&buffer_size),
    262                              nullptr,
    263                              nullptr,
    264                              MOJO_READ_MESSAGE_FLAG_NONE));
    265   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
    266   EXPECT_STREQ(kWorld, buffer);
    267 
    268   // Close MP 0, port 0.
    269   mp0->Close(0);
    270 
    271   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
    272   // when it realizes that MP 0, port 0 has been closed. (It may also fail
    273   // immediately.)
    274   waiter.Init();
    275   hss = HandleSignalsState();
    276   MojoResult result =
    277       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
    278   if (result == MOJO_RESULT_OK) {
    279     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    280               waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    281     EXPECT_EQ(789u, context);
    282     hss = HandleSignalsState();
    283     mp1->RemoveWaiter(1, &waiter, &hss);
    284   }
    285   EXPECT_EQ(0u, hss.satisfied_signals);
    286   EXPECT_EQ(0u, hss.satisfiable_signals);
    287 
    288   // And MP 1, port 1.
    289   mp1->Close(1);
    290 }
    291 
    292 TEST_F(RemoteMessagePipeTest, Multiplex) {
    293   static const char kHello[] = "hello";
    294   static const char kWorld[] = "world!!!1!!!1!";
    295   char buffer[100] = {0};
    296   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    297   Waiter waiter;
    298   HandleSignalsState hss;
    299   uint32_t context = 0;
    300 
    301   // Connect message pipes as in the |Basic| test.
    302 
    303   scoped_refptr<ChannelEndpoint> ep0;
    304   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    305   scoped_refptr<ChannelEndpoint> ep1;
    306   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    307   ConnectChannelEndpoints(ep0, ep1);
    308 
    309   // Now put another message pipe on the channel.
    310 
    311   scoped_refptr<ChannelEndpoint> ep2;
    312   scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
    313   scoped_refptr<ChannelEndpoint> ep3;
    314   scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
    315   ConnectChannelEndpoints(ep2, ep3);
    316 
    317   // Write: MP 2, port 0 -> MP 3, port 1.
    318 
    319   waiter.Init();
    320   ASSERT_EQ(
    321       MOJO_RESULT_OK,
    322       mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
    323 
    324   EXPECT_EQ(MOJO_RESULT_OK,
    325             mp2->WriteMessage(0,
    326                               UserPointer<const void>(kHello),
    327                               sizeof(kHello),
    328                               nullptr,
    329                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    330 
    331   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    332   EXPECT_EQ(789u, context);
    333   hss = HandleSignalsState();
    334   mp3->RemoveWaiter(1, &waiter, &hss);
    335   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    336             hss.satisfied_signals);
    337   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    338             hss.satisfiable_signals);
    339 
    340   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
    341   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    342   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    343             mp0->ReadMessage(0,
    344                              UserPointer<void>(buffer),
    345                              MakeUserPointer(&buffer_size),
    346                              nullptr,
    347                              nullptr,
    348                              MOJO_READ_MESSAGE_FLAG_NONE));
    349   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    350   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    351             mp1->ReadMessage(1,
    352                              UserPointer<void>(buffer),
    353                              MakeUserPointer(&buffer_size),
    354                              nullptr,
    355                              nullptr,
    356                              MOJO_READ_MESSAGE_FLAG_NONE));
    357   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    358   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    359             mp2->ReadMessage(0,
    360                              UserPointer<void>(buffer),
    361                              MakeUserPointer(&buffer_size),
    362                              nullptr,
    363                              nullptr,
    364                              MOJO_READ_MESSAGE_FLAG_NONE));
    365 
    366   // Read from MP 3, port 1.
    367   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    368   EXPECT_EQ(MOJO_RESULT_OK,
    369             mp3->ReadMessage(1,
    370                              UserPointer<void>(buffer),
    371                              MakeUserPointer(&buffer_size),
    372                              nullptr,
    373                              nullptr,
    374                              MOJO_READ_MESSAGE_FLAG_NONE));
    375   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
    376   EXPECT_STREQ(kHello, buffer);
    377 
    378   // Write: MP 0, port 0 -> MP 1, port 1 again.
    379 
    380   waiter.Init();
    381   ASSERT_EQ(
    382       MOJO_RESULT_OK,
    383       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    384 
    385   EXPECT_EQ(MOJO_RESULT_OK,
    386             mp0->WriteMessage(0,
    387                               UserPointer<const void>(kWorld),
    388                               sizeof(kWorld),
    389                               nullptr,
    390                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    391 
    392   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    393   EXPECT_EQ(123u, context);
    394   hss = HandleSignalsState();
    395   mp1->RemoveWaiter(1, &waiter, &hss);
    396   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    397             hss.satisfied_signals);
    398   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    399             hss.satisfiable_signals);
    400 
    401   // Make sure there's nothing on the other ports.
    402   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    403   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    404             mp0->ReadMessage(0,
    405                              UserPointer<void>(buffer),
    406                              MakeUserPointer(&buffer_size),
    407                              nullptr,
    408                              nullptr,
    409                              MOJO_READ_MESSAGE_FLAG_NONE));
    410   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    411   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    412             mp2->ReadMessage(0,
    413                              UserPointer<void>(buffer),
    414                              MakeUserPointer(&buffer_size),
    415                              nullptr,
    416                              nullptr,
    417                              MOJO_READ_MESSAGE_FLAG_NONE));
    418   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    419   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    420             mp3->ReadMessage(1,
    421                              UserPointer<void>(buffer),
    422                              MakeUserPointer(&buffer_size),
    423                              nullptr,
    424                              nullptr,
    425                              MOJO_READ_MESSAGE_FLAG_NONE));
    426 
    427   buffer_size = static_cast<uint32_t>(sizeof(buffer));
    428   EXPECT_EQ(MOJO_RESULT_OK,
    429             mp1->ReadMessage(1,
    430                              UserPointer<void>(buffer),
    431                              MakeUserPointer(&buffer_size),
    432                              nullptr,
    433                              nullptr,
    434                              MOJO_READ_MESSAGE_FLAG_NONE));
    435   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
    436   EXPECT_STREQ(kWorld, buffer);
    437 
    438   mp0->Close(0);
    439   mp1->Close(1);
    440   mp2->Close(0);
    441   mp3->Close(1);
    442 }
    443 
    444 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
    445   static const char kHello[] = "hello";
    446   char buffer[100] = {0};
    447   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    448   Waiter waiter;
    449   HandleSignalsState hss;
    450   uint32_t context = 0;
    451 
    452   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
    453   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
    454   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
    455 
    456   scoped_refptr<ChannelEndpoint> ep0;
    457   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    458 
    459   // Write to MP 0, port 0.
    460   EXPECT_EQ(MOJO_RESULT_OK,
    461             mp0->WriteMessage(0,
    462                               UserPointer<const void>(kHello),
    463                               sizeof(kHello),
    464                               nullptr,
    465                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    466 
    467   BootstrapChannelEndpointNoWait(0, ep0);
    468 
    469   // Close MP 0, port 0 before channel 1 is even connected.
    470   mp0->Close(0);
    471 
    472   scoped_refptr<ChannelEndpoint> ep1;
    473   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    474 
    475   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    476   // it later, it might already be readable.)
    477   waiter.Init();
    478   ASSERT_EQ(
    479       MOJO_RESULT_OK,
    480       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    481 
    482   BootstrapChannelEndpointNoWait(1, ep1);
    483 
    484   // Wait.
    485   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    486   EXPECT_EQ(123u, context);
    487   hss = HandleSignalsState();
    488   // Note: MP 1, port 1 should definitely should be readable, but it may or may
    489   // not appear as writable (there's a race, and it may not have noticed that
    490   // the other side was closed yet -- e.g., inserting a sleep here would make it
    491   // much more likely to notice that it's no longer writable).
    492   mp1->RemoveWaiter(1, &waiter, &hss);
    493   EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
    494   EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
    495 
    496   // Read from MP 1, port 1.
    497   EXPECT_EQ(MOJO_RESULT_OK,
    498             mp1->ReadMessage(1,
    499                              UserPointer<void>(buffer),
    500                              MakeUserPointer(&buffer_size),
    501                              nullptr,
    502                              nullptr,
    503                              MOJO_READ_MESSAGE_FLAG_NONE));
    504   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
    505   EXPECT_STREQ(kHello, buffer);
    506 
    507   // And MP 1, port 1.
    508   mp1->Close(1);
    509 }
    510 
    511 TEST_F(RemoteMessagePipeTest, HandlePassing) {
    512   static const char kHello[] = "hello";
    513   Waiter waiter;
    514   HandleSignalsState hss;
    515   uint32_t context = 0;
    516 
    517   scoped_refptr<ChannelEndpoint> ep0;
    518   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    519   scoped_refptr<ChannelEndpoint> ep1;
    520   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    521   ConnectChannelEndpoints(ep0, ep1);
    522 
    523   // We'll try to pass this dispatcher.
    524   scoped_refptr<MessagePipeDispatcher> dispatcher(
    525       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    526   scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
    527   dispatcher->Init(local_mp, 0);
    528 
    529   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    530   // it later, it might already be readable.)
    531   waiter.Init();
    532   ASSERT_EQ(
    533       MOJO_RESULT_OK,
    534       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    535 
    536   // Write to MP 0, port 0.
    537   {
    538     DispatcherTransport transport(
    539         test::DispatcherTryStartTransport(dispatcher.get()));
    540     EXPECT_TRUE(transport.is_valid());
    541 
    542     std::vector<DispatcherTransport> transports;
    543     transports.push_back(transport);
    544     EXPECT_EQ(MOJO_RESULT_OK,
    545               mp0->WriteMessage(0,
    546                                 UserPointer<const void>(kHello),
    547                                 sizeof(kHello),
    548                                 &transports,
    549                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    550     transport.End();
    551 
    552     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    553     // |dispatcher| is destroyed.
    554     EXPECT_TRUE(dispatcher->HasOneRef());
    555     dispatcher = nullptr;
    556   }
    557 
    558   // Wait.
    559   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    560   EXPECT_EQ(123u, context);
    561   hss = HandleSignalsState();
    562   mp1->RemoveWaiter(1, &waiter, &hss);
    563   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    564             hss.satisfied_signals);
    565   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    566             hss.satisfiable_signals);
    567 
    568   // Read from MP 1, port 1.
    569   char read_buffer[100] = {0};
    570   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    571   DispatcherVector read_dispatchers;
    572   uint32_t read_num_dispatchers = 10;  // Maximum to get.
    573   EXPECT_EQ(MOJO_RESULT_OK,
    574             mp1->ReadMessage(1,
    575                              UserPointer<void>(read_buffer),
    576                              MakeUserPointer(&read_buffer_size),
    577                              &read_dispatchers,
    578                              &read_num_dispatchers,
    579                              MOJO_READ_MESSAGE_FLAG_NONE));
    580   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    581   EXPECT_STREQ(kHello, read_buffer);
    582   EXPECT_EQ(1u, read_dispatchers.size());
    583   EXPECT_EQ(1u, read_num_dispatchers);
    584   ASSERT_TRUE(read_dispatchers[0].get());
    585   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
    586 
    587   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
    588   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
    589 
    590   // Add the waiter now, before it becomes readable to avoid a race.
    591   waiter.Init();
    592   ASSERT_EQ(MOJO_RESULT_OK,
    593             dispatcher->AddWaiter(
    594                 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
    595 
    596   // Write to "local_mp", port 1.
    597   EXPECT_EQ(MOJO_RESULT_OK,
    598             local_mp->WriteMessage(1,
    599                                    UserPointer<const void>(kHello),
    600                                    sizeof(kHello),
    601                                    nullptr,
    602                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
    603 
    604   // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
    605   // here. (We don't crash if I sleep and then close.)
    606 
    607   // Wait for the dispatcher to become readable.
    608   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    609   EXPECT_EQ(456u, context);
    610   hss = HandleSignalsState();
    611   dispatcher->RemoveWaiter(&waiter, &hss);
    612   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    613             hss.satisfied_signals);
    614   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    615             hss.satisfiable_signals);
    616 
    617   // Read from the dispatcher.
    618   memset(read_buffer, 0, sizeof(read_buffer));
    619   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    620   EXPECT_EQ(MOJO_RESULT_OK,
    621             dispatcher->ReadMessage(UserPointer<void>(read_buffer),
    622                                     MakeUserPointer(&read_buffer_size),
    623                                     0,
    624                                     nullptr,
    625                                     MOJO_READ_MESSAGE_FLAG_NONE));
    626   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    627   EXPECT_STREQ(kHello, read_buffer);
    628 
    629   // Prepare to wait on "local_mp", port 1.
    630   waiter.Init();
    631   ASSERT_EQ(MOJO_RESULT_OK,
    632             local_mp->AddWaiter(
    633                 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
    634 
    635   // Write to the dispatcher.
    636   EXPECT_EQ(MOJO_RESULT_OK,
    637             dispatcher->WriteMessage(UserPointer<const void>(kHello),
    638                                      sizeof(kHello),
    639                                      nullptr,
    640                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
    641 
    642   // Wait.
    643   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    644   EXPECT_EQ(789u, context);
    645   hss = HandleSignalsState();
    646   local_mp->RemoveWaiter(1, &waiter, &hss);
    647   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    648             hss.satisfied_signals);
    649   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    650             hss.satisfiable_signals);
    651 
    652   // Read from "local_mp", port 1.
    653   memset(read_buffer, 0, sizeof(read_buffer));
    654   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    655   EXPECT_EQ(MOJO_RESULT_OK,
    656             local_mp->ReadMessage(1,
    657                                   UserPointer<void>(read_buffer),
    658                                   MakeUserPointer(&read_buffer_size),
    659                                   nullptr,
    660                                   nullptr,
    661                                   MOJO_READ_MESSAGE_FLAG_NONE));
    662   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    663   EXPECT_STREQ(kHello, read_buffer);
    664 
    665   // TODO(vtl): Also test that messages queued up before the handle was sent are
    666   // delivered properly.
    667 
    668   // Close everything that belongs to us.
    669   mp0->Close(0);
    670   mp1->Close(1);
    671   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
    672   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
    673   local_mp->Close(1);
    674 }
    675 
    676 #if defined(OS_POSIX)
    677 #define MAYBE_SharedBufferPassing SharedBufferPassing
    678 #else
    679 // Not yet implemented (on Windows).
    680 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
    681 #endif
        // Sends a |SharedBufferDispatcher| together with a "hello" message from MP 0,
        // port 0 to MP 1, port 1, then checks that a mapping made from the received
        // dispatcher observes the same bytes as a mapping made before the send (and
        // that writes through either mapping are visible through the other).
    682 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
    683   static const char kHello[] = "hello";
    684   Waiter waiter;
    685   HandleSignalsState hss;
    686   uint32_t context = 0;
    687 
    688   scoped_refptr<ChannelEndpoint> ep0;
    689   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    690   scoped_refptr<ChannelEndpoint> ep1;
    691   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    692   ConnectChannelEndpoints(ep0, ep1);
    693 
    694   // We'll try to pass this dispatcher.
    695   scoped_refptr<SharedBufferDispatcher> dispatcher;
    696   EXPECT_EQ(MOJO_RESULT_OK,
    697             SharedBufferDispatcher::Create(
    698                 platform_support(),
    699                 SharedBufferDispatcher::kDefaultCreateOptions,
    700                 100,
    701                 &dispatcher));
    702   ASSERT_TRUE(dispatcher.get());
    703 
    704   // Make a mapping.
    705   scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
    706   EXPECT_EQ(
    707       MOJO_RESULT_OK,
    708       dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
    709   ASSERT_TRUE(mapping0);
    710   ASSERT_TRUE(mapping0->GetBase());
    711   ASSERT_EQ(100u, mapping0->GetLength());
          // Mark the first, middle and last bytes so the receiving side can verify
          // that it sees the same buffer.
    712   static_cast<char*>(mapping0->GetBase())[0] = 'A';
    713   static_cast<char*>(mapping0->GetBase())[50] = 'B';
    714   static_cast<char*>(mapping0->GetBase())[99] = 'C';
    715 
    716   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    717   // it later, it might already be readable.)
    718   waiter.Init();
    719   ASSERT_EQ(
    720       MOJO_RESULT_OK,
    721       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    722 
    723   // Write to MP 0, port 0.
    724   {
    725     DispatcherTransport transport(
    726         test::DispatcherTryStartTransport(dispatcher.get()));
    727     EXPECT_TRUE(transport.is_valid());
    728 
    729     std::vector<DispatcherTransport> transports;
    730     transports.push_back(transport);
    731     EXPECT_EQ(MOJO_RESULT_OK,
    732               mp0->WriteMessage(0,
    733                                 UserPointer<const void>(kHello),
    734                                 sizeof(kHello),
    735                                 &transports,
    736                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    737     transport.End();
    738 
    739     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    740     // |dispatcher| is destroyed.
    741     EXPECT_TRUE(dispatcher->HasOneRef());
    742     dispatcher = nullptr;
    743   }
    744 
    745   // Wait.
    746   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    747   EXPECT_EQ(123u, context);
    748   hss = HandleSignalsState();
    749   mp1->RemoveWaiter(1, &waiter, &hss);
    750   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    751             hss.satisfied_signals);
    752   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    753             hss.satisfiable_signals);
    754 
    755   // Read from MP 1, port 1.
    756   char read_buffer[100] = {0};
    757   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    758   DispatcherVector read_dispatchers;
    759   uint32_t read_num_dispatchers = 10;  // Maximum to get.
    760   EXPECT_EQ(MOJO_RESULT_OK,
    761             mp1->ReadMessage(1,
    762                              UserPointer<void>(read_buffer),
    763                              MakeUserPointer(&read_buffer_size),
    764                              &read_dispatchers,
    765                              &read_num_dispatchers,
    766                              MOJO_READ_MESSAGE_FLAG_NONE));
    767   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
    768   EXPECT_STREQ(kHello, read_buffer);
          // Fatal on mismatch: |read_dispatchers[0]| is indexed right below, which
          // would be out of bounds if no dispatcher was received.
    769   ASSERT_EQ(1u, read_dispatchers.size());
    770   EXPECT_EQ(1u, read_num_dispatchers);
    771   ASSERT_TRUE(read_dispatchers[0].get());
    772   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
    773 
          // The downcast is only meaningful if the type check on the next line holds.
    774   EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
    775   dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
    776 
    777   // Make another mapping.
    778   scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
    779   EXPECT_EQ(
    780       MOJO_RESULT_OK,
    781       dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
    782   ASSERT_TRUE(mapping1);
    783   ASSERT_TRUE(mapping1->GetBase());
    784   ASSERT_EQ(100u, mapping1->GetLength());
    785   EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
    786   EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
    787   EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
    788   EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
    789 
    790   // Write stuff either way.
    791   static_cast<char*>(mapping1->GetBase())[1] = 'x';
    792   EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
    793   static_cast<char*>(mapping0->GetBase())[2] = 'y';
    794   EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
    795 
    796   // Kill the first mapping; the second should still be valid.
    797   mapping0.reset();
    798   EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
    799 
    800   // Close everything that belongs to us.
    801   mp0->Close(0);
    802   mp1->Close(1);
    803   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
    804 
    805   // The second mapping should still be good.
    806   EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
    807 }
    808 
    809 #if defined(OS_POSIX)
    810 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
    811 #else
    812 // Not yet implemented (on Windows).
    813 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
    814 #endif
        // Writes "hello" to a temporary file, wraps that file in a
        // |PlatformHandleDispatcher| and sends it with a "world" message from MP 0,
        // port 0 to MP 1, port 1. The file is then reopened from the received
        // platform handle and its contents are checked against "hello".
    815 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
    816   base::ScopedTempDir temp_dir;
    817   ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
    818 
    819   static const char kHello[] = "hello";
    820   static const char kWorld[] = "world";
    821   Waiter waiter;
    822   uint32_t context = 0;
    823   HandleSignalsState hss;
    824 
    825   scoped_refptr<ChannelEndpoint> ep0;
    826   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    827   scoped_refptr<ChannelEndpoint> ep1;
    828   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    829   ConnectChannelEndpoints(ep0, ep1);
    830 
    831   base::FilePath unused;
    832   base::ScopedFILE fp(
    833       CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
          // Fatal on failure: |fp.get()| is handed to |fwrite()| right below.
          ASSERT_TRUE(fp);
    834   EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
    835   // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
    836   // be passed.
    837   scoped_refptr<PlatformHandleDispatcher> dispatcher(
    838       new PlatformHandleDispatcher(
    839           mojo::test::PlatformHandleFromFILE(fp.Pass())));
    840 
    841   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    842   // it later, it might already be readable.)
    843   waiter.Init();
    844   ASSERT_EQ(
    845       MOJO_RESULT_OK,
    846       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    847 
    848   // Write to MP 0, port 0.
    849   {
    850     DispatcherTransport transport(
    851         test::DispatcherTryStartTransport(dispatcher.get()));
    852     EXPECT_TRUE(transport.is_valid());
    853 
    854     std::vector<DispatcherTransport> transports;
    855     transports.push_back(transport);
    856     EXPECT_EQ(MOJO_RESULT_OK,
    857               mp0->WriteMessage(0,
    858                                 UserPointer<const void>(kWorld),
    859                                 sizeof(kWorld),
    860                                 &transports,
    861                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    862     transport.End();
    863 
    864     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    865     // |dispatcher| is destroyed.
    866     EXPECT_TRUE(dispatcher->HasOneRef());
    867     dispatcher = nullptr;
    868   }
    869 
    870   // Wait.
    871   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    872   EXPECT_EQ(123u, context);
    873   hss = HandleSignalsState();
    874   mp1->RemoveWaiter(1, &waiter, &hss);
    875   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    876             hss.satisfied_signals);
    877   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    878             hss.satisfiable_signals);
    879 
    880   // Read from MP 1, port 1.
    881   char read_buffer[100] = {0};
    882   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
    883   DispatcherVector read_dispatchers;
    884   uint32_t read_num_dispatchers = 10;  // Maximum to get.
    885   EXPECT_EQ(MOJO_RESULT_OK,
    886             mp1->ReadMessage(1,
    887                              UserPointer<void>(read_buffer),
    888                              MakeUserPointer(&read_buffer_size),
    889                              &read_dispatchers,
    890                              &read_num_dispatchers,
    891                              MOJO_READ_MESSAGE_FLAG_NONE));
    892   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
    893   EXPECT_STREQ(kWorld, read_buffer);
          // Fatal on mismatch: |read_dispatchers[0]| is indexed right below, which
          // would be out of bounds if no dispatcher was received.
    894   ASSERT_EQ(1u, read_dispatchers.size());
    895   EXPECT_EQ(1u, read_num_dispatchers);
    896   ASSERT_TRUE(read_dispatchers[0].get());
    897   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
    898 
          // The downcast is only meaningful if the type check on the next line holds.
    899   EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
    900   dispatcher =
    901       static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
    902 
    903   embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
    904   EXPECT_TRUE(h.is_valid());
    905 
    906   fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
    907   EXPECT_FALSE(h.is_valid());
          // Fatal on failure: |fp.get()| is passed to |rewind()| and |fread()| below.
    908   ASSERT_TRUE(fp);
    909 
          // Read the file from its start; it should hold what was written before the
          // handle was sent (|kHello|, not the |kWorld| message payload).
    910   rewind(fp.get());
    911   memset(read_buffer, 0, sizeof(read_buffer));
    912   EXPECT_EQ(sizeof(kHello),
    913             fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
    914   EXPECT_STREQ(kHello, read_buffer);
    915 
    916   // Close everything that belongs to us.
    917   mp0->Close(0);
    918   mp1->Close(1);
    919   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
    920 }
    921 
    922 // Test racing closes (on each end).
    923 // Note: A flaky failure would almost certainly indicate a problem in the code
    924 // itself (not in the test). Also, any logged warnings/errors would also
    925 // probably be indicative of bugs.
        //
        // Each iteration creates a fresh pair of message pipes, bootstraps their
        // endpoints without waiting, and then closes port 0 of |mp0| and port 1 of
        // |mp1|. The low four bits of the iteration counter |i| select which 5 ms
        // sleeps are inserted around the two closes, so the 256 iterations run each of
        // the 16 sleep combinations 16 times.
    926 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
    927   base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
    928 
    929   for (unsigned i = 0; i < 256; i++) {
    930     DVLOG(2) << "---------------------------------------- " << i;
    931     scoped_refptr<ChannelEndpoint> ep0;
    932     scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    933     BootstrapChannelEndpointNoWait(0, ep0);
    934 
    935     scoped_refptr<ChannelEndpoint> ep1;
    936     scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    937     BootstrapChannelEndpointNoWait(1, ep1);
    938 
            // Bit 0: post a sleep to the I/O thread's task runner before closing |mp0|.
    939     if (i & 1u) {
    940       io_thread()->task_runner()->PostTask(
    941           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
    942     }
            // Bit 1: sleep on this (the test's) thread before closing |mp0|.
    943     if (i & 2u)
    944       base::PlatformThread::Sleep(delay);
    945 
    946     mp0->Close(0);
    947 
            // Bit 2: post a sleep to the I/O thread's task runner before closing |mp1|.
    948     if (i & 4u) {
    949       io_thread()->task_runner()->PostTask(
    950           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
    951     }
            // Bit 3: sleep on this (the test's) thread before closing |mp1|.
    952     if (i & 8u)
    953       base::PlatformThread::Sleep(delay);
    954 
    955     mp1->Close(1);
    956 
            // NOTE(review): |RestoreInitialState()| is defined outside this chunk;
            // presumably it resets the fixture's channels for the next iteration --
            // confirm against the fixture.
    957     RestoreInitialState();
    958   }
    959 }
    960 
    961 // Tests passing an end of a message pipe over a remote message pipe, and then
    962 // passing that end back.
        // Port 0 of |local_mp| is wrapped in a |MessagePipeDispatcher|, sent from MP 0
        // to MP 1 with a "hello" message, sent back from MP 1 to MP 0 with a "world"
        // message, and finally exercised in both directions against |local_mp|'s
        // port 1 to check that the pipe still works after the round trip.
    963 // TODO(vtl): Also test passing a message pipe across two remote message pipes.
    964 TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
    965   static const char kHello[] = "hello";
    966   static const char kWorld[] = "world";
    967   Waiter waiter;
    968   HandleSignalsState hss;
    969   uint32_t context = 0;
    970 
    971   scoped_refptr<ChannelEndpoint> ep0;
    972   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    973   scoped_refptr<ChannelEndpoint> ep1;
    974   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    975   ConnectChannelEndpoints(ep0, ep1);
    976 
    977   // We'll try to pass this dispatcher.
    978   scoped_refptr<MessagePipeDispatcher> dispatcher(
    979       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    980   scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
    981   dispatcher->Init(local_mp, 0);
    982 
    983   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
    984   // it later, it might already be readable.)
    985   waiter.Init();
    986   ASSERT_EQ(
    987       MOJO_RESULT_OK,
    988       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
    989 
    990   // Write to MP 0, port 0.
    991   {
    992     DispatcherTransport transport(
    993         test::DispatcherTryStartTransport(dispatcher.get()));
    994     EXPECT_TRUE(transport.is_valid());
    995 
    996     std::vector<DispatcherTransport> transports;
    997     transports.push_back(transport);
    998     EXPECT_EQ(MOJO_RESULT_OK,
    999               mp0->WriteMessage(0,
   1000                                 UserPointer<const void>(kHello),
   1001                                 sizeof(kHello),
   1002                                 &transports,
   1003                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
   1004     transport.End();
   1005 
   1006     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
   1007     // |dispatcher| is destroyed.
   1008     EXPECT_TRUE(dispatcher->HasOneRef());
   1009     dispatcher = nullptr;
   1010   }
   1011 
   1012   // Wait.
   1013   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
   1014   EXPECT_EQ(123u, context);
   1015   hss = HandleSignalsState();
   1016   mp1->RemoveWaiter(1, &waiter, &hss);
   1017   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1018             hss.satisfied_signals);
   1019   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1020             hss.satisfiable_signals);
   1021 
   1022   // Read from MP 1, port 1.
   1023   char read_buffer[100] = {0};
   1024   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
   1025   DispatcherVector read_dispatchers;
   1026   uint32_t read_num_dispatchers = 10;  // Maximum to get.
   1027   EXPECT_EQ(MOJO_RESULT_OK,
   1028             mp1->ReadMessage(1,
   1029                              UserPointer<void>(read_buffer),
   1030                              MakeUserPointer(&read_buffer_size),
   1031                              &read_dispatchers,
   1032                              &read_num_dispatchers,
   1033                              MOJO_READ_MESSAGE_FLAG_NONE));
   1034   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
   1035   EXPECT_STREQ(kHello, read_buffer);
          // Fatal on mismatch: |read_dispatchers[0]| is indexed right below, which
          // would be out of bounds if no dispatcher was received.
   1036   ASSERT_EQ(1u, read_dispatchers.size());
   1037   EXPECT_EQ(1u, read_num_dispatchers);
   1038   ASSERT_TRUE(read_dispatchers[0].get());
   1039   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
   1040 
   1041   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
   1042   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
          // Drop the vector's reference so |dispatcher| holds the only one; the
          // |HasOneRef()| check after the next write relies on this.
   1043   read_dispatchers.clear();
   1044 
   1045   // Now pass it back.
   1046 
   1047   // Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do
   1048   // it later, it might already be readable.)
   1049   waiter.Init();
   1050   ASSERT_EQ(
   1051       MOJO_RESULT_OK,
   1052       mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
   1053 
   1054   // Write to MP 1, port 1.
   1055   {
   1056     DispatcherTransport transport(
   1057         test::DispatcherTryStartTransport(dispatcher.get()));
   1058     EXPECT_TRUE(transport.is_valid());
   1059 
   1060     std::vector<DispatcherTransport> transports;
   1061     transports.push_back(transport);
   1062     EXPECT_EQ(MOJO_RESULT_OK,
   1063               mp1->WriteMessage(1,
   1064                                 UserPointer<const void>(kWorld),
   1065                                 sizeof(kWorld),
   1066                                 &transports,
   1067                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
   1068     transport.End();
   1069 
   1070     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
   1071     // |dispatcher| is destroyed.
   1072     EXPECT_TRUE(dispatcher->HasOneRef());
   1073     dispatcher = nullptr;
   1074   }
   1075 
   1076   // Wait.
   1077   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
   1078   EXPECT_EQ(456u, context);
   1079   hss = HandleSignalsState();
   1080   mp0->RemoveWaiter(0, &waiter, &hss);
   1081   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1082             hss.satisfied_signals);
   1083   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1084             hss.satisfiable_signals);
   1085 
   1086   // Read from MP 0, port 0.
   1087   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
   1088   read_num_dispatchers = 10;  // Maximum to get.
   1089   EXPECT_EQ(MOJO_RESULT_OK,
   1090             mp0->ReadMessage(0,
   1091                              UserPointer<void>(read_buffer),
   1092                              MakeUserPointer(&read_buffer_size),
   1093                              &read_dispatchers,
   1094                              &read_num_dispatchers,
   1095                              MOJO_READ_MESSAGE_FLAG_NONE));
   1096   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
   1097   EXPECT_STREQ(kWorld, read_buffer);
          // Fatal on mismatch: |read_dispatchers[0]| is indexed right below, which
          // would be out of bounds if no dispatcher was received.
   1098   ASSERT_EQ(1u, read_dispatchers.size());
   1099   EXPECT_EQ(1u, read_num_dispatchers);
   1100   ASSERT_TRUE(read_dispatchers[0].get());
   1101   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
   1102 
   1103   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
   1104   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
   1105   read_dispatchers.clear();
   1106 
          // From here on, check that the round-tripped dispatcher is still connected
          // to |local_mp|'s port 1 in both directions.
   1107   // Add the waiter now, before it becomes readable to avoid a race.
   1108   waiter.Init();
   1109   ASSERT_EQ(MOJO_RESULT_OK,
   1110             dispatcher->AddWaiter(
   1111                 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
   1112 
   1113   // Write to "local_mp", port 1.
   1114   EXPECT_EQ(MOJO_RESULT_OK,
   1115             local_mp->WriteMessage(1,
   1116                                    UserPointer<const void>(kHello),
   1117                                    sizeof(kHello),
   1118                                    nullptr,
   1119                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
   1120 
   1121   // Wait for the dispatcher to become readable.
   1122   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
   1123   EXPECT_EQ(789u, context);
   1124   hss = HandleSignalsState();
   1125   dispatcher->RemoveWaiter(&waiter, &hss);
   1126   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1127             hss.satisfied_signals);
   1128   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1129             hss.satisfiable_signals);
   1130 
   1131   // Read from the dispatcher.
   1132   memset(read_buffer, 0, sizeof(read_buffer));
   1133   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
   1134   EXPECT_EQ(MOJO_RESULT_OK,
   1135             dispatcher->ReadMessage(UserPointer<void>(read_buffer),
   1136                                     MakeUserPointer(&read_buffer_size),
   1137                                     0,
   1138                                     nullptr,
   1139                                     MOJO_READ_MESSAGE_FLAG_NONE));
   1140   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
   1141   EXPECT_STREQ(kHello, read_buffer);
   1142 
   1143   // Prepare to wait on "local_mp", port 1.
   1144   waiter.Init();
   1145   ASSERT_EQ(MOJO_RESULT_OK,
   1146             local_mp->AddWaiter(
   1147                 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
   1148 
   1149   // Write to the dispatcher.
   1150   EXPECT_EQ(MOJO_RESULT_OK,
   1151             dispatcher->WriteMessage(UserPointer<const void>(kHello),
   1152                                      sizeof(kHello),
   1153                                      nullptr,
   1154                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
   1155 
   1156   // Wait.
   1157   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
   1158   EXPECT_EQ(789u, context);
   1159   hss = HandleSignalsState();
   1160   local_mp->RemoveWaiter(1, &waiter, &hss);
   1161   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1162             hss.satisfied_signals);
   1163   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
   1164             hss.satisfiable_signals);
   1165 
   1166   // Read from "local_mp", port 1.
   1167   memset(read_buffer, 0, sizeof(read_buffer));
   1168   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
   1169   EXPECT_EQ(MOJO_RESULT_OK,
   1170             local_mp->ReadMessage(1,
   1171                                   UserPointer<void>(read_buffer),
   1172                                   MakeUserPointer(&read_buffer_size),
   1173                                   nullptr,
   1174                                   nullptr,
   1175                                   MOJO_READ_MESSAGE_FLAG_NONE));
   1176   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
   1177   EXPECT_STREQ(kHello, read_buffer);
   1178 
   1179   // TODO(vtl): Also test the cases where messages are written and read (at
   1180   // various points) on the message pipe being passed around.
   1181 
   1182   // Close everything that belongs to us.
   1183   mp0->Close(0);
   1184   mp1->Close(1);
   1185   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
   1186   // Note that |local_mp|'s port 0 belongs to |dispatcher|, which was closed.
   1187   local_mp->Close(1);
   1188 }
   1189 
   1190 }  // namespace
   1191 }  // namespace system
   1192 }  // namespace mojo
   1193