Home | History | Annotate | Download | only in system
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <stddef.h>
      6 #include <stdint.h>
      7 
      8 #include <memory>
      9 
     10 #include "base/bind.h"
     11 #include "base/location.h"
     12 #include "base/logging.h"
     13 #include "base/macros.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "mojo/edk/embedder/embedder.h"
     16 #include "mojo/edk/embedder/platform_channel_pair.h"
     17 #include "mojo/edk/system/test_utils.h"
     18 #include "mojo/edk/system/waiter.h"
     19 #include "mojo/edk/test/mojo_test_base.h"
     20 #include "mojo/public/c/system/data_pipe.h"
     21 #include "mojo/public/c/system/functions.h"
     22 #include "mojo/public/c/system/message_pipe.h"
     23 #include "testing/gtest/include/gtest/gtest.h"
     24 
     25 namespace mojo {
     26 namespace edk {
     27 namespace {
     28 
     29 const uint32_t kSizeOfOptions =
     30     static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
     31 
     32 // In various places, we have to poll (since, e.g., we can't yet wait for a
     33 // certain amount of data to be available). This is the maximum number of
     34 // iterations (separated by a short sleep).
     35 // TODO(vtl): Get rid of this.
     36 const size_t kMaxPoll = 100;
     37 
     38 // Used in Multiprocess test.
     39 const size_t kMultiprocessCapacity = 37;
     40 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
     41 const int kMultiprocessMaxIter = 5;
     42 
     43 class DataPipeTest : public test::MojoTestBase {
     44  public:
     45   DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
     46                    consumer_(MOJO_HANDLE_INVALID) {}
     47 
     48   ~DataPipeTest() override {
     49     if (producer_ != MOJO_HANDLE_INVALID)
     50       CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
     51     if (consumer_ != MOJO_HANDLE_INVALID)
     52       CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
     53   }
     54 
     55   MojoResult Create(const MojoCreateDataPipeOptions* options) {
     56     return MojoCreateDataPipe(options, &producer_, &consumer_);
     57   }
     58 
     59   MojoResult WriteData(const void* elements,
     60                        uint32_t* num_bytes,
     61                        bool all_or_none = false) {
     62     return MojoWriteData(producer_, elements, num_bytes,
     63                          all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
     64                                      : MOJO_WRITE_DATA_FLAG_NONE);
     65   }
     66 
     67   MojoResult ReadData(void* elements,
     68                       uint32_t* num_bytes,
     69                       bool all_or_none = false,
     70                       bool peek = false) {
     71     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
     72     if (all_or_none)
     73       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
     74     if (peek)
     75       flags |= MOJO_READ_DATA_FLAG_PEEK;
     76     return MojoReadData(consumer_, elements, num_bytes, flags);
     77   }
     78 
     79   MojoResult QueryData(uint32_t* num_bytes) {
     80     return MojoReadData(consumer_, nullptr, num_bytes,
     81                         MOJO_READ_DATA_FLAG_QUERY);
     82   }
     83 
     84   MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
     85     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
     86     if (all_or_none)
     87       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
     88     return MojoReadData(consumer_, nullptr, num_bytes, flags);
     89   }
     90 
     91   MojoResult BeginReadData(const void** elements,
     92                            uint32_t* num_bytes,
     93                            bool all_or_none = false) {
     94     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
     95     if (all_or_none)
     96       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
     97     return MojoBeginReadData(consumer_, elements, num_bytes, flags);
     98   }
     99 
    100   MojoResult EndReadData(uint32_t num_bytes_read) {
    101     return MojoEndReadData(consumer_, num_bytes_read);
    102   }
    103 
    104   MojoResult BeginWriteData(void** elements,
    105                             uint32_t* num_bytes,
    106                             bool all_or_none = false) {
    107     MojoReadDataFlags flags = MOJO_WRITE_DATA_FLAG_NONE;
    108     if (all_or_none)
    109       flags |= MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
    110     return MojoBeginWriteData(producer_, elements, num_bytes, flags);
    111   }
    112 
    113   MojoResult EndWriteData(uint32_t num_bytes_written) {
    114     return MojoEndWriteData(producer_, num_bytes_written);
    115   }
    116 
    117   MojoResult CloseProducer() {
    118     MojoResult rv = MojoClose(producer_);
    119     producer_ = MOJO_HANDLE_INVALID;
    120     return rv;
    121   }
    122 
    123   MojoResult CloseConsumer() {
    124     MojoResult rv = MojoClose(consumer_);
    125     consumer_ = MOJO_HANDLE_INVALID;
    126     return rv;
    127   }
    128 
    129   MojoHandle producer_, consumer_;
    130 
    131  private:
    132   DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
    133 };
    134 
    135 TEST_F(DataPipeTest, Basic) {
    136   const MojoCreateDataPipeOptions options = {
    137       kSizeOfOptions,                           // |struct_size|.
    138       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    139       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    140       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
    141   };
    142 
    143   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    144 
    145   // We can write to a data pipe handle immediately.
    146   int32_t elements[10] = {};
    147   uint32_t num_bytes = 0;
    148 
    149   num_bytes =
    150       static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
    151 
    152   elements[0] = 123;
    153   elements[1] = 456;
    154   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    155   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
    156 
    157   // Now wait for the other side to become readable.
    158   MojoHandleSignalsState state;
    159   ASSERT_EQ(MOJO_RESULT_OK,
    160             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    161                      MOJO_DEADLINE_INDEFINITE, &state));
    162   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals);
    163 
    164   elements[0] = -1;
    165   elements[1] = -1;
    166   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
    167   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
    168   ASSERT_EQ(elements[0], 123);
    169   ASSERT_EQ(elements[1], 456);
    170 }
    171 
    172 // Tests creation of data pipes with various (valid) options.
    173 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
    174   MojoCreateDataPipeOptions test_options[] = {
    175       // Default options.
    176       {},
    177       // Trivial element size, non-default capacity.
    178       {kSizeOfOptions,                           // |struct_size|.
    179        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    180        1,                                        // |element_num_bytes|.
    181        1000},                                    // |capacity_num_bytes|.
    182       // Nontrivial element size, non-default capacity.
    183       {kSizeOfOptions,                           // |struct_size|.
    184        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    185        4,                                        // |element_num_bytes|.
    186        4000},                                    // |capacity_num_bytes|.
    187       // Nontrivial element size, default capacity.
    188       {kSizeOfOptions,                           // |struct_size|.
    189        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    190        100,                                      // |element_num_bytes|.
    191        0}                                        // |capacity_num_bytes|.
    192   };
    193   for (size_t i = 0; i < arraysize(test_options); i++) {
    194     MojoHandle producer_handle, consumer_handle;
    195     MojoCreateDataPipeOptions* options =
    196         i ? &test_options[i] : nullptr;
    197     ASSERT_EQ(MOJO_RESULT_OK,
    198               MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
    199     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
    200     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
    201   }
    202 }
    203 
    204 TEST_F(DataPipeTest, SimpleReadWrite) {
    205   const MojoCreateDataPipeOptions options = {
    206       kSizeOfOptions,                           // |struct_size|.
    207       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    208       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    209       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
    210   };
    211 
    212   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    213   MojoHandleSignalsState hss;
    214 
    215   int32_t elements[10] = {};
    216   uint32_t num_bytes = 0;
    217 
    218   // Try reading; nothing there yet.
    219   num_bytes =
    220       static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
    221   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
    222 
    223   // Query; nothing there yet.
    224   num_bytes = 0;
    225   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    226   ASSERT_EQ(0u, num_bytes);
    227 
    228   // Discard; nothing there yet.
    229   num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
    230   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
    231 
    232   // Read with invalid |num_bytes|.
    233   num_bytes = sizeof(elements[0]) + 1;
    234   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
    235 
    236   // Write two elements.
    237   elements[0] = 123;
    238   elements[1] = 456;
    239   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    240   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
    241   // It should have written everything (even without "all or none").
    242   ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
    243 
    244   // Wait.
    245   ASSERT_EQ(MOJO_RESULT_OK,
    246             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    247                      MOJO_DEADLINE_INDEFINITE, &hss));
    248   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    249   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    250             hss.satisfiable_signals);
    251 
    252   // Query.
    253   // TODO(vtl): It's theoretically possible (though not with the current
    254   // implementation/configured limits) that not all the data has arrived yet.
    255   // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
    256   // or |2 * ...|.)
    257   num_bytes = 0;
    258   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    259   ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
    260 
    261   // Read one element.
    262   elements[0] = -1;
    263   elements[1] = -1;
    264   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    265   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
    266   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
    267   ASSERT_EQ(123, elements[0]);
    268   ASSERT_EQ(-1, elements[1]);
    269 
    270   // Query.
    271   // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
    272   // should get 1 here.)
    273   num_bytes = 0;
    274   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    275   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
    276 
    277   // Peek one element.
    278   elements[0] = -1;
    279   elements[1] = -1;
    280   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    281   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
    282   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
    283   ASSERT_EQ(456, elements[0]);
    284   ASSERT_EQ(-1, elements[1]);
    285 
    286   // Query. Still has 1 element remaining.
    287   num_bytes = 0;
    288   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    289   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
    290 
    291   // Try to read two elements, with "all or none".
    292   elements[0] = -1;
    293   elements[1] = -1;
    294   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    295   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
    296             ReadData(elements, &num_bytes, true, false));
    297   ASSERT_EQ(-1, elements[0]);
    298   ASSERT_EQ(-1, elements[1]);
    299 
    300   // Try to read two elements, without "all or none".
    301   elements[0] = -1;
    302   elements[1] = -1;
    303   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    304   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
    305   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
    306   ASSERT_EQ(456, elements[0]);
    307   ASSERT_EQ(-1, elements[1]);
    308 
    309   // Query.
    310   num_bytes = 0;
    311   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    312   ASSERT_EQ(0u, num_bytes);
    313 }
    314 
    315 // Note: The "basic" waiting tests test that the "wait states" are correct in
    316 // various situations; they don't test that waiters are properly awoken on state
    317 // changes. (For that, we need to use multiple threads.)
    318 TEST_F(DataPipeTest, BasicProducerWaiting) {
    319   // Note: We take advantage of the fact that current for current
    320   // implementations capacities are strict maximums. This is not guaranteed by
    321   // the API.
    322 
    323   const MojoCreateDataPipeOptions options = {
    324       kSizeOfOptions,                           // |struct_size|.
    325       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    326       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    327       2 * sizeof(int32_t)                       // |capacity_num_bytes|.
    328   };
    329   Create(&options);
    330   MojoHandleSignalsState hss;
    331 
    332   // Never readable.
    333   hss = MojoHandleSignalsState();
    334   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    335             MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
    336   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    337   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    338             hss.satisfiable_signals);
    339 
    340   // Already writable.
    341   hss = MojoHandleSignalsState();
    342   ASSERT_EQ(MOJO_RESULT_OK,
    343             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
    344 
    345   // Write two elements.
    346   int32_t elements[2] = {123, 456};
    347   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    348   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    349   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
    350 
    351   // Wait for data to become available to the consumer.
    352   ASSERT_EQ(MOJO_RESULT_OK,
    353             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    354                      MOJO_DEADLINE_INDEFINITE, &hss));
    355   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    356   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    357             hss.satisfiable_signals);
    358 
    359   // Peek one element.
    360   elements[0] = -1;
    361   elements[1] = -1;
    362   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    363   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
    364   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    365   ASSERT_EQ(123, elements[0]);
    366   ASSERT_EQ(-1, elements[1]);
    367 
    368   // Read one element.
    369   elements[0] = -1;
    370   elements[1] = -1;
    371   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    372   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
    373   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    374   ASSERT_EQ(123, elements[0]);
    375   ASSERT_EQ(-1, elements[1]);
    376 
    377   // Try writing, using a two-phase write.
    378   void* buffer = nullptr;
    379   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
    380   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
    381   EXPECT_TRUE(buffer);
    382   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
    383 
    384   static_cast<int32_t*>(buffer)[0] = 789;
    385   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>(
    386                                          1u * sizeof(elements[0]))));
    387 
    388   // Read one element, using a two-phase read.
    389   const void* read_buffer = nullptr;
    390   num_bytes = 0u;
    391   ASSERT_EQ(MOJO_RESULT_OK,
    392             BeginReadData(&read_buffer, &num_bytes, false));
    393   EXPECT_TRUE(read_buffer);
    394   // The two-phase read should be able to read at least one element.
    395   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
    396   ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
    397   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>(
    398                                         1u * sizeof(elements[0]))));
    399 
    400   // Write one element.
    401   elements[0] = 123;
    402   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    403   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
    404   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    405 
    406   // Close the consumer.
    407   CloseConsumer();
    408 
    409   // It should now be never-writable.
    410   hss = MojoHandleSignalsState();
    411   ASSERT_EQ(MOJO_RESULT_OK,
    412             MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    413                      MOJO_DEADLINE_INDEFINITE, &hss));
    414   hss = MojoHandleSignalsState();
    415   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    416             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
    417   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    418   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    419 }
    420 
    421 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
    422   const MojoCreateDataPipeOptions options = {
    423       kSizeOfOptions,                           // |struct_size|.
    424       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    425       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    426       2 * sizeof(int32_t)                       // |capacity_num_bytes|.
    427   };
    428   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    429   MojoHandleSignalsState hss;
    430 
    431   // Close the consumer.
    432   CloseConsumer();
    433 
    434   // It should be signaled.
    435   hss = MojoHandleSignalsState();
    436   ASSERT_EQ(MOJO_RESULT_OK,
    437             MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    438                      MOJO_DEADLINE_INDEFINITE, &hss));
    439   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    440   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    441 }
    442 
    443 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
    444   const MojoCreateDataPipeOptions options = {
    445       kSizeOfOptions,                           // |struct_size|.
    446       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    447       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    448       2 * sizeof(int32_t)                       // |capacity_num_bytes|.
    449   };
    450   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    451   MojoHandleSignalsState hss;
    452 
    453   // Close the producer.
    454   CloseProducer();
    455 
    456   // It should be signaled.
    457   hss = MojoHandleSignalsState();
    458   ASSERT_EQ(MOJO_RESULT_OK,
    459             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    460                      MOJO_DEADLINE_INDEFINITE, &hss));
    461   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    462   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    463 }
    464 
    465 TEST_F(DataPipeTest, BasicConsumerWaiting) {
    466   const MojoCreateDataPipeOptions options = {
    467       kSizeOfOptions,                           // |struct_size|.
    468       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    469       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    470       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
    471   };
    472   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    473   MojoHandleSignalsState hss;
    474 
    475   // Never writable.
    476   hss = MojoHandleSignalsState();
    477   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    478             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE,
    479                      MOJO_DEADLINE_INDEFINITE, &hss));
    480   ASSERT_EQ(0u, hss.satisfied_signals);
    481   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    482             hss.satisfiable_signals);
    483 
    484   // Write two elements.
    485   int32_t elements[2] = {123, 456};
    486   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    487   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    488 
    489   // Wait for readability.
    490   hss = MojoHandleSignalsState();
    491   ASSERT_EQ(MOJO_RESULT_OK,
    492             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    493                      MOJO_DEADLINE_INDEFINITE, &hss));
    494   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    495   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    496             hss.satisfiable_signals);
    497 
    498   // Discard one element.
    499   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    500   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
    501   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    502 
    503   // Should still be readable.
    504   hss = MojoHandleSignalsState();
    505   ASSERT_EQ(MOJO_RESULT_OK,
    506             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    507                      MOJO_DEADLINE_INDEFINITE, &hss));
    508   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    509   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    510             hss.satisfiable_signals);
    511 
    512   // Peek one element.
    513   elements[0] = -1;
    514   elements[1] = -1;
    515   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    516   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
    517   ASSERT_EQ(456, elements[0]);
    518   ASSERT_EQ(-1, elements[1]);
    519 
    520   // Should still be readable.
    521   hss = MojoHandleSignalsState();
    522   ASSERT_EQ(MOJO_RESULT_OK,
    523             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    524                      MOJO_DEADLINE_INDEFINITE, &hss));
    525   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    526   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    527             hss.satisfiable_signals);
    528 
    529   // Read one element.
    530   elements[0] = -1;
    531   elements[1] = -1;
    532   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    533   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
    534   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    535   ASSERT_EQ(456, elements[0]);
    536   ASSERT_EQ(-1, elements[1]);
    537 
    538   // Write one element.
    539   elements[0] = 789;
    540   elements[1] = -1;
    541   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    542   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    543 
    544   // Waiting should now succeed.
    545   hss = MojoHandleSignalsState();
    546   ASSERT_EQ(MOJO_RESULT_OK,
    547             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    548                      MOJO_DEADLINE_INDEFINITE, &hss));
    549   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    550   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    551             hss.satisfiable_signals);
    552 
    553   // Close the producer.
    554   CloseProducer();
    555 
    556   // Should still be readable.
    557   hss = MojoHandleSignalsState();
    558   ASSERT_EQ(MOJO_RESULT_OK,
    559             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    560                      MOJO_DEADLINE_INDEFINITE, &hss));
    561   ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0);
    562   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    563             hss.satisfiable_signals);
    564 
    565   // Wait for the peer closed signal.
    566   hss = MojoHandleSignalsState();
    567   ASSERT_EQ(MOJO_RESULT_OK,
    568             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    569                      MOJO_DEADLINE_INDEFINITE, &hss));
    570   ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0);
    571   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    572             hss.satisfiable_signals);
    573 
    574   // Read one element.
    575   elements[0] = -1;
    576   elements[1] = -1;
    577   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    578   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
    579   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    580   ASSERT_EQ(789, elements[0]);
    581   ASSERT_EQ(-1, elements[1]);
    582 
    583   // Should be never-readable.
    584   hss = MojoHandleSignalsState();
    585   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    586             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    587                      MOJO_DEADLINE_INDEFINITE, &hss));
    588   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    589   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    590 }
    591 
    592 // Test with two-phase APIs and also closing the producer with an active
    593 // consumer waiter.
    594 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
    595   const MojoCreateDataPipeOptions options = {
    596       kSizeOfOptions,                           // |struct_size|.
    597       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    598       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    599       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
    600   };
    601   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    602   MojoHandleSignalsState hss;
    603 
    604   // Write two elements.
    605   int32_t* elements = nullptr;
    606   void* buffer = nullptr;
    607   // Request room for three (but we'll only write two).
    608   uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
    609   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true));
    610   EXPECT_TRUE(buffer);
    611   EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
    612   elements = static_cast<int32_t*>(buffer);
    613   elements[0] = 123;
    614   elements[1] = 456;
    615   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
    616 
    617   // Wait for readability.
    618   hss = MojoHandleSignalsState();
    619   ASSERT_EQ(MOJO_RESULT_OK,
    620             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    621                      MOJO_DEADLINE_INDEFINITE, &hss));
    622   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    623   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    624             hss.satisfiable_signals);
    625 
    626   // Read one element.
    627   // Request two in all-or-none mode, but only read one.
    628   const void* read_buffer = nullptr;
    629   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    630   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true));
    631   EXPECT_TRUE(read_buffer);
    632   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
    633   const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
    634   ASSERT_EQ(123, read_elements[0]);
    635   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
    636 
    637   // Should still be readable.
    638   hss = MojoHandleSignalsState();
    639   ASSERT_EQ(MOJO_RESULT_OK,
    640             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    641                      MOJO_DEADLINE_INDEFINITE, &hss));
    642   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    643   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    644             hss.satisfiable_signals);
    645 
    646   // Read one element.
    647   // Request three, but not in all-or-none mode.
    648   read_buffer = nullptr;
    649   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
    650   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
    651   EXPECT_TRUE(read_buffer);
    652   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    653   read_elements = static_cast<const int32_t*>(read_buffer);
    654   ASSERT_EQ(456, read_elements[0]);
    655   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
    656 
    657   // Close the producer.
    658   CloseProducer();
    659 
    660   // Should be never-readable.
    661   hss = MojoHandleSignalsState();
    662   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    663             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    664                      MOJO_DEADLINE_INDEFINITE, &hss));
    665   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    666   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    667 }
    668 
    669 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
    670 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
    671   const MojoCreateDataPipeOptions options = {
    672       kSizeOfOptions,                           // |struct_size|.
    673       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    674       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    675       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
    676   };
    677   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    678   MojoHandleSignalsState hss;
    679 
    680   // It should be writable.
    681   hss = MojoHandleSignalsState();
    682   ASSERT_EQ(MOJO_RESULT_OK,
    683             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
    684   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    685   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    686             hss.satisfiable_signals);
    687 
    688   uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
    689   void* write_ptr = nullptr;
    690   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
    691   EXPECT_TRUE(write_ptr);
    692   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
    693 
    694   // At this point, it shouldn't be writable.
    695   hss = MojoHandleSignalsState();
    696   ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
    697             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
    698   ASSERT_EQ(0u, hss.satisfied_signals);
    699   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    700             hss.satisfiable_signals);
    701 
    702   // It shouldn't be readable yet either (we'll wait later).
    703   hss = MojoHandleSignalsState();
    704   ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
    705             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
    706   ASSERT_EQ(0u, hss.satisfied_signals);
    707   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    708             hss.satisfiable_signals);
    709 
    710   static_cast<int32_t*>(write_ptr)[0] = 123;
    711   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
    712 
    713   // It should immediately be writable again.
    714   hss = MojoHandleSignalsState();
    715   ASSERT_EQ(MOJO_RESULT_OK,
    716             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
    717   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    718   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    719             hss.satisfiable_signals);
    720 
    721   // It should become readable.
    722   hss = MojoHandleSignalsState();
    723   ASSERT_EQ(MOJO_RESULT_OK,
    724             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    725                      MOJO_DEADLINE_INDEFINITE, &hss));
    726   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    727   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    728             hss.satisfiable_signals);
    729 
    730   // Start another two-phase write and check that it's readable even in the
    731   // middle of it.
    732   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
    733   write_ptr = nullptr;
    734   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
    735   EXPECT_TRUE(write_ptr);
    736   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
    737 
    738   // It should be readable.
    739   hss = MojoHandleSignalsState();
    740   ASSERT_EQ(MOJO_RESULT_OK,
    741             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    742                      MOJO_DEADLINE_INDEFINITE, &hss));
    743   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    744   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    745             hss.satisfiable_signals);
    746 
    747   // End the two-phase write without writing anything.
    748   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
    749 
    750   // Start a two-phase read.
    751   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
    752   const void* read_ptr = nullptr;
    753   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
    754   EXPECT_TRUE(read_ptr);
    755   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
    756 
    757   // At this point, it should still be writable.
    758   hss = MojoHandleSignalsState();
    759   ASSERT_EQ(MOJO_RESULT_OK,
    760             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
    761   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    762   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    763             hss.satisfiable_signals);
    764 
    765   // But not readable.
    766   hss = MojoHandleSignalsState();
    767   ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
    768             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
    769   ASSERT_EQ(0u, hss.satisfied_signals);
    770   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    771             hss.satisfiable_signals);
    772 
    773   // End the two-phase read without reading anything.
    774   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
    775 
    776   // It should be readable again.
    777   hss = MojoHandleSignalsState();
    778   ASSERT_EQ(MOJO_RESULT_OK,
    779             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
    780   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    781   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    782             hss.satisfiable_signals);
    783 }
    784 
    785 void Seq(int32_t start, size_t count, int32_t* out) {
    786   for (size_t i = 0; i < count; i++)
    787     out[i] = start + static_cast<int32_t>(i);
    788 }
    789 
    790 TEST_F(DataPipeTest, AllOrNone) {
    791   const MojoCreateDataPipeOptions options = {
    792       kSizeOfOptions,                           // |struct_size|.
    793       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    794       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
    795       10 * sizeof(int32_t)                      // |capacity_num_bytes|.
    796   };
    797   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    798   MojoHandleSignalsState hss;
    799 
    800   // Try writing way too much.
    801   uint32_t num_bytes = 20u * sizeof(int32_t);
    802   int32_t buffer[100];
    803   Seq(0, arraysize(buffer), buffer);
    804   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
    805 
    806   // Should still be empty.
    807   num_bytes = ~0u;
    808   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    809   ASSERT_EQ(0u, num_bytes);
    810 
    811   // Write some data.
    812   num_bytes = 5u * sizeof(int32_t);
    813   Seq(100, arraysize(buffer), buffer);
    814   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
    815   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
    816 
    817   // Wait for data.
    818   // TODO(vtl): There's no real guarantee that all the data will become
    819   // available at once (except that in current implementations, with reasonable
    820   // limits, it will). Eventually, we'll be able to wait for a specified amount
    821   // of data to become available.
    822   hss = MojoHandleSignalsState();
    823   ASSERT_EQ(MOJO_RESULT_OK,
    824             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    825                      MOJO_DEADLINE_INDEFINITE, &hss));
    826   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    827   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    828             hss.satisfiable_signals);
    829 
    830   // Half full.
    831   num_bytes = 0u;
    832   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    833   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
    834 
    835   /* TODO(jam): enable if we end up observing max capacity
    836   // Too much.
    837   num_bytes = 6u * sizeof(int32_t);
    838   Seq(200, arraysize(buffer), buffer);
    839   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
    840   */
    841 
    842   // Try reading too much.
    843   num_bytes = 11u * sizeof(int32_t);
    844   memset(buffer, 0xab, sizeof(buffer));
    845   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
    846   int32_t expected_buffer[100];
    847   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    848   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    849 
    850   // Try discarding too much.
    851   num_bytes = 11u * sizeof(int32_t);
    852   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
    853 
    854   // Just a little.
    855   num_bytes = 2u * sizeof(int32_t);
    856   Seq(300, arraysize(buffer), buffer);
    857   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
    858   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
    859 
    860   // Just right.
    861   num_bytes = 3u * sizeof(int32_t);
    862   Seq(400, arraysize(buffer), buffer);
    863   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
    864   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
    865 
    866   // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
    867   // specified amount of data to be available, so poll.
    868   for (size_t i = 0; i < kMaxPoll; i++) {
    869     num_bytes = 0u;
    870     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    871     if (num_bytes >= 10u * sizeof(int32_t))
    872       break;
    873 
    874     test::Sleep(test::EpsilonDeadline());
    875   }
    876   ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
    877 
    878   // Read half.
    879   num_bytes = 5u * sizeof(int32_t);
    880   memset(buffer, 0xab, sizeof(buffer));
    881   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
    882   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
    883   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    884   Seq(100, 5, expected_buffer);
    885   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    886 
    887   // Try reading too much again.
    888   num_bytes = 6u * sizeof(int32_t);
    889   memset(buffer, 0xab, sizeof(buffer));
    890   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
    891   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    892   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    893 
    894   // Try discarding too much again.
    895   num_bytes = 6u * sizeof(int32_t);
    896   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
    897 
    898   // Discard a little.
    899   num_bytes = 2u * sizeof(int32_t);
    900   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
    901   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
    902 
    903   // Three left.
    904   num_bytes = 0u;
    905   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    906   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
    907 
    908   // Close the producer, then test producer-closed cases.
    909   CloseProducer();
    910 
    911   // Wait.
    912   hss = MojoHandleSignalsState();
    913   ASSERT_EQ(MOJO_RESULT_OK,
    914             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    915                      MOJO_DEADLINE_INDEFINITE, &hss));
    916   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    917             hss.satisfied_signals);
    918   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    919             hss.satisfiable_signals);
    920 
    921   // Try reading too much; "failed precondition" since the producer is closed.
    922   num_bytes = 4u * sizeof(int32_t);
    923   memset(buffer, 0xab, sizeof(buffer));
    924   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    925             ReadData(buffer, &num_bytes, true));
    926   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    927   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    928 
    929   // Try discarding too much; "failed precondition" again.
    930   num_bytes = 4u * sizeof(int32_t);
    931   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
    932 
    933   // Read a little.
    934   num_bytes = 2u * sizeof(int32_t);
    935   memset(buffer, 0xab, sizeof(buffer));
    936   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
    937   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
    938   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    939   Seq(400, 2, expected_buffer);
    940   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    941 
    942   // Discard the remaining element.
    943   num_bytes = 1u * sizeof(int32_t);
    944   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
    945   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
    946 
    947   // Empty again.
    948   num_bytes = ~0u;
    949   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    950   ASSERT_EQ(0u, num_bytes);
    951 }
    952 
    953 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
    954 // respectively, as much as possible, even if it may have to "wrap around" the
    955 // internal circular buffer. (Note that the two-phase write and read need not do
    956 // this.)
    957 TEST_F(DataPipeTest, WrapAround) {
    958   unsigned char test_data[1000];
    959   for (size_t i = 0; i < arraysize(test_data); i++)
    960     test_data[i] = static_cast<unsigned char>(i);
    961 
    962   const MojoCreateDataPipeOptions options = {
    963       kSizeOfOptions,                           // |struct_size|.
    964       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
    965       1u,                                       // |element_num_bytes|.
    966       100u                                      // |capacity_num_bytes|.
    967   };
    968 
    969   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    970   MojoHandleSignalsState hss;
    971 
    972   // Write 20 bytes.
    973   uint32_t num_bytes = 20u;
    974   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
    975   ASSERT_EQ(20u, num_bytes);
    976 
    977   // Wait for data.
    978   ASSERT_EQ(MOJO_RESULT_OK,
    979             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
    980                      MOJO_DEADLINE_INDEFINITE, &hss));
    981   ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0);
    982   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    983             hss.satisfiable_signals);
    984 
    985   // Read 10 bytes.
    986   unsigned char read_buffer[1000] = {0};
    987   num_bytes = 10u;
    988   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
    989   ASSERT_EQ(10u, num_bytes);
    990   ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
    991 
    992   // Check that a two-phase write can now only write (at most) 80 bytes. (This
    993   // checks an implementation detail; this behavior is not guaranteed.)
    994   void* write_buffer_ptr = nullptr;
    995   num_bytes = 0u;
    996   ASSERT_EQ(MOJO_RESULT_OK,
    997             BeginWriteData(&write_buffer_ptr, &num_bytes, false));
    998   EXPECT_TRUE(write_buffer_ptr);
    999   ASSERT_EQ(80u, num_bytes);
   1000   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
   1001 
   1002   size_t total_num_bytes = 0;
   1003   while (total_num_bytes < 90) {
   1004     // Wait to write.
   1005     ASSERT_EQ(MOJO_RESULT_OK,
   1006               MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE,
   1007                        MOJO_DEADLINE_INDEFINITE, &hss));
   1008     ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
   1009     ASSERT_EQ(hss.satisfiable_signals,
   1010               MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
   1011 
   1012     // Write as much as we can.
   1013     num_bytes = 100;
   1014     ASSERT_EQ(MOJO_RESULT_OK,
   1015               WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
   1016     total_num_bytes += num_bytes;
   1017   }
   1018 
   1019   ASSERT_EQ(90u, total_num_bytes);
   1020 
   1021   num_bytes = 0;
   1022   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1023   ASSERT_EQ(100u, num_bytes);
   1024 
   1025   // Check that a two-phase read can now only read (at most) 90 bytes. (This
   1026   // checks an implementation detail; this behavior is not guaranteed.)
   1027   const void* read_buffer_ptr = nullptr;
   1028   num_bytes = 0;
   1029   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes, false));
   1030   EXPECT_TRUE(read_buffer_ptr);
   1031   ASSERT_EQ(90u, num_bytes);
   1032   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
   1033 
   1034   // Read as much as possible. We should read 100 bytes.
   1035   num_bytes = static_cast<uint32_t>(arraysize(read_buffer) *
   1036                                     sizeof(read_buffer[0]));
   1037   memset(read_buffer, 0, num_bytes);
   1038   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
   1039   ASSERT_EQ(100u, num_bytes);
   1040   ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
   1041 }
   1042 
   1043 // Tests the behavior of writing (simple and two-phase), closing the producer,
   1044 // then reading (simple and two-phase).
   1045 TEST_F(DataPipeTest, WriteCloseProducerRead) {
   1046   const char kTestData[] = "hello world";
   1047   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1048 
   1049   const MojoCreateDataPipeOptions options = {
   1050       kSizeOfOptions,                           // |struct_size|.
   1051       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1052       1u,                                       // |element_num_bytes|.
   1053       1000u                                     // |capacity_num_bytes|.
   1054   };
   1055   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1056 
   1057   // Write some data, so we'll have something to read.
   1058   uint32_t num_bytes = kTestDataSize;
   1059   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
   1060   ASSERT_EQ(kTestDataSize, num_bytes);
   1061 
   1062   // Write it again, so we'll have something left over.
   1063   num_bytes = kTestDataSize;
   1064   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
   1065   ASSERT_EQ(kTestDataSize, num_bytes);
   1066 
   1067   // Start two-phase write.
   1068   void* write_buffer_ptr = nullptr;
   1069   num_bytes = 0u;
   1070   ASSERT_EQ(MOJO_RESULT_OK,
   1071             BeginWriteData(&write_buffer_ptr, &num_bytes, false));
   1072   EXPECT_TRUE(write_buffer_ptr);
   1073   EXPECT_GT(num_bytes, 0u);
   1074 
   1075   // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
   1076   for (size_t i = 0; i < kMaxPoll; i++) {
   1077     num_bytes = 0u;
   1078     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1079     if (num_bytes >= 2u * kTestDataSize)
   1080       break;
   1081 
   1082     test::Sleep(test::EpsilonDeadline());
   1083   }
   1084   ASSERT_EQ(2u * kTestDataSize, num_bytes);
   1085 
   1086   // Start two-phase read.
   1087   const void* read_buffer_ptr = nullptr;
   1088   num_bytes = 0u;
   1089   ASSERT_EQ(MOJO_RESULT_OK,
   1090             BeginReadData(&read_buffer_ptr, &num_bytes));
   1091   EXPECT_TRUE(read_buffer_ptr);
   1092   ASSERT_EQ(2u * kTestDataSize, num_bytes);
   1093 
   1094   // Close the producer.
   1095   CloseProducer();
   1096 
   1097   // The consumer can finish its two-phase read.
   1098   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
   1099   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
   1100 
   1101   // And start another.
   1102   read_buffer_ptr = nullptr;
   1103   num_bytes = 0u;
   1104   ASSERT_EQ(MOJO_RESULT_OK,
   1105             BeginReadData(&read_buffer_ptr, &num_bytes));
   1106   EXPECT_TRUE(read_buffer_ptr);
   1107   ASSERT_EQ(kTestDataSize, num_bytes);
   1108 }
   1109 
   1110 
   1111 // Tests the behavior of interrupting a two-phase read and write by closing the
   1112 // consumer.
   1113 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
   1114   const char kTestData[] = "hello world";
   1115   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1116 
   1117   const MojoCreateDataPipeOptions options = {
   1118       kSizeOfOptions,                           // |struct_size|.
   1119       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1120       1u,                                       // |element_num_bytes|.
   1121       1000u                                     // |capacity_num_bytes|.
   1122   };
   1123   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1124   MojoHandleSignalsState hss;
   1125 
   1126   // Write some data, so we'll have something to read.
   1127   uint32_t num_bytes = kTestDataSize;
   1128   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1129   ASSERT_EQ(kTestDataSize, num_bytes);
   1130 
   1131   // Start two-phase write.
   1132   void* write_buffer_ptr = nullptr;
   1133   num_bytes = 0u;
   1134   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
   1135   EXPECT_TRUE(write_buffer_ptr);
   1136   ASSERT_GT(num_bytes, kTestDataSize);
   1137 
   1138   // Wait for data.
   1139   // TODO(vtl): (See corresponding TODO in AllOrNone.)
   1140   hss = MojoHandleSignalsState();
   1141   ASSERT_EQ(MOJO_RESULT_OK,
   1142             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
   1143                      MOJO_DEADLINE_INDEFINITE, &hss));
   1144   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
   1145   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1146             hss.satisfiable_signals);
   1147 
   1148   // Start two-phase read.
   1149   const void* read_buffer_ptr = nullptr;
   1150   num_bytes = 0u;
   1151   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
   1152   EXPECT_TRUE(read_buffer_ptr);
   1153   ASSERT_EQ(kTestDataSize, num_bytes);
   1154 
   1155   // Close the consumer.
   1156   CloseConsumer();
   1157 
   1158   // Wait for producer to know that the consumer is closed.
   1159   hss = MojoHandleSignalsState();
   1160   ASSERT_EQ(MOJO_RESULT_OK,
   1161             MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1162                      MOJO_DEADLINE_INDEFINITE, &hss));
   1163   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
   1164   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
   1165 
   1166   // Actually write some data. (Note: Premature freeing of the buffer would
   1167   // probably only be detected under ASAN or similar.)
   1168   memcpy(write_buffer_ptr, kTestData, kTestDataSize);
   1169   // Note: Even though the consumer has been closed, ending the two-phase
   1170   // write will report success.
   1171   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
   1172 
   1173   // But trying to write should result in failure.
   1174   num_bytes = kTestDataSize;
   1175   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
   1176 
   1177   // As will trying to start another two-phase write.
   1178   write_buffer_ptr = nullptr;
   1179   num_bytes = 0u;
   1180   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
   1181             BeginWriteData(&write_buffer_ptr, &num_bytes));
   1182 }
   1183 
   1184 // Tests the behavior of "interrupting" a two-phase write by closing both the
   1185 // producer and the consumer.
   1186 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
   1187   const uint32_t kTestDataSize = 15u;
   1188 
   1189   const MojoCreateDataPipeOptions options = {
   1190       kSizeOfOptions,                           // |struct_size|.
   1191       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1192       1u,                                       // |element_num_bytes|.
   1193       1000u                                     // |capacity_num_bytes|.
   1194   };
   1195   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1196 
   1197   // Start two-phase write.
   1198   void* write_buffer_ptr = nullptr;
   1199   uint32_t num_bytes = 0u;
   1200   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
   1201   EXPECT_TRUE(write_buffer_ptr);
   1202   ASSERT_GT(num_bytes, kTestDataSize);
   1203 }
   1204 
   1205 // Tests the behavior of writing, closing the producer, and then reading (with
   1206 // and without data remaining).
   1207 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
   1208   const char kTestData[] = "hello world";
   1209   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1210 
   1211   const MojoCreateDataPipeOptions options = {
   1212       kSizeOfOptions,                           // |struct_size|.
   1213       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1214       1u,                                       // |element_num_bytes|.
   1215       1000u                                     // |capacity_num_bytes|.
   1216   };
   1217   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1218   MojoHandleSignalsState hss;
   1219 
   1220   // Write some data, so we'll have something to read.
   1221   uint32_t num_bytes = kTestDataSize;
   1222   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1223   ASSERT_EQ(kTestDataSize, num_bytes);
   1224 
   1225   // Close the producer.
   1226   CloseProducer();
   1227 
   1228   // Wait. (Note that once the consumer knows that the producer is closed, it
   1229   // must also know about all the data that was sent.)
   1230   hss = MojoHandleSignalsState();
   1231   ASSERT_EQ(MOJO_RESULT_OK,
   1232             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1233                      MOJO_DEADLINE_INDEFINITE, &hss));
   1234   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1235             hss.satisfied_signals);
   1236   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1237             hss.satisfiable_signals);
   1238 
   1239   // Peek that data.
   1240   char buffer[1000];
   1241   num_bytes = static_cast<uint32_t>(sizeof(buffer));
   1242   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
   1243   ASSERT_EQ(kTestDataSize, num_bytes);
   1244   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
   1245 
   1246   // Read that data.
   1247   memset(buffer, 0, 1000);
   1248   num_bytes = static_cast<uint32_t>(sizeof(buffer));
   1249   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
   1250   ASSERT_EQ(kTestDataSize, num_bytes);
   1251   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
   1252 
   1253   // A second read should fail.
   1254   num_bytes = static_cast<uint32_t>(sizeof(buffer));
   1255   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
   1256 
   1257   // A two-phase read should also fail.
   1258   const void* read_buffer_ptr = nullptr;
   1259   num_bytes = 0u;
   1260   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
   1261             BeginReadData(&read_buffer_ptr, &num_bytes));
   1262 
   1263   // Ditto for discard.
   1264   num_bytes = 10u;
   1265   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
   1266 }
   1267 
   1268 // Test that during a two phase read the memory stays valid even if more data
   1269 // comes in.
   1270 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
   1271   const char kTestData[] = "hello world";
   1272   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1273 
   1274   const MojoCreateDataPipeOptions options = {
   1275       kSizeOfOptions,                           // |struct_size|.
   1276       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1277       1u,                                       // |element_num_bytes|.
   1278       1000u                                     // |capacity_num_bytes|.
   1279   };
   1280   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1281   MojoHandleSignalsState hss;
   1282 
   1283   // Write some data.
   1284   uint32_t num_bytes = kTestDataSize;
   1285   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1286   ASSERT_EQ(kTestDataSize, num_bytes);
   1287 
   1288   // Wait for the data.
   1289   hss = MojoHandleSignalsState();
   1290   ASSERT_EQ(MOJO_RESULT_OK,
   1291             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
   1292                      MOJO_DEADLINE_INDEFINITE, &hss));
   1293   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
   1294   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1295             hss.satisfiable_signals);
   1296 
   1297   // Begin a two-phase read.
   1298   const void* read_buffer_ptr = nullptr;
   1299   uint32_t read_buffer_size = 0u;
   1300   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
   1301 
   1302   // Write more data.
   1303   const char kExtraData[] = "bye world";
   1304   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
   1305   num_bytes = kExtraDataSize;
   1306   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
   1307   ASSERT_EQ(kExtraDataSize, num_bytes);
   1308 
   1309   // Close the producer.
   1310   CloseProducer();
   1311 
   1312   // Wait. (Note that once the consumer knows that the producer is closed, it
   1313   // must also have received the extra data).
   1314   hss = MojoHandleSignalsState();
   1315   ASSERT_EQ(MOJO_RESULT_OK,
   1316             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1317                      MOJO_DEADLINE_INDEFINITE, &hss));
   1318   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
   1319   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1320             hss.satisfiable_signals);
   1321 
   1322   // Read the two phase memory to check it's still valid.
   1323   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
   1324   EndReadData(read_buffer_size);
   1325 }
   1326 
   1327 // Test that two-phase reads/writes behave correctly when given invalid
   1328 // arguments.
   1329 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
   1330   const MojoCreateDataPipeOptions options = {
   1331       kSizeOfOptions,                           // |struct_size|.
   1332       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1333       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
   1334       10 * sizeof(int32_t)                      // |capacity_num_bytes|.
   1335   };
   1336   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1337   MojoHandleSignalsState hss;
   1338 
   1339   // No data.
   1340   uint32_t num_bytes = 1000u;
   1341   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1342   ASSERT_EQ(0u, num_bytes);
   1343 
   1344   // Try "ending" a two-phase write when one isn't active.
   1345   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
   1346             EndWriteData(1u * sizeof(int32_t)));
   1347 
   1348   // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
   1349   // have time to propagate.
   1350   test::Sleep(test::EpsilonDeadline());
   1351 
   1352   // Still no data.
   1353   num_bytes = 1000u;
   1354   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1355   ASSERT_EQ(0u, num_bytes);
   1356 
   1357   // Try ending a two-phase write with an invalid amount (too much).
   1358   num_bytes = 0u;
   1359   void* write_ptr = nullptr;
   1360   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
   1361   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
   1362             EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
   1363 
   1364   // But the two-phase write still ended.
   1365   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
   1366 
   1367   // Wait a bit (as above).
   1368   test::Sleep(test::EpsilonDeadline());
   1369 
   1370   // Still no data.
   1371   num_bytes = 1000u;
   1372   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1373   ASSERT_EQ(0u, num_bytes);
   1374 
   1375   // Try ending a two-phase write with an invalid amount (not a multiple of the
   1376   // element size).
   1377   num_bytes = 0u;
   1378   write_ptr = nullptr;
   1379   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
   1380   EXPECT_GE(num_bytes, 1u);
   1381   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
   1382 
   1383   // But the two-phase write still ended.
   1384   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
   1385 
   1386   // Wait a bit (as above).
   1387   test::Sleep(test::EpsilonDeadline());
   1388 
   1389   // Still no data.
   1390   num_bytes = 1000u;
   1391   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1392   ASSERT_EQ(0u, num_bytes);
   1393 
   1394   // Now write some data, so we'll be able to try reading.
   1395   int32_t element = 123;
   1396   num_bytes = 1u * sizeof(int32_t);
   1397   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
   1398 
   1399   // Wait for data.
   1400   // TODO(vtl): (See corresponding TODO in AllOrNone.)
   1401   hss = MojoHandleSignalsState();
   1402   ASSERT_EQ(MOJO_RESULT_OK,
   1403             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
   1404                      MOJO_DEADLINE_INDEFINITE, &hss));
   1405   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
   1406   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1407             hss.satisfiable_signals);
   1408 
   1409   // One element available.
   1410   num_bytes = 0u;
   1411   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1412   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1413 
   1414   // Try "ending" a two-phase read when one isn't active.
   1415   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
   1416 
   1417   // Still one element available.
   1418   num_bytes = 0u;
   1419   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1420   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1421 
   1422   // Try ending a two-phase read with an invalid amount (too much).
   1423   num_bytes = 0u;
   1424   const void* read_ptr = nullptr;
   1425   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
   1426   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
   1427             EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
   1428 
   1429   // Still one element available.
   1430   num_bytes = 0u;
   1431   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1432   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1433 
   1434   // Try ending a two-phase read with an invalid amount (not a multiple of the
   1435   // element size).
   1436   num_bytes = 0u;
   1437   read_ptr = nullptr;
   1438   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
   1439   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1440   ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
   1441   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
   1442 
   1443   // Still one element available.
   1444   num_bytes = 0u;
   1445   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1446   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1447 }
   1448 
   1449 // Test that a producer can be sent over a MP.
   1450 TEST_F(DataPipeTest, SendProducer) {
   1451   const char kTestData[] = "hello world";
   1452   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1453 
   1454   const MojoCreateDataPipeOptions options = {
   1455       kSizeOfOptions,                           // |struct_size|.
   1456       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1457       1u,                                       // |element_num_bytes|.
   1458       1000u                                     // |capacity_num_bytes|.
   1459   };
   1460   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1461   MojoHandleSignalsState hss;
   1462 
   1463   // Write some data.
   1464   uint32_t num_bytes = kTestDataSize;
   1465   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1466   ASSERT_EQ(kTestDataSize, num_bytes);
   1467 
   1468   // Wait for the data.
   1469   hss = MojoHandleSignalsState();
   1470   ASSERT_EQ(MOJO_RESULT_OK,
   1471             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
   1472                      MOJO_DEADLINE_INDEFINITE, &hss));
   1473   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
   1474   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1475             hss.satisfiable_signals);
   1476 
   1477   // Check the data.
   1478   const void* read_buffer = nullptr;
   1479   num_bytes = 0u;
   1480   ASSERT_EQ(MOJO_RESULT_OK,
   1481             BeginReadData(&read_buffer, &num_bytes, false));
   1482   ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
   1483   EndReadData(num_bytes);
   1484 
   1485   // Now send the producer over a MP so that it's serialized.
   1486   MojoHandle pipe0, pipe1;
   1487   ASSERT_EQ(MOJO_RESULT_OK,
   1488             MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
   1489 
   1490   ASSERT_EQ(MOJO_RESULT_OK,
   1491             MojoWriteMessage(pipe0, nullptr, 0, &producer_, 1,
   1492                              MOJO_WRITE_MESSAGE_FLAG_NONE));
   1493   producer_ = MOJO_HANDLE_INVALID;
   1494   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE,
   1495                                      MOJO_DEADLINE_INDEFINITE, &hss));
   1496   uint32_t num_handles = 1;
   1497   ASSERT_EQ(MOJO_RESULT_OK,
   1498             MojoReadMessage(pipe1, nullptr, 0, &producer_, &num_handles,
   1499                             MOJO_READ_MESSAGE_FLAG_NONE));
   1500   ASSERT_EQ(num_handles, 1u);
   1501 
   1502   // Write more data.
   1503   const char kExtraData[] = "bye world";
   1504   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
   1505   num_bytes = kExtraDataSize;
   1506   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
   1507   ASSERT_EQ(kExtraDataSize, num_bytes);
   1508 
   1509   // Wait for it.
   1510   hss = MojoHandleSignalsState();
   1511   ASSERT_EQ(MOJO_RESULT_OK,
   1512             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
   1513                      MOJO_DEADLINE_INDEFINITE, &hss));
   1514   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
   1515   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1516             hss.satisfiable_signals);
   1517 
   1518   // Check the second write.
   1519   num_bytes = 0u;
   1520   ASSERT_EQ(MOJO_RESULT_OK,
   1521             BeginReadData(&read_buffer, &num_bytes, false));
   1522   ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
   1523   EndReadData(num_bytes);
   1524 
   1525   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
   1526   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
   1527 }
   1528 
   1529 // Ensures that if a data pipe consumer whose producer has closed is passed over
   1530 // a message pipe, the deserialized dispatcher is also marked as having a closed
   1531 // peer.
   1532 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
   1533   const MojoCreateDataPipeOptions options = {
   1534       kSizeOfOptions,                           // |struct_size|.
   1535       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1536       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
   1537       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
   1538   };
   1539 
   1540   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1541 
   1542   // We can write to a data pipe handle immediately.
   1543   int32_t data = 123;
   1544   uint32_t num_bytes = sizeof(data);
   1545   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
   1546   ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
   1547 
   1548   // Now wait for the other side to become readable and to see the peer closed.
   1549   MojoHandleSignalsState state;
   1550   ASSERT_EQ(MOJO_RESULT_OK,
   1551             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1552                      MOJO_DEADLINE_INDEFINITE, &state));
   1553   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1554             state.satisfied_signals);
   1555   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1556             state.satisfiable_signals);
   1557 
   1558   // Now send the consumer over a MP so that it's serialized.
   1559   MojoHandle pipe0, pipe1;
   1560   ASSERT_EQ(MOJO_RESULT_OK,
   1561             MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
   1562 
   1563   ASSERT_EQ(MOJO_RESULT_OK,
   1564             MojoWriteMessage(pipe0, nullptr, 0, &consumer_, 1,
   1565                              MOJO_WRITE_MESSAGE_FLAG_NONE));
   1566   consumer_ = MOJO_HANDLE_INVALID;
   1567   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE,
   1568                                      MOJO_DEADLINE_INDEFINITE, &state));
   1569   uint32_t num_handles = 1;
   1570   ASSERT_EQ(MOJO_RESULT_OK,
   1571             MojoReadMessage(pipe1, nullptr, 0, &consumer_, &num_handles,
   1572                             MOJO_READ_MESSAGE_FLAG_NONE));
   1573   ASSERT_EQ(num_handles, 1u);
   1574 
   1575   ASSERT_EQ(MOJO_RESULT_OK,
   1576             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1577                      MOJO_DEADLINE_INDEFINITE, &state));
   1578   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1579             state.satisfied_signals);
   1580   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1581             state.satisfiable_signals);
   1582 
   1583   int32_t read_data;
   1584   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
   1585   ASSERT_EQ(sizeof(read_data), num_bytes);
   1586   ASSERT_EQ(data, read_data);
   1587 
   1588   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
   1589   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
   1590 }
   1591 
   1592 bool WriteAllData(MojoHandle producer,
   1593                   const void* elements,
   1594                   uint32_t num_bytes) {
   1595   for (size_t i = 0; i < kMaxPoll; i++) {
   1596     // Write as much data as we can.
   1597     uint32_t write_bytes = num_bytes;
   1598     MojoResult result = MojoWriteData(producer, elements, &write_bytes,
   1599                                       MOJO_WRITE_DATA_FLAG_NONE);
   1600     if (result == MOJO_RESULT_OK) {
   1601       num_bytes -= write_bytes;
   1602       elements = static_cast<const uint8_t*>(elements) + write_bytes;
   1603       if (num_bytes == 0)
   1604         return true;
   1605     } else {
   1606       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
   1607     }
   1608 
   1609     MojoHandleSignalsState hss = MojoHandleSignalsState();
   1610     EXPECT_EQ(MOJO_RESULT_OK, MojoWait(producer, MOJO_HANDLE_SIGNAL_WRITABLE,
   1611                                        MOJO_DEADLINE_INDEFINITE, &hss));
   1612     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
   1613     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1614               hss.satisfiable_signals);
   1615   }
   1616 
   1617   return false;
   1618 }
   1619 
   1620 // If |expect_empty| is true, expect |consumer| to be empty after reading.
   1621 bool ReadAllData(MojoHandle consumer,
   1622                  void* elements,
   1623                  uint32_t num_bytes,
   1624                  bool expect_empty) {
   1625   for (size_t i = 0; i < kMaxPoll; i++) {
   1626     // Read as much data as we can.
   1627     uint32_t read_bytes = num_bytes;
   1628     MojoResult result =
   1629         MojoReadData(consumer, elements, &read_bytes, MOJO_READ_DATA_FLAG_NONE);
   1630     if (result == MOJO_RESULT_OK) {
   1631       num_bytes -= read_bytes;
   1632       elements = static_cast<uint8_t*>(elements) + read_bytes;
   1633       if (num_bytes == 0) {
   1634         if (expect_empty) {
   1635           // Expect no more data.
   1636           test::Sleep(test::TinyDeadline());
   1637           MojoReadData(consumer, nullptr, &num_bytes,
   1638                        MOJO_READ_DATA_FLAG_QUERY);
   1639           EXPECT_EQ(0u, num_bytes);
   1640         }
   1641         return true;
   1642       }
   1643     } else {
   1644       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
   1645     }
   1646 
   1647     MojoHandleSignalsState hss = MojoHandleSignalsState();
   1648     EXPECT_EQ(MOJO_RESULT_OK, MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
   1649                                        MOJO_DEADLINE_INDEFINITE, &hss));
   1650     // Peer could have become closed while we're still waiting for data.
   1651     EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
   1652     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1653               hss.satisfiable_signals);
   1654   }
   1655 
   1656   return num_bytes == 0;
   1657 }
   1658 
   1659 #if !defined(OS_IOS)
   1660 
   1661 TEST_F(DataPipeTest, Multiprocess) {
   1662   const uint32_t kTestDataSize =
   1663       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
   1664   const MojoCreateDataPipeOptions options = {
   1665       kSizeOfOptions,                           // |struct_size|.
   1666       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1667       1,                                        // |element_num_bytes|.
   1668       kMultiprocessCapacity                     // |capacity_num_bytes|.
   1669   };
   1670   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1671 
   1672   RUN_CHILD_ON_PIPE(MultiprocessClient, server_mp)
   1673     // Send some data before serialising and sending the data pipe over.
   1674     // This is the first write so we don't need to use WriteAllData.
   1675     uint32_t num_bytes = kTestDataSize;
   1676     ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes,
   1677                                         MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
   1678     ASSERT_EQ(kTestDataSize, num_bytes);
   1679 
   1680     // Send child process the data pipe.
   1681     ASSERT_EQ(MOJO_RESULT_OK,
   1682               MojoWriteMessage(server_mp, nullptr, 0, &consumer_, 1,
   1683                                MOJO_WRITE_MESSAGE_FLAG_NONE));
   1684 
   1685     // Send a bunch of data of varying sizes.
   1686     uint8_t buffer[100];
   1687     int seq = 0;
   1688     for (int i = 0; i < kMultiprocessMaxIter; ++i) {
   1689       for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
   1690         for (unsigned int j = 0; j < size; ++j)
   1691           buffer[j] = seq + j;
   1692         EXPECT_TRUE(WriteAllData(producer_, buffer, size));
   1693         seq += size;
   1694       }
   1695     }
   1696 
   1697     // Write the test string in again.
   1698     ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
   1699 
   1700     // Swap ends.
   1701     ASSERT_EQ(MOJO_RESULT_OK,
   1702               MojoWriteMessage(server_mp, nullptr, 0, &producer_, 1,
   1703                                MOJO_WRITE_MESSAGE_FLAG_NONE));
   1704 
   1705     // Receive the consumer from the other side.
   1706     producer_ = MOJO_HANDLE_INVALID;
   1707     MojoHandleSignalsState hss = MojoHandleSignalsState();
   1708     ASSERT_EQ(MOJO_RESULT_OK, MojoWait(server_mp, MOJO_HANDLE_SIGNAL_READABLE,
   1709                                        MOJO_DEADLINE_INDEFINITE, &hss));
   1710     MojoHandle handles[2];
   1711     uint32_t num_handles = arraysize(handles);
   1712     ASSERT_EQ(MOJO_RESULT_OK,
   1713               MojoReadMessage(server_mp, nullptr, 0, handles, &num_handles,
   1714                               MOJO_READ_MESSAGE_FLAG_NONE));
   1715     ASSERT_EQ(1u, num_handles);
   1716     consumer_ = handles[0];
   1717 
   1718     // Read the test string twice. Once for when we sent it, and once for the
   1719     // other end sending it.
   1720     for (int i = 0; i < 2; ++i) {
   1721       EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
   1722       EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
   1723     }
   1724 
   1725     WriteMessage(server_mp, "quit");
   1726 
   1727     // Don't have to close the consumer here because it will be done for us.
   1728   END_CHILD()
   1729 }
   1730 
   1731 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
   1732   const uint32_t kTestDataSize =
   1733       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
   1734 
   1735   // Receive the data pipe from the other side.
   1736   MojoHandle consumer = MOJO_HANDLE_INVALID;
   1737   MojoHandleSignalsState hss = MojoHandleSignalsState();
   1738   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
   1739                                      MOJO_DEADLINE_INDEFINITE, &hss));
   1740   MojoHandle handles[2];
   1741   uint32_t num_handles = arraysize(handles);
   1742   ASSERT_EQ(MOJO_RESULT_OK,
   1743             MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles,
   1744                             MOJO_READ_MESSAGE_FLAG_NONE));
   1745   ASSERT_EQ(1u, num_handles);
   1746   consumer = handles[0];
   1747 
   1748   // Read the initial string that was sent.
   1749   int32_t buffer[100];
   1750   EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
   1751   EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
   1752 
   1753   // Receive the main data and check it is correct.
   1754   int seq = 0;
   1755   uint8_t expected_buffer[100];
   1756   for (int i = 0; i < kMultiprocessMaxIter; ++i) {
   1757     for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
   1758       for (unsigned int j = 0; j < size; ++j)
   1759         expected_buffer[j] = seq + j;
   1760       EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
   1761       EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));
   1762 
   1763       seq += size;
   1764     }
   1765   }
   1766 
   1767   // Swap ends.
   1768   ASSERT_EQ(MOJO_RESULT_OK, MojoWriteMessage(client_mp, nullptr, 0, &consumer,
   1769                                              1, MOJO_WRITE_MESSAGE_FLAG_NONE));
   1770 
   1771   // Receive the producer from the other side.
   1772   MojoHandle producer = MOJO_HANDLE_INVALID;
   1773   hss = MojoHandleSignalsState();
   1774   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
   1775                                      MOJO_DEADLINE_INDEFINITE, &hss));
   1776   num_handles = arraysize(handles);
   1777   ASSERT_EQ(MOJO_RESULT_OK,
   1778             MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles,
   1779                             MOJO_READ_MESSAGE_FLAG_NONE));
   1780   ASSERT_EQ(1u, num_handles);
   1781   producer = handles[0];
   1782 
   1783   // Write the test string one more time.
   1784   EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
   1785 
   1786   // We swapped ends, so close the producer.
   1787   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
   1788 
   1789   // Wait to receive a "quit" message before exiting.
   1790   EXPECT_EQ("quit", ReadMessage(client_mp));
   1791 }
   1792 
   1793 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
   1794   MojoHandle p;
   1795   std::string message = ReadMessageWithHandles(h, &p, 1);
   1796 
   1797   // Write some data to the producer and close it.
   1798   uint32_t num_bytes = static_cast<uint32_t>(message.size());
   1799   EXPECT_EQ(MOJO_RESULT_OK, MojoWriteData(p, message.data(), &num_bytes,
   1800                                           MOJO_WRITE_DATA_FLAG_NONE));
   1801   EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size()));
   1802 
   1803   // Close the producer before quitting.
   1804   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
   1805 
   1806   // Wait for a quit message.
   1807   EXPECT_EQ("quit", ReadMessage(h));
   1808 }
   1809 
   1810 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
   1811   MojoHandle c;
   1812   std::string expected_message = ReadMessageWithHandles(h, &c, 1);
   1813 
   1814   // Wait for the consumer to become readable.
   1815   EXPECT_EQ(MOJO_RESULT_OK, MojoWait(c, MOJO_HANDLE_SIGNAL_READABLE,
   1816                                      MOJO_DEADLINE_INDEFINITE, nullptr));
   1817 
   1818   // Drain the consumer and expect to find the given message.
   1819   uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
   1820   std::vector<char> bytes(expected_message.size());
   1821   EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, bytes.data(), &num_bytes,
   1822                                          MOJO_READ_DATA_FLAG_NONE));
   1823   EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
   1824 
   1825   std::string message(bytes.data(), bytes.size());
   1826   EXPECT_EQ(expected_message, message);
   1827 
   1828   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
   1829 
   1830   // Wait for a quit message.
   1831   EXPECT_EQ("quit", ReadMessage(h));
   1832 }
   1833 
   1834 TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
   1835   // Create a new data pipe.
   1836   MojoHandle p, c;
   1837   EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p ,&c));
   1838 
   1839   RUN_CHILD_ON_PIPE(WriteAndCloseProducer, producer_client)
   1840     RUN_CHILD_ON_PIPE(ReadAndCloseConsumer, consumer_client)
   1841       const std::string kMessage = "Hello, world!";
   1842       WriteMessageWithHandles(producer_client, kMessage, &p, 1);
   1843       WriteMessageWithHandles(consumer_client, kMessage, &c, 1);
   1844 
   1845       WriteMessage(consumer_client, "quit");
   1846     END_CHILD()
   1847 
   1848     WriteMessage(producer_client, "quit");
   1849   END_CHILD()
   1850 }
   1851 
   1852 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
   1853   const MojoCreateDataPipeOptions options = {
   1854       kSizeOfOptions,                           // |struct_size|.
   1855       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
   1856       1,                                        // |element_num_bytes|.
   1857       kMultiprocessCapacity                     // |capacity_num_bytes|.
   1858   };
   1859 
   1860   MojoHandle p, c;
   1861   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
   1862 
   1863   const std::string kMessage = "Hello, world!";
   1864   WriteMessageWithHandles(h, kMessage, &c, 1);
   1865 
   1866   // Write some data to the producer and close it.
   1867   uint32_t num_bytes = static_cast<uint32_t>(kMessage.size());
   1868   EXPECT_EQ(MOJO_RESULT_OK, MojoWriteData(p, kMessage.data(), &num_bytes,
   1869                                           MOJO_WRITE_DATA_FLAG_NONE));
   1870   EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size()));
   1871   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
   1872 
   1873   // Wait for a quit message.
   1874   EXPECT_EQ("quit", ReadMessage(h));
   1875 }
   1876 
   1877 TEST_F(DataPipeTest, CreateInChild) {
   1878   RUN_CHILD_ON_PIPE(CreateAndWrite, child)
   1879     MojoHandle c;
   1880     std::string expected_message = ReadMessageWithHandles(child, &c, 1);
   1881 
   1882     // Wait for the consumer to become readable.
   1883     EXPECT_EQ(MOJO_RESULT_OK, MojoWait(c, MOJO_HANDLE_SIGNAL_READABLE,
   1884                                        MOJO_DEADLINE_INDEFINITE, nullptr));
   1885 
   1886     // Drain the consumer and expect to find the given message.
   1887     uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
   1888     std::vector<char> bytes(expected_message.size());
   1889     EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, bytes.data(), &num_bytes,
   1890                                            MOJO_READ_DATA_FLAG_NONE));
   1891     EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
   1892 
   1893     std::string message(bytes.data(), bytes.size());
   1894     EXPECT_EQ(expected_message, message);
   1895 
   1896     EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
   1897     WriteMessage(child, "quit");
   1898   END_CHILD()
   1899 }
   1900 
   1901 #endif  // !defined(OS_IOS)
   1902 
   1903 }  // namespace
   1904 }  // namespace edk
   1905 }  // namespace mojo
   1906