Home | History | Annotate | Download | only in core
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <stddef.h>
      6 #include <stdint.h>
      7 
      8 #include <memory>
      9 
     10 #include "base/bind.h"
     11 #include "base/location.h"
     12 #include "base/logging.h"
     13 #include "base/macros.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/run_loop.h"
     16 #include "build/build_config.h"
     17 #include "mojo/core/embedder/embedder.h"
     18 #include "mojo/core/test/mojo_test_base.h"
     19 #include "mojo/core/test_utils.h"
     20 #include "mojo/public/c/system/data_pipe.h"
     21 #include "mojo/public/c/system/functions.h"
     22 #include "mojo/public/c/system/message_pipe.h"
     23 #include "mojo/public/cpp/system/message_pipe.h"
     24 #include "mojo/public/cpp/system/simple_watcher.h"
     25 #include "testing/gtest/include/gtest/gtest.h"
     26 
     27 namespace mojo {
     28 namespace core {
     29 namespace {
     30 
     31 const uint32_t kSizeOfOptions =
     32     static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
     33 
     34 // In various places, we have to poll (since, e.g., we can't yet wait for a
     35 // certain amount of data to be available). This is the maximum number of
     36 // iterations (separated by a short sleep).
     37 // TODO(vtl): Get rid of this.
     38 const size_t kMaxPoll = 100;
     39 
     40 // Used in Multiprocess test.
     41 const size_t kMultiprocessCapacity = 37;
     42 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
     43 const int kMultiprocessMaxIter = 5;
     44 
     45 // TODO(rockot): There are many uses of ASSERT where EXPECT would be more
     46 // appropriate. Fix this.
     47 
     48 class DataPipeTest : public test::MojoTestBase {
     49  public:
     50   DataPipeTest()
     51       : producer_(MOJO_HANDLE_INVALID), consumer_(MOJO_HANDLE_INVALID) {}
     52 
     53   ~DataPipeTest() override {
     54     if (producer_ != MOJO_HANDLE_INVALID)
     55       CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
     56     if (consumer_ != MOJO_HANDLE_INVALID)
     57       CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
     58   }
     59 
     60   MojoResult ReadEmptyMessageWithHandles(MojoHandle pipe,
     61                                          MojoHandle* out_handles,
     62                                          uint32_t num_handles) {
     63     std::vector<uint8_t> bytes;
     64     std::vector<ScopedHandle> handles;
     65     MojoResult rv = ReadMessageRaw(MessagePipeHandle(pipe), &bytes, &handles,
     66                                    MOJO_READ_MESSAGE_FLAG_NONE);
     67     if (rv == MOJO_RESULT_OK) {
     68       CHECK_EQ(0u, bytes.size());
     69       CHECK_EQ(num_handles, handles.size());
     70       for (size_t i = 0; i < num_handles; ++i)
     71         out_handles[i] = handles[i].release().value();
     72     }
     73     return rv;
     74   }
     75 
     76   MojoResult Create(const MojoCreateDataPipeOptions* options) {
     77     return MojoCreateDataPipe(options, &producer_, &consumer_);
     78   }
     79 
     80   MojoResult WriteData(const void* elements,
     81                        uint32_t* num_bytes,
     82                        bool all_or_none = false) {
     83     MojoWriteDataOptions options;
     84     options.struct_size = sizeof(options);
     85     options.flags = all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
     86                                 : MOJO_WRITE_DATA_FLAG_NONE;
     87     return MojoWriteData(producer_, elements, num_bytes, &options);
     88   }
     89 
     90   MojoResult ReadData(void* elements,
     91                       uint32_t* num_bytes,
     92                       bool all_or_none = false,
     93                       bool peek = false) {
     94     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
     95     if (all_or_none)
     96       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
     97     if (peek)
     98       flags |= MOJO_READ_DATA_FLAG_PEEK;
     99 
    100     MojoReadDataOptions options;
    101     options.struct_size = sizeof(options);
    102     options.flags = flags;
    103     return MojoReadData(consumer_, &options, elements, num_bytes);
    104   }
    105 
    106   MojoResult QueryData(uint32_t* num_bytes) {
    107     MojoReadDataOptions options;
    108     options.struct_size = sizeof(options);
    109     options.flags = MOJO_READ_DATA_FLAG_QUERY;
    110     return MojoReadData(consumer_, &options, nullptr, num_bytes);
    111   }
    112 
    113   MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
    114     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
    115     if (all_or_none)
    116       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
    117     MojoReadDataOptions options;
    118     options.struct_size = sizeof(options);
    119     options.flags = flags;
    120     return MojoReadData(consumer_, &options, nullptr, num_bytes);
    121   }
    122 
    123   MojoResult BeginReadData(const void** elements, uint32_t* num_bytes) {
    124     return MojoBeginReadData(consumer_, nullptr, elements, num_bytes);
    125   }
    126 
    127   MojoResult EndReadData(uint32_t num_bytes_read) {
    128     return MojoEndReadData(consumer_, num_bytes_read, nullptr);
    129   }
    130 
    131   MojoResult BeginWriteData(void** elements, uint32_t* num_bytes) {
    132     return MojoBeginWriteData(producer_, nullptr, elements, num_bytes);
    133   }
    134 
    135   MojoResult EndWriteData(uint32_t num_bytes_written) {
    136     return MojoEndWriteData(producer_, num_bytes_written, nullptr);
    137   }
    138 
    139   MojoResult CloseProducer() {
    140     MojoResult rv = MojoClose(producer_);
    141     producer_ = MOJO_HANDLE_INVALID;
    142     return rv;
    143   }
    144 
    145   MojoResult CloseConsumer() {
    146     MojoResult rv = MojoClose(consumer_);
    147     consumer_ = MOJO_HANDLE_INVALID;
    148     return rv;
    149   }
    150 
    151   MojoHandle producer_, consumer_;
    152 
    153  private:
    154   DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
    155 };
    156 
    157 TEST_F(DataPipeTest, Basic) {
    158   const MojoCreateDataPipeOptions options = {
    159       kSizeOfOptions,                          // |struct_size|.
    160       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    161       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    162       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
    163   };
    164 
    165   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    166 
    167   // We can write to a data pipe handle immediately.
    168   int32_t elements[10] = {};
    169   uint32_t num_bytes = 0;
    170 
    171   num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
    172 
    173   elements[0] = 123;
    174   elements[1] = 456;
    175   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    176   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
    177 
    178   // Now wait for the other side to become readable.
    179   MojoHandleSignalsState state;
    180   ASSERT_EQ(MOJO_RESULT_OK,
    181             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &state));
    182   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    183             state.satisfied_signals);
    184 
    185   elements[0] = -1;
    186   elements[1] = -1;
    187   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
    188   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
    189   ASSERT_EQ(elements[0], 123);
    190   ASSERT_EQ(elements[1], 456);
    191 }
    192 
    193 // Tests creation of data pipes with various (valid) options.
    194 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
    195   MojoCreateDataPipeOptions test_options[] = {
    196       // Default options.
    197       {},
    198       // Trivial element size, non-default capacity.
    199       {kSizeOfOptions,                   // |struct_size|.
    200        MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
    201        1,                                // |element_num_bytes|.
    202        1000},                            // |capacity_num_bytes|.
    203       // Nontrivial element size, non-default capacity.
    204       {kSizeOfOptions,                   // |struct_size|.
    205        MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
    206        4,                                // |element_num_bytes|.
    207        4000},                            // |capacity_num_bytes|.
    208       // Nontrivial element size, default capacity.
    209       {kSizeOfOptions,                   // |struct_size|.
    210        MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
    211        100,                              // |element_num_bytes|.
    212        0}                                // |capacity_num_bytes|.
    213   };
    214   for (size_t i = 0; i < arraysize(test_options); i++) {
    215     MojoHandle producer_handle, consumer_handle;
    216     MojoCreateDataPipeOptions* options = i ? &test_options[i] : nullptr;
    217     ASSERT_EQ(MOJO_RESULT_OK,
    218               MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
    219     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
    220     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
    221   }
    222 }
    223 
    224 TEST_F(DataPipeTest, SimpleReadWrite) {
    225   const MojoCreateDataPipeOptions options = {
    226       kSizeOfOptions,                          // |struct_size|.
    227       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    228       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    229       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
    230   };
    231 
    232   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    233   MojoHandleSignalsState hss;
    234 
    235   int32_t elements[10] = {};
    236   uint32_t num_bytes = 0;
    237 
    238   // Try reading; nothing there yet.
    239   num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
    240   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
    241 
    242   // Query; nothing there yet.
    243   num_bytes = 0;
    244   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    245   ASSERT_EQ(0u, num_bytes);
    246 
    247   // Discard; nothing there yet.
    248   num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
    249   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
    250 
    251   // Read with invalid |num_bytes|.
    252   num_bytes = sizeof(elements[0]) + 1;
    253   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
    254 
    255   // Write two elements.
    256   elements[0] = 123;
    257   elements[1] = 456;
    258   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    259   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
    260   // It should have written everything (even without "all or none").
    261   ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
    262 
    263   // Wait.
    264   ASSERT_EQ(MOJO_RESULT_OK,
    265             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    266   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    267             hss.satisfied_signals);
    268   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    269                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    270                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    271             hss.satisfiable_signals);
    272 
    273   // Query.
    274   // TODO(vtl): It's theoretically possible (though not with the current
    275   // implementation/configured limits) that not all the data has arrived yet.
    276   // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
    277   // or |2 * ...|.)
    278   num_bytes = 0;
    279   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    280   ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
    281 
    282   // Read one element.
    283   elements[0] = -1;
    284   elements[1] = -1;
    285   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    286   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
    287   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
    288   ASSERT_EQ(123, elements[0]);
    289   ASSERT_EQ(-1, elements[1]);
    290 
    291   // Query.
    292   // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
    293   // should get 1 here.)
    294   num_bytes = 0;
    295   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    296   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
    297 
    298   // Peek one element.
    299   elements[0] = -1;
    300   elements[1] = -1;
    301   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    302   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
    303   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
    304   ASSERT_EQ(456, elements[0]);
    305   ASSERT_EQ(-1, elements[1]);
    306 
    307   // Query. Still has 1 element remaining.
    308   num_bytes = 0;
    309   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    310   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
    311 
    312   // Try to read two elements, with "all or none".
    313   elements[0] = -1;
    314   elements[1] = -1;
    315   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    316   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
    317             ReadData(elements, &num_bytes, true, false));
    318   ASSERT_EQ(-1, elements[0]);
    319   ASSERT_EQ(-1, elements[1]);
    320 
    321   // Try to read two elements, without "all or none".
    322   elements[0] = -1;
    323   elements[1] = -1;
    324   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    325   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
    326   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
    327   ASSERT_EQ(456, elements[0]);
    328   ASSERT_EQ(-1, elements[1]);
    329 
    330   // Query.
    331   num_bytes = 0;
    332   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    333   ASSERT_EQ(0u, num_bytes);
    334 }
    335 
    336 // Note: The "basic" waiting tests test that the "wait states" are correct in
    337 // various situations; they don't test that waiters are properly awoken on state
    338 // changes. (For that, we need to use multiple threads.)
    339 TEST_F(DataPipeTest, BasicProducerWaiting) {
    340   // Note: We take advantage of the fact that current for current
    341   // implementations capacities are strict maximums. This is not guaranteed by
    342   // the API.
    343 
    344   const MojoCreateDataPipeOptions options = {
    345       kSizeOfOptions,                          // |struct_size|.
    346       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    347       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    348       2 * sizeof(int32_t)                      // |capacity_num_bytes|.
    349   };
    350   Create(&options);
    351   MojoHandleSignalsState hss;
    352 
    353   // Never readable. Already writable.
    354   hss = GetSignalsState(producer_);
    355   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    356   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    357                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    358             hss.satisfiable_signals);
    359 
    360   // Write two elements.
    361   int32_t elements[2] = {123, 456};
    362   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    363   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    364   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
    365 
    366   // Wait for data to become available to the consumer.
    367   ASSERT_EQ(MOJO_RESULT_OK,
    368             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    369   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    370             hss.satisfied_signals);
    371   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    372                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    373                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    374             hss.satisfiable_signals);
    375 
    376   // Peek one element.
    377   elements[0] = -1;
    378   elements[1] = -1;
    379   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    380   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
    381   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    382   ASSERT_EQ(123, elements[0]);
    383   ASSERT_EQ(-1, elements[1]);
    384 
    385   // Read one element.
    386   elements[0] = -1;
    387   elements[1] = -1;
    388   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    389   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
    390   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    391   ASSERT_EQ(123, elements[0]);
    392   ASSERT_EQ(-1, elements[1]);
    393 
    394   // Try writing, using a two-phase write.
    395   void* buffer = nullptr;
    396   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
    397   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
    398   EXPECT_TRUE(buffer);
    399   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
    400 
    401   static_cast<int32_t*>(buffer)[0] = 789;
    402   ASSERT_EQ(MOJO_RESULT_OK,
    403             EndWriteData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
    404 
    405   // Read one element, using a two-phase read.
    406   const void* read_buffer = nullptr;
    407   num_bytes = 0u;
    408   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
    409   EXPECT_TRUE(read_buffer);
    410   // The two-phase read should be able to read at least one element.
    411   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
    412   ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
    413   ASSERT_EQ(MOJO_RESULT_OK,
    414             EndReadData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
    415 
    416   // Write one element.
    417   elements[0] = 123;
    418   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    419   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
    420   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    421 
    422   // Close the consumer.
    423   CloseConsumer();
    424 
    425   // It should now be never-writable.
    426   hss = MojoHandleSignalsState();
    427   ASSERT_EQ(MOJO_RESULT_OK,
    428             WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
    429   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    430   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    431 }
    432 
    433 TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
    434   const MojoCreateDataPipeOptions options = {
    435       kSizeOfOptions,                          // |struct_size|.
    436       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    437       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    438       2 * sizeof(int32_t)                      // |capacity_num_bytes|.
    439   };
    440   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    441   MojoHandleSignalsState hss;
    442 
    443   // Close the consumer.
    444   CloseConsumer();
    445 
    446   // It should be signaled.
    447   hss = MojoHandleSignalsState();
    448   ASSERT_EQ(MOJO_RESULT_OK,
    449             WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
    450   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    451   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    452 }
    453 
    454 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
    455   const MojoCreateDataPipeOptions options = {
    456       kSizeOfOptions,                          // |struct_size|.
    457       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    458       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    459       2 * sizeof(int32_t)                      // |capacity_num_bytes|.
    460   };
    461   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    462   MojoHandleSignalsState hss;
    463 
    464   // Close the producer.
    465   CloseProducer();
    466 
    467   // It should be signaled.
    468   hss = MojoHandleSignalsState();
    469   ASSERT_EQ(MOJO_RESULT_OK,
    470             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
    471   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    472   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    473 }
    474 
    475 TEST_F(DataPipeTest, BasicConsumerWaiting) {
    476   const MojoCreateDataPipeOptions options = {
    477       kSizeOfOptions,                          // |struct_size|.
    478       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    479       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    480       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
    481   };
    482   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    483   MojoHandleSignalsState hss;
    484 
    485   // Never writable.
    486   hss = MojoHandleSignalsState();
    487   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    488             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
    489   EXPECT_EQ(0u, hss.satisfied_signals);
    490   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    491                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    492                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    493             hss.satisfiable_signals);
    494 
    495   // Write two elements.
    496   int32_t elements[2] = {123, 456};
    497   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    498   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    499 
    500   // Wait for readability.
    501   hss = MojoHandleSignalsState();
    502   ASSERT_EQ(MOJO_RESULT_OK,
    503             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    504   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    505             hss.satisfied_signals);
    506   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    507                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    508                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    509             hss.satisfiable_signals);
    510 
    511   // Discard one element.
    512   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    513   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
    514   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    515 
    516   // Should still be readable.
    517   hss = MojoHandleSignalsState();
    518   ASSERT_EQ(MOJO_RESULT_OK,
    519             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    520   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    521   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    522                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    523                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    524             hss.satisfiable_signals);
    525 
    526   // Peek one element.
    527   elements[0] = -1;
    528   elements[1] = -1;
    529   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    530   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
    531   ASSERT_EQ(456, elements[0]);
    532   ASSERT_EQ(-1, elements[1]);
    533 
    534   // Should still be readable.
    535   hss = MojoHandleSignalsState();
    536   ASSERT_EQ(MOJO_RESULT_OK,
    537             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    538   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    539   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    540                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    541                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    542             hss.satisfiable_signals);
    543 
    544   // Read one element.
    545   elements[0] = -1;
    546   elements[1] = -1;
    547   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    548   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
    549   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    550   ASSERT_EQ(456, elements[0]);
    551   ASSERT_EQ(-1, elements[1]);
    552 
    553   // Write one element.
    554   elements[0] = 789;
    555   elements[1] = -1;
    556   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    557   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    558 
    559   // Waiting should now succeed.
    560   hss = MojoHandleSignalsState();
    561   ASSERT_EQ(MOJO_RESULT_OK,
    562             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    563   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    564             hss.satisfied_signals);
    565   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    566                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    567                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    568             hss.satisfiable_signals);
    569 
    570   // Close the producer.
    571   CloseProducer();
    572 
    573   // Should still be readable.
    574   hss = MojoHandleSignalsState();
    575   ASSERT_EQ(MOJO_RESULT_OK,
    576             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    577   EXPECT_TRUE(hss.satisfied_signals & (MOJO_HANDLE_SIGNAL_READABLE |
    578                                        MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE));
    579   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    580                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    581             hss.satisfiable_signals);
    582 
    583   // Wait for the peer closed signal.
    584   hss = MojoHandleSignalsState();
    585   ASSERT_EQ(MOJO_RESULT_OK,
    586             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
    587   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    588                 MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    589             hss.satisfied_signals);
    590   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    591                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    592             hss.satisfiable_signals);
    593 
    594   // Read one element.
    595   elements[0] = -1;
    596   elements[1] = -1;
    597   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
    598   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
    599   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    600   ASSERT_EQ(789, elements[0]);
    601   ASSERT_EQ(-1, elements[1]);
    602 
    603   // Should be never-readable.
    604   hss = MojoHandleSignalsState();
    605   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    606             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    607   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    608   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    609 }
    610 
    611 TEST_F(DataPipeTest, ConsumerNewDataReadable) {
    612   const MojoCreateDataPipeOptions create_options = {
    613       kSizeOfOptions,                          // |struct_size|.
    614       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    615       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    616       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
    617   };
    618   EXPECT_EQ(MOJO_RESULT_OK, Create(&create_options));
    619 
    620   int32_t elements[2] = {123, 456};
    621   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
    622   EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    623 
    624   // The consumer handle should appear to be readable and have new data.
    625   EXPECT_EQ(MOJO_RESULT_OK,
    626             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
    627   EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
    628               MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
    629 
    630   // Now try to read a minimum of 6 elements.
    631   int32_t read_elements[6];
    632   uint32_t num_read_bytes = sizeof(read_elements);
    633   MojoReadDataOptions read_options;
    634   read_options.struct_size = sizeof(read_options);
    635   read_options.flags = MOJO_READ_DATA_FLAG_ALL_OR_NONE;
    636   EXPECT_EQ(
    637       MOJO_RESULT_OUT_OF_RANGE,
    638       MojoReadData(consumer_, &read_options, read_elements, &num_read_bytes));
    639 
    640   // The consumer should still appear to be readable but not with new data.
    641   EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
    642               MOJO_HANDLE_SIGNAL_READABLE);
    643   EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
    644                MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
    645 
    646   // Write four more elements.
    647   EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    648   EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
    649 
    650   // The consumer handle should once again appear to be readable.
    651   EXPECT_EQ(MOJO_RESULT_OK,
    652             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
    653 
    654   // Try again to read a minimum of 6 elements. Should succeed this time.
    655   EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(consumer_, &read_options,
    656                                          read_elements, &num_read_bytes));
    657 
    658   // And now the consumer is unreadable.
    659   EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
    660                MOJO_HANDLE_SIGNAL_READABLE);
    661   EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
    662                MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
    663 }
    664 
    665 // Test with two-phase APIs and also closing the producer with an active
    666 // consumer waiter.
        //
        // Flow: a two-phase write of two elements, two two-phase reads that consume
        // them one at a time, then the producer is closed and a wait for readability
        // on the consumer is expected to fail with FAILED_PRECONDITION.
    667 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
    668   const MojoCreateDataPipeOptions options = {
    669       kSizeOfOptions,                          // |struct_size|.
    670       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    671       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    672       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
    673   };
    674   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    675   MojoHandleSignalsState hss;
    676 
    677   // Write two elements.
    678   int32_t* elements = nullptr;
    679   void* buffer = nullptr;
    680   // Request room for three (but we'll only write two).
    681   uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
    682   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
          // Fatal check: |buffer| is written through immediately below.
    683   ASSERT_TRUE(buffer);
    684   EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
    685   elements = static_cast<int32_t*>(buffer);
    686   elements[0] = 123;
    687   elements[1] = 456;
          // Commit only the two elements that were actually filled in.
    688   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
    689 
    690   // Wait for readability.
    691   hss = MojoHandleSignalsState();
    692   ASSERT_EQ(MOJO_RESULT_OK,
    693             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    694   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    695             hss.satisfied_signals);
    696   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    697                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    698                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    699             hss.satisfiable_signals);
    700 
    701   // Read one element.
    702   // Two should be available, but only read one.
    703   const void* read_buffer = nullptr;
    704   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
          // Fatal check: |read_buffer| is read through immediately below.
    705   ASSERT_TRUE(read_buffer);
    706   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
    707   const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
    708   ASSERT_EQ(123, read_elements[0]);
    709   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
    710 
    711   // Should still be readable.
          // Only READABLE is expected in the satisfied set here (no
          // NEW_DATA_READABLE), unlike the first wait above.
    712   hss = MojoHandleSignalsState();
    713   ASSERT_EQ(MOJO_RESULT_OK,
    714             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    715   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    716   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    717                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    718                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    719             hss.satisfiable_signals);
    720 
    721   // Read one element.
    722   // Request three, but not in all-or-none mode.
    723   read_buffer = nullptr;
    724   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
    725   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
          // Fatal check: |read_buffer| is read through immediately below.
    726   ASSERT_TRUE(read_buffer);
    727   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
    728   read_elements = static_cast<const int32_t*>(read_buffer);
    729   ASSERT_EQ(456, read_elements[0]);
    730   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
    731 
    732   // Close the producer.
    733   CloseProducer();
    734 
    735   // Should be never-readable.
          // Both elements have been consumed and the producer is closed, so the
          // test expects PEER_CLOSED to be the only satisfied and satisfiable signal.
    736   hss = MojoHandleSignalsState();
    737   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    738             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    739   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
    740   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
    741 }
    742 
    743 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
        //
        // The producer and consumer signal states are sampled before, during and
        // after each two-phase operation; an operation ended with a size of 0 is
        // also covered.
    744 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
    745   const MojoCreateDataPipeOptions options = {
    746       kSizeOfOptions,                          // |struct_size|.
    747       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    748       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    749       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
    750   };
    751   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    752   MojoHandleSignalsState hss;
    753 
    754   // It should be writable.
    755   hss = GetSignalsState(producer_);
    756   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    757   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    758                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    759             hss.satisfiable_signals);
    760 
    761   uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
    762   void* write_ptr = nullptr;
    763   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
          // Fatal check: |write_ptr| is written through further down, before the
          // two-phase write is ended.
    764   ASSERT_TRUE(write_ptr);
    765   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
    766 
    767   // At this point, it shouldn't be writable.
    768   hss = GetSignalsState(producer_);
    769   ASSERT_EQ(0u, hss.satisfied_signals);
    770   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    771                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    772             hss.satisfiable_signals);
    773 
    774   // It shouldn't be readable yet either (we'll wait later).
    775   hss = GetSignalsState(consumer_);
    776   ASSERT_EQ(0u, hss.satisfied_signals);
    777   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    778                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    779                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    780             hss.satisfiable_signals);
    781 
          // Fill in one element and commit it.
    782   static_cast<int32_t*>(write_ptr)[0] = 123;
    783   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
    784 
    785   // It should immediately be writable again.
    786   hss = GetSignalsState(producer_);
    787   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    788   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    789                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    790             hss.satisfiable_signals);
    791 
    792   // It should become readable.
    793   hss = MojoHandleSignalsState();
    794   ASSERT_EQ(MOJO_RESULT_OK,
    795             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    796   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    797             hss.satisfied_signals);
    798   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    799                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    800                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    801             hss.satisfiable_signals);
    802 
    803   // Start another two-phase write and check that it's readable even in the
    804   // middle of it.
    805   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
    806   write_ptr = nullptr;
    807   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
          // Non-fatal is enough here: this second buffer is never written through.
    808   EXPECT_TRUE(write_ptr);
    809   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
    810 
    811   // It should be readable.
    812   hss = MojoHandleSignalsState();
    813   ASSERT_EQ(MOJO_RESULT_OK,
    814             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    815   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    816             hss.satisfied_signals);
    817   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    818                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    819                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    820             hss.satisfiable_signals);
    821 
    822   // End the two-phase write without writing anything.
    823   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
    824 
    825   // Start a two-phase read.
    826   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
    827   const void* read_ptr = nullptr;
    828   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
    829   EXPECT_TRUE(read_ptr);
    830   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
    831 
    832   // At this point, it should still be writable.
    833   hss = GetSignalsState(producer_);
    834   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    835   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    836                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    837             hss.satisfiable_signals);
    838 
    839   // But not readable.
    840   hss = GetSignalsState(consumer_);
    841   ASSERT_EQ(0u, hss.satisfied_signals);
    842   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    843                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    844                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    845             hss.satisfiable_signals);
    846 
    847   // End the two-phase read without reading anything.
    848   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
    849 
    850   // It should be readable again.
          // The element was not consumed (the read ended with size 0); only READABLE
          // is expected in the satisfied set at this point.
    851   hss = GetSignalsState(consumer_);
    852   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    853   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
    854                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    855                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    856             hss.satisfiable_signals);
    857 }
    858 
    859 void Seq(int32_t start, size_t count, int32_t* out) {
          // Fills |out[0]| .. |out[count - 1]| with the consecutive values |start|,
          // |start| + 1, ...; nothing is written when |count| is 0.
    860   for (size_t index = 0; index != count; ++index)
    861     out[index] = static_cast<int32_t>(index) + start;
    862 }
    863 
    864 TEST_F(DataPipeTest, AllOrNone) {
          // Exercises WriteData/ReadData/DiscardData with a trailing |true|
          // argument on a pipe holding ten int32_t elements, checking which
          // requests succeed and which fail, before and after the producer closes.
          // NOTE(review): the helpers are declared outside this chunk; the |true|
          // argument presumably selects all-or-none mode -- confirm.
    865   const MojoCreateDataPipeOptions options = {
    866       kSizeOfOptions,                          // |struct_size|.
    867       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
    868       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
    869       10 * sizeof(int32_t)                     // |capacity_num_bytes|.
    870   };
    871   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
    872   MojoHandleSignalsState hss;
    873 
    874   // Try writing more than the total capacity of the pipe.
    875   uint32_t num_bytes = 20u * sizeof(int32_t);
    876   int32_t buffer[100];
    877   Seq(0, arraysize(buffer), buffer);
    878   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
    879 
    880   // Should still be empty.
          // |num_bytes| is preset to a non-zero sentinel so that a QueryData() that
          // left it untouched would be caught by the check below.
    881   num_bytes = ~0u;
    882   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    883   ASSERT_EQ(0u, num_bytes);
    884 
    885   // Write some data.
          // Queued so far after this write: 100..104.
    886   num_bytes = 5u * sizeof(int32_t);
    887   Seq(100, arraysize(buffer), buffer);
    888   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
    889   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
    890 
    891   // Wait for data.
    892   // TODO(vtl): There's no real guarantee that all the data will become
    893   // available at once (except that in current implementations, with reasonable
    894   // limits, it will). Eventually, we'll be able to wait for a specified amount
    895   // of data to become available.
    896   hss = MojoHandleSignalsState();
    897   ASSERT_EQ(MOJO_RESULT_OK,
    898             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
    899   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
    900             hss.satisfied_signals);
    901   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
    902                 MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE,
    903             hss.satisfiable_signals);
    904 
    905   // Half full.
    906   num_bytes = 0u;
    907   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    908   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
    909 
    910   // Try writing more than the available capacity of the pipe, but less than the
    911   // total capacity.
    912   num_bytes = 6u * sizeof(int32_t);
    913   Seq(200, arraysize(buffer), buffer);
    914   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
    915 
    916   // Try reading too much.
          // |buffer| is pre-filled with 0xab and compared against an identically
          // filled |expected_buffer| to verify the failed read wrote nothing.
    917   num_bytes = 11u * sizeof(int32_t);
    918   memset(buffer, 0xab, sizeof(buffer));
    919   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
    920   int32_t expected_buffer[100];
    921   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    922   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    923 
    924   // Try discarding too much.
    925   num_bytes = 11u * sizeof(int32_t);
    926   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
    927 
    928   // Just a little.
          // Queued so far after this write: 100..104, 300, 301.
    929   num_bytes = 2u * sizeof(int32_t);
    930   Seq(300, arraysize(buffer), buffer);
    931   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
    932   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
    933 
    934   // Just right.
          // Queued so far after this write: 100..104, 300, 301, 400..402 (ten
          // elements, which equals the configured capacity).
    935   num_bytes = 3u * sizeof(int32_t);
    936   Seq(400, arraysize(buffer), buffer);
    937   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
    938   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
    939 
    940   // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
    941   // specified amount of data to be available, so poll.
    942   for (size_t i = 0; i < kMaxPoll; i++) {
    943     num_bytes = 0u;
    944     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    945     if (num_bytes >= 10u * sizeof(int32_t))
    946       break;
    947 
    948     test::Sleep(test::EpsilonDeadline());
    949   }
          // Fails if the poll loop ran out of iterations before all ten elements
          // became visible.
    950   ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
    951 
    952   // Read half.
          // Expected contents: 100..104 followed by untouched 0xab filler.
    953   num_bytes = 5u * sizeof(int32_t);
    954   memset(buffer, 0xab, sizeof(buffer));
    955   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
    956   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
    957   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    958   Seq(100, 5, expected_buffer);
    959   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    960 
    961   // Try reading too much again.
    962   num_bytes = 6u * sizeof(int32_t);
    963   memset(buffer, 0xab, sizeof(buffer));
    964   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
    965   memset(expected_buffer, 0xab, sizeof(expected_buffer));
    966   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
    967 
    968   // Try discarding too much again.
    969   num_bytes = 6u * sizeof(int32_t);
    970   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
    971 
    972   // Discard a little.
          // This drops 300 and 301, leaving 400..402 queued.
    973   num_bytes = 2u * sizeof(int32_t);
    974   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
    975   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
    976 
    977   // Three left.
    978   num_bytes = 0u;
    979   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
    980   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
    981 
    982   // Close the producer, then test producer-closed cases.
    983   CloseProducer();
    984 
    985   // Wait.
    986   hss = MojoHandleSignalsState();
    987   ASSERT_EQ(MOJO_RESULT_OK,
    988             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
    989   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    990             hss.satisfied_signals);
    991   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
    992             hss.satisfiable_signals);
    993 
    994   // Try reading too much; "failed precondition" since the producer is closed.
    995   num_bytes = 4u * sizeof(int32_t);
    996   memset(buffer, 0xab, sizeof(buffer));
    997   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    998             ReadData(buffer, &num_bytes, true));
    999   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   1000   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
   1001 
   1002   // Try discarding too much; "failed precondition" again.
   1003   num_bytes = 4u * sizeof(int32_t);
   1004   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
   1005 
   1006   // Read a little.
          // Expected contents: 400, 401 followed by untouched 0xab filler.
   1007   num_bytes = 2u * sizeof(int32_t);
   1008   memset(buffer, 0xab, sizeof(buffer));
   1009   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
   1010   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
   1011   memset(expected_buffer, 0xab, sizeof(expected_buffer));
   1012   Seq(400, 2, expected_buffer);
   1013   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
   1014 
   1015   // Discard the remaining element.
   1016   num_bytes = 1u * sizeof(int32_t);
   1017   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
   1018   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1019 
   1020   // Empty again.
   1021   num_bytes = ~0u;
   1022   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1023   ASSERT_EQ(0u, num_bytes);
   1024 }
   1025 
   1026 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
   1027 // respectively, as much as possible, even if it may have to "wrap around" the
   1028 // internal circular buffer. (Note that the two-phase write and read need not do
   1029 // this.)
        //
        // The pipe holds 100 one-byte elements. 20 bytes are written and 10 read
        // back, then 90 more are written so the pipe is full, and a single read is
        // expected to return all 100 queued bytes in order.
   1030 TEST_F(DataPipeTest, WrapAround) {
   1031   unsigned char test_data[1000];
   1032   for (size_t i = 0; i < arraysize(test_data); i++)
   1033     test_data[i] = static_cast<unsigned char>(i);
   1034 
   1035   const MojoCreateDataPipeOptions options = {
   1036       kSizeOfOptions,                   // |struct_size|.
   1037       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1038       1u,                               // |element_num_bytes|.
   1039       100u                              // |capacity_num_bytes|.
   1040   };
   1041 
   1042   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1043   MojoHandleSignalsState hss;
   1044 
   1045   // Write 20 bytes.
   1046   uint32_t num_bytes = 20u;
   1047   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
   1048   ASSERT_EQ(20u, num_bytes);
   1049 
   1050   // Wait for data.
   1051   ASSERT_EQ(MOJO_RESULT_OK,
   1052             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1053   EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
   1054   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1055                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
   1056                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1057             hss.satisfiable_signals);
   1058 
   1059   // Read 10 bytes.
   1060   unsigned char read_buffer[1000] = {0};
   1061   num_bytes = 10u;
   1062   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
   1063   ASSERT_EQ(10u, num_bytes);
   1064   ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
   1065 
   1066   // Check that a two-phase write can now only write (at most) 80 bytes. (This
   1067   // checks an implementation detail; this behavior is not guaranteed.)
          // NOTE(review): 80 presumably is the contiguous span from offset 20 to the
          // end of the 100-byte buffer -- confirm against the implementation.
   1068   void* write_buffer_ptr = nullptr;
   1069   num_bytes = 0u;
   1070   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
   1071   EXPECT_TRUE(write_buffer_ptr);
   1072   ASSERT_EQ(80u, num_bytes);
   1073   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
   1074 
          // Keep writing from |test_data[20]| onwards until 90 bytes have been
          // accepted in total; each WriteData() call may accept only part of the
          // 100 bytes it is offered.
   1075   size_t total_num_bytes = 0;
   1076   while (total_num_bytes < 90) {
   1077     // Wait to write.
   1078     ASSERT_EQ(MOJO_RESULT_OK,
   1079               WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
            // Expected value first, matching the other assertions in this file.
   1080     ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
   1081     ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1082                   MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1083               hss.satisfiable_signals);
   1084 
   1085     // Write as much as we can.
   1086     num_bytes = 100;
   1087     ASSERT_EQ(MOJO_RESULT_OK,
   1088               WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
   1089     total_num_bytes += num_bytes;
   1090   }
   1091 
          // Exactly 90 bytes fit: 10 were still queued out of a capacity of 100.
   1092   ASSERT_EQ(90u, total_num_bytes);
   1093 
   1094   num_bytes = 0;
   1095   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1096   ASSERT_EQ(100u, num_bytes);
   1097 
   1098   // Check that a two-phase read can now only read (at most) 90 bytes. (This
   1099   // checks an implementation detail; this behavior is not guaranteed.)
   1100   const void* read_buffer_ptr = nullptr;
   1101   num_bytes = 0;
   1102   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
   1103   EXPECT_TRUE(read_buffer_ptr);
   1104   ASSERT_EQ(90u, num_bytes);
   1105   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
   1106 
   1107   // Read as much as possible. We should read 100 bytes.
          // The queued bytes are |test_data[10]| .. |test_data[109]|: the first ten
          // bytes written were already consumed above.
   1108   num_bytes =
   1109       static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0]));
   1110   memset(read_buffer, 0, num_bytes);
   1111   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
   1112   ASSERT_EQ(100u, num_bytes);
   1113   ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
   1114 }
   1115 
   1116 // Tests the behavior of writing (simple and two-phase), closing the producer,
   1117 // then reading (simple and two-phase).
        //
        // The producer is closed while both a two-phase write and a two-phase read
        // are open; the consumer is then expected to be able to finish that read and
        // start another one for the remaining data.
   1118 TEST_F(DataPipeTest, WriteCloseProducerRead) {
          // sizeof() on the array counts the terminating NUL as part of the payload.
   1119   const char kTestData[] = "hello world";
   1120   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1121 
   1122   const MojoCreateDataPipeOptions options = {
   1123       kSizeOfOptions,                   // |struct_size|.
   1124       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1125       1u,                               // |element_num_bytes|.
   1126       1000u                             // |capacity_num_bytes|.
   1127   };
   1128   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1129 
   1130   // Write some data, so we'll have something to read.
   1131   uint32_t num_bytes = kTestDataSize;
   1132   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
   1133   ASSERT_EQ(kTestDataSize, num_bytes);
   1134 
   1135   // Write it again, so we'll have something left over.
   1136   num_bytes = kTestDataSize;
   1137   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
   1138   ASSERT_EQ(kTestDataSize, num_bytes);
   1139 
   1140   // Start two-phase write.
          // This write is never ended in this test; the producer is closed while it
          // is still open.
   1141   void* write_buffer_ptr = nullptr;
   1142   num_bytes = 0u;
   1143   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
   1144   EXPECT_TRUE(write_buffer_ptr);
   1145   EXPECT_GT(num_bytes, 0u);
   1146 
   1147   // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
          // Poll until both copies of the test data are visible to the consumer.
   1148   for (size_t i = 0; i < kMaxPoll; i++) {
   1149     num_bytes = 0u;
   1150     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1151     if (num_bytes >= 2u * kTestDataSize)
   1152       break;
   1153 
   1154     test::Sleep(test::EpsilonDeadline());
   1155   }
   1156   ASSERT_EQ(2u * kTestDataSize, num_bytes);
   1157 
   1158   // Start two-phase read.
   1159   const void* read_buffer_ptr = nullptr;
   1160   num_bytes = 0u;
   1161   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
          // Fatal check: |read_buffer_ptr| is passed to memcmp() below.
   1162   ASSERT_TRUE(read_buffer_ptr);
   1163   ASSERT_EQ(2u * kTestDataSize, num_bytes);
   1164 
   1165   // Close the producer.
   1166   CloseProducer();
   1167 
   1168   // The consumer can finish its two-phase read.
          // Only the first copy is consumed, so one copy stays queued.
   1169   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
   1170   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
   1171 
   1172   // And start another.
   1173   read_buffer_ptr = nullptr;
   1174   num_bytes = 0u;
   1175   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
   1176   EXPECT_TRUE(read_buffer_ptr);
   1177   ASSERT_EQ(kTestDataSize, num_bytes);
   1178 }
   1179 
   1180 // Tests the behavior of interrupting a two-phase read and write by closing the
   1181 // consumer.
        //
        // After the consumer is closed, the open two-phase write is still expected
        // to end successfully, while any new write (simple or two-phase) is expected
        // to fail with FAILED_PRECONDITION.
   1182 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
          // sizeof() on the array counts the terminating NUL as part of the payload.
   1183   const char kTestData[] = "hello world";
   1184   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1185 
   1186   const MojoCreateDataPipeOptions options = {
   1187       kSizeOfOptions,                   // |struct_size|.
   1188       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1189       1u,                               // |element_num_bytes|.
   1190       1000u                             // |capacity_num_bytes|.
   1191   };
   1192   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1193   MojoHandleSignalsState hss;
   1194 
   1195   // Write some data, so we'll have something to read.
   1196   uint32_t num_bytes = kTestDataSize;
   1197   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1198   ASSERT_EQ(kTestDataSize, num_bytes);
   1199 
   1200   // Start two-phase write.
   1201   void* write_buffer_ptr = nullptr;
   1202   num_bytes = 0u;
   1203   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
          // Fatal check: |write_buffer_ptr| is the memcpy() destination further down.
   1204   ASSERT_TRUE(write_buffer_ptr);
          // The granted region must be large enough for the later memcpy().
   1205   ASSERT_GT(num_bytes, kTestDataSize);
   1206 
   1207   // Wait for data.
   1208   // TODO(vtl): (See corresponding TODO in AllOrNone.)
   1209   hss = MojoHandleSignalsState();
   1210   ASSERT_EQ(MOJO_RESULT_OK,
   1211             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1212   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1213             hss.satisfied_signals);
   1214   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1215                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
   1216                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1217             hss.satisfiable_signals);
   1218 
   1219   // Start two-phase read.
          // This read is never ended; the consumer is closed while it is open.
   1220   const void* read_buffer_ptr = nullptr;
   1221   num_bytes = 0u;
   1222   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
   1223   EXPECT_TRUE(read_buffer_ptr);
   1224   ASSERT_EQ(kTestDataSize, num_bytes);
   1225 
   1226   // Close the consumer.
   1227   CloseConsumer();
   1228 
   1229   // Wait for producer to know that the consumer is closed.
   1230   hss = MojoHandleSignalsState();
   1231   ASSERT_EQ(MOJO_RESULT_OK,
   1232             WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
   1233   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
   1234   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
   1235 
   1236   // Actually write some data. (Note: Premature freeing of the buffer would
   1237   // probably only be detected under ASAN or similar.)
   1238   memcpy(write_buffer_ptr, kTestData, kTestDataSize);
   1239   // Note: Even though the consumer has been closed, ending the two-phase
   1240   // write will report success.
   1241   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
   1242 
   1243   // But trying to write should result in failure.
   1244   num_bytes = kTestDataSize;
   1245   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
   1246 
   1247   // As will trying to start another two-phase write.
   1248   write_buffer_ptr = nullptr;
   1249   num_bytes = 0u;
   1250   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
   1251             BeginWriteData(&write_buffer_ptr, &num_bytes));
   1252 }
   1253 
   1254 // Tests the behavior of "interrupting" a two-phase write by closing both the
   1255 // producer and the consumer.
        //
        // NOTE(review): nothing in this body closes either handle explicitly or ends
        // the two-phase write. Presumably both handles are closed by the DataPipeTest
        // fixture's teardown while the write is still open -- confirm against the
        // fixture, which is defined outside this chunk.
   1256 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
          // Only used as a lower bound on the size of the granted write region.
   1257   const uint32_t kTestDataSize = 15u;
   1258 
   1259   const MojoCreateDataPipeOptions options = {
   1260       kSizeOfOptions,                   // |struct_size|.
   1261       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1262       1u,                               // |element_num_bytes|.
   1263       1000u                             // |capacity_num_bytes|.
   1264   };
   1265   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1266 
   1267   // Start two-phase write.
   1268   void* write_buffer_ptr = nullptr;
   1269   uint32_t num_bytes = 0u;
   1270   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
   1271   EXPECT_TRUE(write_buffer_ptr);
   1272   ASSERT_GT(num_bytes, kTestDataSize);
   1273 }
   1274 
   1275 // Tests the behavior of writing, closing the producer, and then reading (with
   1276 // and without data remaining).
        //
        // With data remaining, a peek and then a read are expected to succeed; once
        // the pipe is drained, ReadData(), BeginReadData() and DiscardData() are all
        // expected to fail with FAILED_PRECONDITION.
   1277 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
          // sizeof() on the array counts the terminating NUL as part of the payload.
   1278   const char kTestData[] = "hello world";
   1279   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1280 
   1281   const MojoCreateDataPipeOptions options = {
   1282       kSizeOfOptions,                   // |struct_size|.
   1283       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1284       1u,                               // |element_num_bytes|.
   1285       1000u                             // |capacity_num_bytes|.
   1286   };
   1287   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1288   MojoHandleSignalsState hss;
   1289 
   1290   // Write some data, so we'll have something to read.
   1291   uint32_t num_bytes = kTestDataSize;
   1292   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1293   ASSERT_EQ(kTestDataSize, num_bytes);
   1294 
   1295   // Close the producer.
   1296   CloseProducer();
   1297 
   1298   // Wait. (Note that once the consumer knows that the producer is closed, it
   1299   // must also know about all the data that was sent.)
   1300   hss = MojoHandleSignalsState();
   1301   ASSERT_EQ(MOJO_RESULT_OK,
   1302             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
   1303   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1304                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1305             hss.satisfied_signals);
   1306   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1307                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1308             hss.satisfiable_signals);
   1309 
   1310   // Peek that data.
          // NOTE(review): the trailing (false, true) arguments presumably mean "not
          // all-or-none, peek" -- the helper is declared outside this chunk.
   1311   char buffer[1000];
   1312   num_bytes = static_cast<uint32_t>(sizeof(buffer));
   1313   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
   1314   ASSERT_EQ(kTestDataSize, num_bytes);
   1315   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
   1316 
   1317   // Read that data.
          // Clear the whole buffer first so the comparison below cannot pass on
          // bytes left over from the peek.
   1318   memset(buffer, 0, sizeof(buffer));
   1319   num_bytes = static_cast<uint32_t>(sizeof(buffer));
   1320   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
   1321   ASSERT_EQ(kTestDataSize, num_bytes);
   1322   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
   1323 
   1324   // A second read should fail.
   1325   num_bytes = static_cast<uint32_t>(sizeof(buffer));
   1326   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
   1327 
   1328   // A two-phase read should also fail.
   1329   const void* read_buffer_ptr = nullptr;
   1330   num_bytes = 0u;
   1331   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
   1332             BeginReadData(&read_buffer_ptr, &num_bytes));
   1333 
   1334   // Ditto for discard.
   1335   num_bytes = 10u;
   1336   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
   1337 }
   1338 
   1339 // Test that during a two phase read the memory stays valid even if more data
   1340 // comes in.
        //
        // A two-phase read is begun, more data is written and the producer is
        // closed, and only then is the memory handed out by the read inspected.
   1341 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
          // sizeof() on the array counts the terminating NUL as part of the payload.
   1342   const char kTestData[] = "hello world";
   1343   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1344 
   1345   const MojoCreateDataPipeOptions options = {
   1346       kSizeOfOptions,                   // |struct_size|.
   1347       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1348       1u,                               // |element_num_bytes|.
   1349       1000u                             // |capacity_num_bytes|.
   1350   };
   1351   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1352   MojoHandleSignalsState hss;
   1353 
   1354   // Write some data.
   1355   uint32_t num_bytes = kTestDataSize;
   1356   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1357   ASSERT_EQ(kTestDataSize, num_bytes);
   1358 
   1359   // Wait for the data.
   1360   hss = MojoHandleSignalsState();
   1361   ASSERT_EQ(MOJO_RESULT_OK,
   1362             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1363   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1364             hss.satisfied_signals);
   1365   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1366                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
   1367                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1368             hss.satisfiable_signals);
   1369 
   1370   // Begin a two-phase read.
          // |read_buffer_ptr| and |read_buffer_size| are kept until the end of the
          // test, across the extra write and the producer close.
   1371   const void* read_buffer_ptr = nullptr;
   1372   uint32_t read_buffer_size = 0u;
   1373   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
   1374 
   1375   // Write more data.
   1376   const char kExtraData[] = "bye world";
   1377   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
   1378   num_bytes = kExtraDataSize;
   1379   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
   1380   ASSERT_EQ(kExtraDataSize, num_bytes);
   1381 
   1382   // Close the producer.
   1383   CloseProducer();
   1384 
   1385   // Wait. (Note that once the consumer knows that the producer is closed, it
   1386   // must also have received the extra data).
          // Only PEER_CLOSED is expected in the satisfied set here; the two-phase
          // read begun above is still open at this point.
   1387   hss = MojoHandleSignalsState();
   1388   ASSERT_EQ(MOJO_RESULT_OK,
   1389             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
   1390   EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
   1391   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1392                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1393             hss.satisfiable_signals);
   1394 
   1395   // Read the two phase memory to check it's still valid.
   1396   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
          // Ending the read must succeed too; the result used to be discarded.
   1397   EXPECT_EQ(MOJO_RESULT_OK, EndReadData(read_buffer_size));
   1398 }
   1399 
   1400 // Test that two-phase reads/writes behave correctly when given invalid
   1401 // arguments.
   1402 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
   1403   const MojoCreateDataPipeOptions options = {
   1404       kSizeOfOptions,                          // |struct_size|.
   1405       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
   1406       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
   1407       10 * sizeof(int32_t)                     // |capacity_num_bytes|.
   1408   };
   1409   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1410   MojoHandleSignalsState hss;
   1411 
   1412   // No data.
   1413   uint32_t num_bytes = 1000u;
   1414   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1415   ASSERT_EQ(0u, num_bytes);
   1416 
   1417   // Try "ending" a two-phase write when one isn't active.
   1418   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
   1419             EndWriteData(1u * sizeof(int32_t)));
   1420 
   1421   // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
   1422   // have time to propagate.
   1423   test::Sleep(test::EpsilonDeadline());
   1424 
   1425   // Still no data.
   1426   num_bytes = 1000u;
   1427   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1428   ASSERT_EQ(0u, num_bytes);
   1429 
   1430   // Try ending a two-phase write with an invalid amount (too much).
   1431   num_bytes = 0u;
   1432   void* write_ptr = nullptr;
   1433   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
   1434   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
   1435             EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
   1436 
   1437   // But the two-phase write still ended.
   1438   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
   1439 
   1440   // Wait a bit (as above).
   1441   test::Sleep(test::EpsilonDeadline());
   1442 
   1443   // Still no data.
   1444   num_bytes = 1000u;
   1445   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1446   ASSERT_EQ(0u, num_bytes);
   1447 
   1448   // Try ending a two-phase write with an invalid amount (not a multiple of the
   1449   // element size).
   1450   num_bytes = 0u;
   1451   write_ptr = nullptr;
   1452   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
   1453   EXPECT_GE(num_bytes, 1u);
   1454   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
   1455 
   1456   // But the two-phase write still ended.
   1457   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
   1458 
   1459   // Wait a bit (as above).
   1460   test::Sleep(test::EpsilonDeadline());
   1461 
   1462   // Still no data.
   1463   num_bytes = 1000u;
   1464   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1465   ASSERT_EQ(0u, num_bytes);
   1466 
   1467   // Now write some data, so we'll be able to try reading.
   1468   int32_t element = 123;
   1469   num_bytes = 1u * sizeof(int32_t);
   1470   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
   1471 
   1472   // Wait for data.
   1473   // TODO(vtl): (See corresponding TODO in AllOrNone.)
   1474   hss = MojoHandleSignalsState();
   1475   ASSERT_EQ(MOJO_RESULT_OK,
   1476             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1477   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1478             hss.satisfied_signals);
   1479   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1480                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
   1481                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1482             hss.satisfiable_signals);
   1483 
   1484   // One element available.
   1485   num_bytes = 0u;
   1486   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1487   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1488 
   1489   // Try "ending" a two-phase read when one isn't active.
   1490   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
   1491 
   1492   // Still one element available.
   1493   num_bytes = 0u;
   1494   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1495   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1496 
   1497   // Try ending a two-phase read with an invalid amount (too much).
   1498   num_bytes = 0u;
   1499   const void* read_ptr = nullptr;
   1500   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
   1501   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
   1502             EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
   1503 
   1504   // Still one element available.
   1505   num_bytes = 0u;
   1506   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1507   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1508 
   1509   // Try ending a two-phase read with an invalid amount (not a multiple of the
   1510   // element size).
   1511   num_bytes = 0u;
   1512   read_ptr = nullptr;
   1513   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
   1514   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1515   ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
   1516   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
   1517 
   1518   // Still one element available.
   1519   num_bytes = 0u;
   1520   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
   1521   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
   1522 }
   1523 
   1524 // Test that a producer can be sent over a MP.
   1525 TEST_F(DataPipeTest, SendProducer) {
   1526   const char kTestData[] = "hello world";
   1527   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
   1528 
   1529   const MojoCreateDataPipeOptions options = {
   1530       kSizeOfOptions,                   // |struct_size|.
   1531       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1532       1u,                               // |element_num_bytes|.
   1533       1000u                             // |capacity_num_bytes|.
   1534   };
   1535   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1536   MojoHandleSignalsState hss;
   1537 
   1538   // Write some data.
   1539   uint32_t num_bytes = kTestDataSize;
   1540   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
   1541   ASSERT_EQ(kTestDataSize, num_bytes);
   1542 
   1543   // Wait for the data.
   1544   hss = MojoHandleSignalsState();
   1545   ASSERT_EQ(MOJO_RESULT_OK,
   1546             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1547   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1548             hss.satisfied_signals);
   1549   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1550                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
   1551                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1552             hss.satisfiable_signals);
   1553 
   1554   // Check the data.
   1555   const void* read_buffer = nullptr;
   1556   num_bytes = 0u;
   1557   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
   1558   ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
   1559   EndReadData(num_bytes);
   1560 
   1561   // Now send the producer over a MP so that it's serialized.
   1562   MojoHandle pipe0, pipe1;
   1563   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
   1564 
   1565   ASSERT_EQ(MOJO_RESULT_OK,
   1566             WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &producer_, 1,
   1567                             MOJO_WRITE_MESSAGE_FLAG_NONE));
   1568   producer_ = MOJO_HANDLE_INVALID;
   1569   ASSERT_EQ(MOJO_RESULT_OK,
   1570             WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1571   ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &producer_, 1));
   1572 
   1573   // Write more data.
   1574   const char kExtraData[] = "bye world";
   1575   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
   1576   num_bytes = kExtraDataSize;
   1577   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
   1578   ASSERT_EQ(kExtraDataSize, num_bytes);
   1579 
   1580   // Wait for it.
   1581   hss = MojoHandleSignalsState();
   1582   ASSERT_EQ(MOJO_RESULT_OK,
   1583             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1584   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1585             hss.satisfied_signals);
   1586   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1587                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
   1588                 MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1589             hss.satisfiable_signals);
   1590 
   1591   // Check the second write.
   1592   num_bytes = 0u;
   1593   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
   1594   ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
   1595   EndReadData(num_bytes);
   1596 
   1597   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
   1598   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
   1599 }
   1600 
   1601 // Ensures that if a data pipe consumer whose producer has closed is passed over
   1602 // a message pipe, the deserialized dispatcher is also marked as having a closed
   1603 // peer.
   1604 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
   1605   const MojoCreateDataPipeOptions options = {
   1606       kSizeOfOptions,                          // |struct_size|.
   1607       MOJO_CREATE_DATA_PIPE_FLAG_NONE,         // |flags|.
   1608       static_cast<uint32_t>(sizeof(int32_t)),  // |element_num_bytes|.
   1609       1000 * sizeof(int32_t)                   // |capacity_num_bytes|.
   1610   };
   1611 
   1612   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1613 
   1614   // We can write to a data pipe handle immediately.
   1615   int32_t data = 123;
   1616   uint32_t num_bytes = sizeof(data);
   1617   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
   1618   ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
   1619 
   1620   // Now wait for the other side to become readable and to see the peer closed.
   1621   MojoHandleSignalsState state;
   1622   ASSERT_EQ(MOJO_RESULT_OK,
   1623             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
   1624   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1625                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1626             state.satisfied_signals);
   1627   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1628                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1629             state.satisfiable_signals);
   1630 
   1631   // Now send the consumer over a MP so that it's serialized.
   1632   MojoHandle pipe0, pipe1;
   1633   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
   1634 
   1635   ASSERT_EQ(MOJO_RESULT_OK,
   1636             WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &consumer_, 1,
   1637                             MOJO_WRITE_MESSAGE_FLAG_NONE));
   1638   consumer_ = MOJO_HANDLE_INVALID;
   1639   ASSERT_EQ(MOJO_RESULT_OK,
   1640             WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &state));
   1641   ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &consumer_, 1));
   1642 
   1643   ASSERT_EQ(MOJO_RESULT_OK,
   1644             WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
   1645   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1646                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1647             state.satisfied_signals);
   1648   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1649                 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
   1650             state.satisfiable_signals);
   1651 
   1652   int32_t read_data;
   1653   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
   1654   ASSERT_EQ(sizeof(read_data), num_bytes);
   1655   ASSERT_EQ(data, read_data);
   1656 
   1657   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
   1658   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
   1659 }
   1660 
   1661 bool WriteAllData(MojoHandle producer,
   1662                   const void* elements,
   1663                   uint32_t num_bytes) {
   1664   for (size_t i = 0; i < kMaxPoll; i++) {
   1665     // Write as much data as we can.
   1666     uint32_t write_bytes = num_bytes;
   1667     MojoResult result =
   1668         MojoWriteData(producer, elements, &write_bytes, nullptr);
   1669     if (result == MOJO_RESULT_OK) {
   1670       num_bytes -= write_bytes;
   1671       elements = static_cast<const uint8_t*>(elements) + write_bytes;
   1672       if (num_bytes == 0)
   1673         return true;
   1674     } else {
   1675       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
   1676     }
   1677 
   1678     MojoHandleSignalsState hss = MojoHandleSignalsState();
   1679     EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
   1680                                   producer, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
   1681     EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
   1682     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
   1683                   MOJO_HANDLE_SIGNAL_PEER_REMOTE,
   1684               hss.satisfiable_signals);
   1685   }
   1686 
   1687   return false;
   1688 }
   1689 
   1690 // If |expect_empty| is true, expect |consumer| to be empty after reading.
   1691 bool ReadAllData(MojoHandle consumer,
   1692                  void* elements,
   1693                  uint32_t num_bytes,
   1694                  bool expect_empty) {
   1695   for (size_t i = 0; i < kMaxPoll; i++) {
   1696     // Read as much data as we can.
   1697     uint32_t read_bytes = num_bytes;
   1698     MojoResult result = MojoReadData(consumer, nullptr, elements, &read_bytes);
   1699     if (result == MOJO_RESULT_OK) {
   1700       num_bytes -= read_bytes;
   1701       elements = static_cast<uint8_t*>(elements) + read_bytes;
   1702       if (num_bytes == 0) {
   1703         if (expect_empty) {
   1704           // Expect no more data.
   1705           test::Sleep(test::TinyDeadline());
   1706           MojoReadDataOptions options;
   1707           options.struct_size = sizeof(options);
   1708           options.flags = MOJO_READ_DATA_FLAG_QUERY;
   1709           MojoReadData(consumer, &options, nullptr, &num_bytes);
   1710           EXPECT_EQ(0u, num_bytes);
   1711         }
   1712         return true;
   1713       }
   1714     } else {
   1715       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
   1716     }
   1717 
   1718     MojoHandleSignalsState hss = MojoHandleSignalsState();
   1719     EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
   1720                                   consumer, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1721     // Peer could have become closed while we're still waiting for data.
   1722     EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
   1723     EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
   1724     EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
   1725   }
   1726 
   1727   return num_bytes == 0;
   1728 }
   1729 
   1730 #if !defined(OS_IOS)
   1731 
   1732 TEST_F(DataPipeTest, Multiprocess) {
   1733   const uint32_t kTestDataSize =
   1734       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
   1735   const MojoCreateDataPipeOptions options = {
   1736       kSizeOfOptions,                   // |struct_size|.
   1737       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1738       1,                                // |element_num_bytes|.
   1739       kMultiprocessCapacity             // |capacity_num_bytes|.
   1740   };
   1741   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
   1742 
   1743   RunTestClient("MultiprocessClient", [&](MojoHandle server_mp) {
   1744     // Send some data before serialising and sending the data pipe over.
   1745     // This is the first write so we don't need to use WriteAllData.
   1746     uint32_t num_bytes = kTestDataSize;
   1747     ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes,
   1748                                         MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
   1749     ASSERT_EQ(kTestDataSize, num_bytes);
   1750 
   1751     // Send child process the data pipe.
   1752     ASSERT_EQ(MOJO_RESULT_OK,
   1753               WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
   1754                               &consumer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
   1755 
   1756     // Send a bunch of data of varying sizes.
   1757     uint8_t buffer[100];
   1758     int seq = 0;
   1759     for (int i = 0; i < kMultiprocessMaxIter; ++i) {
   1760       for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
   1761         for (unsigned int j = 0; j < size; ++j)
   1762           buffer[j] = seq + j;
   1763         EXPECT_TRUE(WriteAllData(producer_, buffer, size));
   1764         seq += size;
   1765       }
   1766     }
   1767 
   1768     // Write the test string in again.
   1769     ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
   1770 
   1771     // Swap ends.
   1772     ASSERT_EQ(MOJO_RESULT_OK,
   1773               WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
   1774                               &producer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
   1775 
   1776     // Receive the consumer from the other side.
   1777     producer_ = MOJO_HANDLE_INVALID;
   1778     MojoHandleSignalsState hss = MojoHandleSignalsState();
   1779     ASSERT_EQ(MOJO_RESULT_OK,
   1780               WaitForSignals(server_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1781     ASSERT_EQ(MOJO_RESULT_OK,
   1782               ReadEmptyMessageWithHandles(server_mp, &consumer_, 1));
   1783 
   1784     // Read the test string twice. Once for when we sent it, and once for the
   1785     // other end sending it.
   1786     for (int i = 0; i < 2; ++i) {
   1787       EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
   1788       EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
   1789     }
   1790 
   1791     WriteMessage(server_mp, "quit");
   1792 
   1793     // Don't have to close the consumer here because it will be done for us.
   1794   });
   1795 }
   1796 
   1797 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
   1798   const uint32_t kTestDataSize =
   1799       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
   1800 
   1801   // Receive the data pipe from the other side.
   1802   MojoHandle consumer = MOJO_HANDLE_INVALID;
   1803   MojoHandleSignalsState hss = MojoHandleSignalsState();
   1804   ASSERT_EQ(MOJO_RESULT_OK,
   1805             WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1806   ASSERT_EQ(MOJO_RESULT_OK,
   1807             ReadEmptyMessageWithHandles(client_mp, &consumer, 1));
   1808 
   1809   // Read the initial string that was sent.
   1810   int32_t buffer[100];
   1811   EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
   1812   EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
   1813 
   1814   // Receive the main data and check it is correct.
   1815   int seq = 0;
   1816   uint8_t expected_buffer[100];
   1817   for (int i = 0; i < kMultiprocessMaxIter; ++i) {
   1818     for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
   1819       for (unsigned int j = 0; j < size; ++j)
   1820         expected_buffer[j] = seq + j;
   1821       EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
   1822       EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));
   1823 
   1824       seq += size;
   1825     }
   1826   }
   1827 
   1828   // Swap ends.
   1829   ASSERT_EQ(MOJO_RESULT_OK,
   1830             WriteMessageRaw(MessagePipeHandle(client_mp), nullptr, 0, &consumer,
   1831                             1, MOJO_WRITE_MESSAGE_FLAG_NONE));
   1832 
   1833   // Receive the producer from the other side.
   1834   MojoHandle producer = MOJO_HANDLE_INVALID;
   1835   hss = MojoHandleSignalsState();
   1836   ASSERT_EQ(MOJO_RESULT_OK,
   1837             WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
   1838   ASSERT_EQ(MOJO_RESULT_OK,
   1839             ReadEmptyMessageWithHandles(client_mp, &producer, 1));
   1840 
   1841   // Write the test string one more time.
   1842   EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
   1843 
   1844   // We swapped ends, so close the producer.
   1845   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
   1846 
   1847   // Wait to receive a "quit" message before exiting.
   1848   EXPECT_EQ("quit", ReadMessage(client_mp));
   1849 }
   1850 
   1851 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
   1852   MojoHandle p;
   1853   std::string message = ReadMessageWithHandles(h, &p, 1);
   1854 
   1855   // Write some data to the producer and close it.
   1856   uint32_t num_bytes = static_cast<uint32_t>(message.size());
   1857   EXPECT_EQ(MOJO_RESULT_OK,
   1858             MojoWriteData(p, message.data(), &num_bytes, nullptr));
   1859   EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size()));
   1860 
   1861   // Close the producer before quitting.
   1862   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
   1863 
   1864   // Wait for a quit message.
   1865   EXPECT_EQ("quit", ReadMessage(h));
   1866 }
   1867 
   1868 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
   1869   MojoHandle c;
   1870   std::string expected_message = ReadMessageWithHandles(h, &c, 1);
   1871 
   1872   // Wait for the consumer to become readable.
   1873   EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
   1874 
   1875   // Drain the consumer and expect to find the given message.
   1876   uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
   1877   std::vector<char> bytes(expected_message.size());
   1878   EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, nullptr, bytes.data(), &num_bytes));
   1879   EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
   1880 
   1881   std::string message(bytes.data(), bytes.size());
   1882   EXPECT_EQ(expected_message, message);
   1883 
   1884   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
   1885 
   1886   // Wait for a quit message.
   1887   EXPECT_EQ("quit", ReadMessage(h));
   1888 }
   1889 
   1890 TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
   1891   // Create a new data pipe.
   1892   MojoHandle p, c;
   1893   EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));
   1894 
   1895   RunTestClient("WriteAndCloseProducer", [&](MojoHandle producer_client) {
   1896     RunTestClient("ReadAndCloseConsumer", [&](MojoHandle consumer_client) {
   1897       const std::string kMessage = "Hello, world!";
   1898       WriteMessageWithHandles(producer_client, kMessage, &p, 1);
   1899       WriteMessageWithHandles(consumer_client, kMessage, &c, 1);
   1900 
   1901       WriteMessage(consumer_client, "quit");
   1902     });
   1903 
   1904     WriteMessage(producer_client, "quit");
   1905   });
   1906 }
   1907 
   1908 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
   1909   const MojoCreateDataPipeOptions options = {
   1910       kSizeOfOptions,                   // |struct_size|.
   1911       MOJO_CREATE_DATA_PIPE_FLAG_NONE,  // |flags|.
   1912       1,                                // |element_num_bytes|.
   1913       kMultiprocessCapacity             // |capacity_num_bytes|.
   1914   };
   1915 
   1916   MojoHandle p, c;
   1917   ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
   1918 
   1919   const std::string kMessage = "Hello, world!";
   1920   WriteMessageWithHandles(h, kMessage, &c, 1);
   1921 
   1922   // Write some data to the producer and close it.
   1923   uint32_t num_bytes = static_cast<uint32_t>(kMessage.size());
   1924   EXPECT_EQ(MOJO_RESULT_OK,
   1925             MojoWriteData(p, kMessage.data(), &num_bytes, nullptr));
   1926   EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size()));
   1927   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
   1928 
   1929   // Wait for a quit message.
   1930   EXPECT_EQ("quit", ReadMessage(h));
   1931 }
   1932 
   1933 TEST_F(DataPipeTest, CreateInChild) {
   1934   RunTestClient("CreateAndWrite", [&](MojoHandle child) {
   1935     MojoHandle c;
   1936     std::string expected_message = ReadMessageWithHandles(child, &c, 1);
   1937 
   1938     // Wait for the consumer to become readable.
   1939     EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
   1940 
   1941     // Drain the consumer and expect to find the given message.
   1942     uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
   1943     std::vector<char> bytes(expected_message.size());
   1944     EXPECT_EQ(MOJO_RESULT_OK,
   1945               MojoReadData(c, nullptr, bytes.data(), &num_bytes));
   1946     EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
   1947 
   1948     std::string message(bytes.data(), bytes.size());
   1949     EXPECT_EQ(expected_message, message);
   1950 
   1951     EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
   1952     WriteMessage(child, "quit");
   1953   });
   1954 }
   1955 
   1956 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,
   1957                                   DataPipeTest,
   1958                                   parent) {
   1959   // This test verifies that peer closure is detectable through various
   1960   // mechanisms when it races with handle transfer.
   1961 
   1962   MojoHandle handles[6];
   1963   EXPECT_EQ("o_O", ReadMessageWithHandles(parent, handles, 6));
   1964   MojoHandle* producers = &handles[0];
   1965   MojoHandle* consumers = &handles[3];
   1966 
   1967   // Wait on producer 0
   1968   EXPECT_EQ(MOJO_RESULT_OK,
   1969             WaitForSignals(producers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
   1970 
   1971   // Wait on consumer 0
   1972   EXPECT_EQ(MOJO_RESULT_OK,
   1973             WaitForSignals(consumers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
   1974 
   1975   base::MessageLoop message_loop;
   1976 
   1977   // Wait on producer 1 and consumer 1 using SimpleWatchers.
   1978   {
   1979     base::RunLoop run_loop;
   1980     int count = 0;
   1981     auto callback = base::Bind(
   1982         [](base::RunLoop* loop, int* count, MojoResult result) {
   1983           EXPECT_EQ(MOJO_RESULT_OK, result);
   1984           if (++*count == 2)
   1985             loop->Quit();
   1986         },
   1987         &run_loop, &count);
   1988     SimpleWatcher producer_watcher(FROM_HERE,
   1989                                    SimpleWatcher::ArmingPolicy::AUTOMATIC,
   1990                                    base::SequencedTaskRunnerHandle::Get());
   1991     SimpleWatcher consumer_watcher(FROM_HERE,
   1992                                    SimpleWatcher::ArmingPolicy::AUTOMATIC,
   1993                                    base::SequencedTaskRunnerHandle::Get());
   1994     producer_watcher.Watch(Handle(producers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1995                            callback);
   1996     consumer_watcher.Watch(Handle(consumers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
   1997                            callback);
   1998     run_loop.Run();
   1999     EXPECT_EQ(2, count);
   2000   }
   2001 
   2002   // Wait on producer 2 by polling with MojoWriteData.
   2003   MojoResult result;
   2004   do {
   2005     uint32_t num_bytes = 0;
   2006     result = MojoWriteData(producers[2], nullptr, &num_bytes, nullptr);
   2007   } while (result == MOJO_RESULT_OK);
   2008   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
   2009 
   2010   // Wait on consumer 2 by polling with MojoReadData.
   2011   do {
   2012     char byte;
   2013     uint32_t num_bytes = 1;
   2014     result = MojoReadData(consumers[2], nullptr, &byte, &num_bytes);
   2015   } while (result == MOJO_RESULT_SHOULD_WAIT);
   2016   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
   2017 
   2018   for (size_t i = 0; i < 6; ++i)
   2019     CloseHandle(handles[i]);
   2020 }
   2021 
   2022 TEST_F(DataPipeTest, StatusChangeInTransit) {
   2023   MojoHandle producers[6];
   2024   MojoHandle consumers[6];
   2025   for (size_t i = 0; i < 6; ++i)
   2026     CreateDataPipe(&producers[i], &consumers[i], 1);
   2027 
   2028   RunTestClient("DataPipeStatusChangeInTransitClient", [&](MojoHandle child) {
   2029     MojoHandle handles[] = {producers[0], producers[1], producers[2],
   2030                             consumers[3], consumers[4], consumers[5]};
   2031 
   2032     // Send 3 producers and 3 consumers, and let their transfer race with their
   2033     // peers' closure.
   2034     WriteMessageWithHandles(child, "o_O", handles, 6);
   2035 
   2036     for (size_t i = 0; i < 3; ++i)
   2037       CloseHandle(consumers[i]);
   2038     for (size_t i = 3; i < 6; ++i)
   2039       CloseHandle(producers[i]);
   2040   });
   2041 }
   2042 
   2043 #endif  // !defined(OS_IOS)
   2044 
   2045 }  // namespace
   2046 }  // namespace core
   2047 }  // namespace mojo
   2048