1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <stddef.h> 6 #include <stdint.h> 7 8 #include <memory> 9 10 #include "base/bind.h" 11 #include "base/location.h" 12 #include "base/logging.h" 13 #include "base/macros.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/run_loop.h" 16 #include "build/build_config.h" 17 #include "mojo/core/embedder/embedder.h" 18 #include "mojo/core/test/mojo_test_base.h" 19 #include "mojo/core/test_utils.h" 20 #include "mojo/public/c/system/data_pipe.h" 21 #include "mojo/public/c/system/functions.h" 22 #include "mojo/public/c/system/message_pipe.h" 23 #include "mojo/public/cpp/system/message_pipe.h" 24 #include "mojo/public/cpp/system/simple_watcher.h" 25 #include "testing/gtest/include/gtest/gtest.h" 26 27 namespace mojo { 28 namespace core { 29 namespace { 30 31 const uint32_t kSizeOfOptions = 32 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)); 33 34 // In various places, we have to poll (since, e.g., we can't yet wait for a 35 // certain amount of data to be available). This is the maximum number of 36 // iterations (separated by a short sleep). 37 // TODO(vtl): Get rid of this. 38 const size_t kMaxPoll = 100; 39 40 // Used in Multiprocess test. 41 const size_t kMultiprocessCapacity = 37; 42 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes"; 43 const int kMultiprocessMaxIter = 5; 44 45 // TODO(rockot): There are many uses of ASSERT where EXPECT would be more 46 // appropriate. Fix this. 47 48 class DataPipeTest : public test::MojoTestBase { 49 public: 50 DataPipeTest() 51 : producer_(MOJO_HANDLE_INVALID), consumer_(MOJO_HANDLE_INVALID) {} 52 53 ~DataPipeTest() override { 54 if (producer_ != MOJO_HANDLE_INVALID) 55 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_)); 56 if (consumer_ != MOJO_HANDLE_INVALID) 57 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_)); 58 } 59 60 MojoResult ReadEmptyMessageWithHandles(MojoHandle pipe, 61 MojoHandle* out_handles, 62 uint32_t num_handles) { 63 std::vector<uint8_t> bytes; 64 std::vector<ScopedHandle> handles; 65 MojoResult rv = ReadMessageRaw(MessagePipeHandle(pipe), &bytes, &handles, 66 MOJO_READ_MESSAGE_FLAG_NONE); 67 if (rv == MOJO_RESULT_OK) { 68 CHECK_EQ(0u, bytes.size()); 69 CHECK_EQ(num_handles, handles.size()); 70 for (size_t i = 0; i < num_handles; ++i) 71 out_handles[i] = handles[i].release().value(); 72 } 73 return rv; 74 } 75 76 MojoResult Create(const MojoCreateDataPipeOptions* options) { 77 return MojoCreateDataPipe(options, &producer_, &consumer_); 78 } 79 80 MojoResult WriteData(const void* elements, 81 uint32_t* num_bytes, 82 bool all_or_none = false) { 83 MojoWriteDataOptions options; 84 options.struct_size = sizeof(options); 85 options.flags = all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE 86 : MOJO_WRITE_DATA_FLAG_NONE; 87 return MojoWriteData(producer_, elements, num_bytes, &options); 88 } 89 90 MojoResult ReadData(void* elements, 91 uint32_t* num_bytes, 92 bool all_or_none = false, 93 bool peek = false) { 94 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE; 95 if (all_or_none) 96 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE; 97 if (peek) 98 flags |= MOJO_READ_DATA_FLAG_PEEK; 99 100 MojoReadDataOptions options; 101 options.struct_size = sizeof(options); 102 options.flags = flags; 103 return MojoReadData(consumer_, &options, elements, num_bytes); 104 } 105 106 MojoResult QueryData(uint32_t* num_bytes) { 107 MojoReadDataOptions options; 108 options.struct_size = sizeof(options); 109 options.flags = MOJO_READ_DATA_FLAG_QUERY; 110 return MojoReadData(consumer_, &options, nullptr, num_bytes); 111 } 112 113 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) { 114 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD; 115 if (all_or_none) 116 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE; 117 MojoReadDataOptions options; 118 options.struct_size = sizeof(options); 119 options.flags = flags; 120 return MojoReadData(consumer_, &options, nullptr, num_bytes); 121 } 122 123 MojoResult BeginReadData(const void** elements, uint32_t* num_bytes) { 124 return MojoBeginReadData(consumer_, nullptr, elements, num_bytes); 125 } 126 127 MojoResult EndReadData(uint32_t num_bytes_read) { 128 return MojoEndReadData(consumer_, num_bytes_read, nullptr); 129 } 130 131 MojoResult BeginWriteData(void** elements, uint32_t* num_bytes) { 132 return MojoBeginWriteData(producer_, nullptr, elements, num_bytes); 133 } 134 135 MojoResult EndWriteData(uint32_t num_bytes_written) { 136 return MojoEndWriteData(producer_, num_bytes_written, nullptr); 137 } 138 139 MojoResult CloseProducer() { 140 MojoResult rv = MojoClose(producer_); 141 producer_ = MOJO_HANDLE_INVALID; 142 return rv; 143 } 144 145 MojoResult CloseConsumer() { 146 MojoResult rv = MojoClose(consumer_); 147 consumer_ = MOJO_HANDLE_INVALID; 148 return rv; 149 } 150 151 MojoHandle producer_, consumer_; 152 153 private: 154 DISALLOW_COPY_AND_ASSIGN(DataPipeTest); 155 }; 156 157 TEST_F(DataPipeTest, Basic) { 158 const MojoCreateDataPipeOptions options = { 159 kSizeOfOptions, // |struct_size|. 160 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 161 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 162 1000 * sizeof(int32_t) // |capacity_num_bytes|. 163 }; 164 165 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 166 167 // We can write to a data pipe handle immediately. 168 int32_t elements[10] = {}; 169 uint32_t num_bytes = 0; 170 171 num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0])); 172 173 elements[0] = 123; 174 elements[1] = 456; 175 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 176 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes)); 177 178 // Now wait for the other side to become readable. 179 MojoHandleSignalsState state; 180 ASSERT_EQ(MOJO_RESULT_OK, 181 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &state)); 182 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 183 state.satisfied_signals); 184 185 elements[0] = -1; 186 elements[1] = -1; 187 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes)); 188 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 189 ASSERT_EQ(elements[0], 123); 190 ASSERT_EQ(elements[1], 456); 191 } 192 193 // Tests creation of data pipes with various (valid) options. 194 TEST_F(DataPipeTest, CreateAndMaybeTransfer) { 195 MojoCreateDataPipeOptions test_options[] = { 196 // Default options. 197 {}, 198 // Trivial element size, non-default capacity. 199 {kSizeOfOptions, // |struct_size|. 200 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 201 1, // |element_num_bytes|. 202 1000}, // |capacity_num_bytes|. 203 // Nontrivial element size, non-default capacity. 204 {kSizeOfOptions, // |struct_size|. 205 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 206 4, // |element_num_bytes|. 207 4000}, // |capacity_num_bytes|. 208 // Nontrivial element size, default capacity. 209 {kSizeOfOptions, // |struct_size|. 210 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 211 100, // |element_num_bytes|. 212 0} // |capacity_num_bytes|. 213 }; 214 for (size_t i = 0; i < arraysize(test_options); i++) { 215 MojoHandle producer_handle, consumer_handle; 216 MojoCreateDataPipeOptions* options = i ? &test_options[i] : nullptr; 217 ASSERT_EQ(MOJO_RESULT_OK, 218 MojoCreateDataPipe(options, &producer_handle, &consumer_handle)); 219 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle)); 220 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle)); 221 } 222 } 223 224 TEST_F(DataPipeTest, SimpleReadWrite) { 225 const MojoCreateDataPipeOptions options = { 226 kSizeOfOptions, // |struct_size|. 227 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 228 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 229 1000 * sizeof(int32_t) // |capacity_num_bytes|. 230 }; 231 232 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 233 MojoHandleSignalsState hss; 234 235 int32_t elements[10] = {}; 236 uint32_t num_bytes = 0; 237 238 // Try reading; nothing there yet. 239 num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0])); 240 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes)); 241 242 // Query; nothing there yet. 243 num_bytes = 0; 244 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 245 ASSERT_EQ(0u, num_bytes); 246 247 // Discard; nothing there yet. 248 num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0])); 249 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes)); 250 251 // Read with invalid |num_bytes|. 252 num_bytes = sizeof(elements[0]) + 1; 253 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes)); 254 255 // Write two elements. 256 elements[0] = 123; 257 elements[1] = 456; 258 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 259 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes)); 260 // It should have written everything (even without "all or none"). 261 ASSERT_EQ(2u * sizeof(elements[0]), num_bytes); 262 263 // Wait. 264 ASSERT_EQ(MOJO_RESULT_OK, 265 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 266 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 267 hss.satisfied_signals); 268 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 269 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 270 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 271 hss.satisfiable_signals); 272 273 // Query. 274 // TODO(vtl): It's theoretically possible (though not with the current 275 // implementation/configured limits) that not all the data has arrived yet. 276 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...| 277 // or |2 * ...|.) 278 num_bytes = 0; 279 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 280 ASSERT_EQ(2 * sizeof(elements[0]), num_bytes); 281 282 // Read one element. 283 elements[0] = -1; 284 elements[1] = -1; 285 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 286 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes)); 287 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes); 288 ASSERT_EQ(123, elements[0]); 289 ASSERT_EQ(-1, elements[1]); 290 291 // Query. 292 // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we 293 // should get 1 here.) 294 num_bytes = 0; 295 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 296 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes); 297 298 // Peek one element. 299 elements[0] = -1; 300 elements[1] = -1; 301 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 302 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true)); 303 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes); 304 ASSERT_EQ(456, elements[0]); 305 ASSERT_EQ(-1, elements[1]); 306 307 // Query. Still has 1 element remaining. 308 num_bytes = 0; 309 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 310 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes); 311 312 // Try to read two elements, with "all or none". 313 elements[0] = -1; 314 elements[1] = -1; 315 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 316 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, 317 ReadData(elements, &num_bytes, true, false)); 318 ASSERT_EQ(-1, elements[0]); 319 ASSERT_EQ(-1, elements[1]); 320 321 // Try to read two elements, without "all or none". 322 elements[0] = -1; 323 elements[1] = -1; 324 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 325 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false)); 326 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes); 327 ASSERT_EQ(456, elements[0]); 328 ASSERT_EQ(-1, elements[1]); 329 330 // Query. 331 num_bytes = 0; 332 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 333 ASSERT_EQ(0u, num_bytes); 334 } 335 336 // Note: The "basic" waiting tests test that the "wait states" are correct in 337 // various situations; they don't test that waiters are properly awoken on state 338 // changes. (For that, we need to use multiple threads.) 339 TEST_F(DataPipeTest, BasicProducerWaiting) { 340 // Note: We take advantage of the fact that current for current 341 // implementations capacities are strict maximums. This is not guaranteed by 342 // the API. 343 344 const MojoCreateDataPipeOptions options = { 345 kSizeOfOptions, // |struct_size|. 346 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 347 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 348 2 * sizeof(int32_t) // |capacity_num_bytes|. 349 }; 350 Create(&options); 351 MojoHandleSignalsState hss; 352 353 // Never readable. Already writable. 354 hss = GetSignalsState(producer_); 355 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 356 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 357 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 358 hss.satisfiable_signals); 359 360 // Write two elements. 361 int32_t elements[2] = {123, 456}; 362 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 363 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 364 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 365 366 // Wait for data to become available to the consumer. 367 ASSERT_EQ(MOJO_RESULT_OK, 368 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 369 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 370 hss.satisfied_signals); 371 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 372 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 373 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 374 hss.satisfiable_signals); 375 376 // Peek one element. 377 elements[0] = -1; 378 elements[1] = -1; 379 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 380 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true)); 381 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 382 ASSERT_EQ(123, elements[0]); 383 ASSERT_EQ(-1, elements[1]); 384 385 // Read one element. 386 elements[0] = -1; 387 elements[1] = -1; 388 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 389 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false)); 390 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 391 ASSERT_EQ(123, elements[0]); 392 ASSERT_EQ(-1, elements[1]); 393 394 // Try writing, using a two-phase write. 395 void* buffer = nullptr; 396 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 397 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes)); 398 EXPECT_TRUE(buffer); 399 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0]))); 400 401 static_cast<int32_t*>(buffer)[0] = 789; 402 ASSERT_EQ(MOJO_RESULT_OK, 403 EndWriteData(static_cast<uint32_t>(1u * sizeof(elements[0])))); 404 405 // Read one element, using a two-phase read. 406 const void* read_buffer = nullptr; 407 num_bytes = 0u; 408 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes)); 409 EXPECT_TRUE(read_buffer); 410 // The two-phase read should be able to read at least one element. 411 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0]))); 412 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]); 413 ASSERT_EQ(MOJO_RESULT_OK, 414 EndReadData(static_cast<uint32_t>(1u * sizeof(elements[0])))); 415 416 // Write one element. 417 elements[0] = 123; 418 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 419 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes)); 420 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 421 422 // Close the consumer. 423 CloseConsumer(); 424 425 // It should now be never-writable. 426 hss = MojoHandleSignalsState(); 427 ASSERT_EQ(MOJO_RESULT_OK, 428 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 429 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 430 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 431 } 432 433 TEST_F(DataPipeTest, PeerClosedProducerWaiting) { 434 const MojoCreateDataPipeOptions options = { 435 kSizeOfOptions, // |struct_size|. 436 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 437 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 438 2 * sizeof(int32_t) // |capacity_num_bytes|. 439 }; 440 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 441 MojoHandleSignalsState hss; 442 443 // Close the consumer. 444 CloseConsumer(); 445 446 // It should be signaled. 447 hss = MojoHandleSignalsState(); 448 ASSERT_EQ(MOJO_RESULT_OK, 449 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 450 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 451 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 452 } 453 454 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) { 455 const MojoCreateDataPipeOptions options = { 456 kSizeOfOptions, // |struct_size|. 457 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 458 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 459 2 * sizeof(int32_t) // |capacity_num_bytes|. 460 }; 461 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 462 MojoHandleSignalsState hss; 463 464 // Close the producer. 465 CloseProducer(); 466 467 // It should be signaled. 468 hss = MojoHandleSignalsState(); 469 ASSERT_EQ(MOJO_RESULT_OK, 470 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 471 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 472 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 473 } 474 475 TEST_F(DataPipeTest, BasicConsumerWaiting) { 476 const MojoCreateDataPipeOptions options = { 477 kSizeOfOptions, // |struct_size|. 478 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 479 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 480 1000 * sizeof(int32_t) // |capacity_num_bytes|. 481 }; 482 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 483 MojoHandleSignalsState hss; 484 485 // Never writable. 486 hss = MojoHandleSignalsState(); 487 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 488 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss)); 489 EXPECT_EQ(0u, hss.satisfied_signals); 490 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 491 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 492 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 493 hss.satisfiable_signals); 494 495 // Write two elements. 496 int32_t elements[2] = {123, 456}; 497 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 498 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 499 500 // Wait for readability. 501 hss = MojoHandleSignalsState(); 502 ASSERT_EQ(MOJO_RESULT_OK, 503 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 504 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 505 hss.satisfied_signals); 506 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 507 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 508 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 509 hss.satisfiable_signals); 510 511 // Discard one element. 512 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 513 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true)); 514 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 515 516 // Should still be readable. 517 hss = MojoHandleSignalsState(); 518 ASSERT_EQ(MOJO_RESULT_OK, 519 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 520 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 521 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 522 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 523 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 524 hss.satisfiable_signals); 525 526 // Peek one element. 527 elements[0] = -1; 528 elements[1] = -1; 529 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 530 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true)); 531 ASSERT_EQ(456, elements[0]); 532 ASSERT_EQ(-1, elements[1]); 533 534 // Should still be readable. 535 hss = MojoHandleSignalsState(); 536 ASSERT_EQ(MOJO_RESULT_OK, 537 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 538 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 539 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 540 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 541 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 542 hss.satisfiable_signals); 543 544 // Read one element. 545 elements[0] = -1; 546 elements[1] = -1; 547 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 548 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true)); 549 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 550 ASSERT_EQ(456, elements[0]); 551 ASSERT_EQ(-1, elements[1]); 552 553 // Write one element. 554 elements[0] = 789; 555 elements[1] = -1; 556 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 557 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 558 559 // Waiting should now succeed. 560 hss = MojoHandleSignalsState(); 561 ASSERT_EQ(MOJO_RESULT_OK, 562 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 563 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 564 hss.satisfied_signals); 565 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 566 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 567 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 568 hss.satisfiable_signals); 569 570 // Close the producer. 571 CloseProducer(); 572 573 // Should still be readable. 574 hss = MojoHandleSignalsState(); 575 ASSERT_EQ(MOJO_RESULT_OK, 576 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 577 EXPECT_TRUE(hss.satisfied_signals & (MOJO_HANDLE_SIGNAL_READABLE | 578 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE)); 579 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 580 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 581 hss.satisfiable_signals); 582 583 // Wait for the peer closed signal. 584 hss = MojoHandleSignalsState(); 585 ASSERT_EQ(MOJO_RESULT_OK, 586 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 587 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 588 MOJO_HANDLE_SIGNAL_PEER_CLOSED, 589 hss.satisfied_signals); 590 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 591 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 592 hss.satisfiable_signals); 593 594 // Read one element. 595 elements[0] = -1; 596 elements[1] = -1; 597 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 598 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true)); 599 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 600 ASSERT_EQ(789, elements[0]); 601 ASSERT_EQ(-1, elements[1]); 602 603 // Should be never-readable. 604 hss = MojoHandleSignalsState(); 605 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 606 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 607 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 608 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 609 } 610 611 TEST_F(DataPipeTest, ConsumerNewDataReadable) { 612 const MojoCreateDataPipeOptions create_options = { 613 kSizeOfOptions, // |struct_size|. 614 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 615 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 616 1000 * sizeof(int32_t) // |capacity_num_bytes|. 617 }; 618 EXPECT_EQ(MOJO_RESULT_OK, Create(&create_options)); 619 620 int32_t elements[2] = {123, 456}; 621 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 622 EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 623 624 // The consumer handle should appear to be readable and have new data. 625 EXPECT_EQ(MOJO_RESULT_OK, 626 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE)); 627 EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals & 628 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE); 629 630 // Now try to read a minimum of 6 elements. 631 int32_t read_elements[6]; 632 uint32_t num_read_bytes = sizeof(read_elements); 633 MojoReadDataOptions read_options; 634 read_options.struct_size = sizeof(read_options); 635 read_options.flags = MOJO_READ_DATA_FLAG_ALL_OR_NONE; 636 EXPECT_EQ( 637 MOJO_RESULT_OUT_OF_RANGE, 638 MojoReadData(consumer_, &read_options, read_elements, &num_read_bytes)); 639 640 // The consumer should still appear to be readable but not with new data. 641 EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals & 642 MOJO_HANDLE_SIGNAL_READABLE); 643 EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals & 644 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE); 645 646 // Write four more elements. 647 EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 648 EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 649 650 // The consumer handle should once again appear to be readable. 651 EXPECT_EQ(MOJO_RESULT_OK, 652 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE)); 653 654 // Try again to read a minimum of 6 elements. Should succeed this time. 655 EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(consumer_, &read_options, 656 read_elements, &num_read_bytes)); 657 658 // And now the consumer is unreadable. 659 EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals & 660 MOJO_HANDLE_SIGNAL_READABLE); 661 EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals & 662 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE); 663 } 664 665 // Test with two-phase APIs and also closing the producer with an active 666 // consumer waiter. 667 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) { 668 const MojoCreateDataPipeOptions options = { 669 kSizeOfOptions, // |struct_size|. 670 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 671 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 672 1000 * sizeof(int32_t) // |capacity_num_bytes|. 673 }; 674 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 675 MojoHandleSignalsState hss; 676 677 // Write two elements. 678 int32_t* elements = nullptr; 679 void* buffer = nullptr; 680 // Request room for three (but we'll only write two). 681 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 682 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes)); 683 EXPECT_TRUE(buffer); 684 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0]))); 685 elements = static_cast<int32_t*>(buffer); 686 elements[0] = 123; 687 elements[1] = 456; 688 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0]))); 689 690 // Wait for readability. 691 hss = MojoHandleSignalsState(); 692 ASSERT_EQ(MOJO_RESULT_OK, 693 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 694 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 695 hss.satisfied_signals); 696 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 697 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 698 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 699 hss.satisfiable_signals); 700 701 // Read one element. 702 // Two should be available, but only read one. 703 const void* read_buffer = nullptr; 704 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes)); 705 EXPECT_TRUE(read_buffer); 706 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 707 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer); 708 ASSERT_EQ(123, read_elements[0]); 709 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0]))); 710 711 // Should still be readable. 712 hss = MojoHandleSignalsState(); 713 ASSERT_EQ(MOJO_RESULT_OK, 714 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 715 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 716 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 717 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 718 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 719 hss.satisfiable_signals); 720 721 // Read one element. 722 // Request three, but not in all-or-none mode. 723 read_buffer = nullptr; 724 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 725 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes)); 726 EXPECT_TRUE(read_buffer); 727 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 728 read_elements = static_cast<const int32_t*>(read_buffer); 729 ASSERT_EQ(456, read_elements[0]); 730 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0]))); 731 732 // Close the producer. 733 CloseProducer(); 734 735 // Should be never-readable. 736 hss = MojoHandleSignalsState(); 737 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 738 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 739 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 740 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 741 } 742 743 // Tests that data pipes aren't writable/readable during two-phase writes/reads. 744 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) { 745 const MojoCreateDataPipeOptions options = { 746 kSizeOfOptions, // |struct_size|. 747 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 748 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 749 1000 * sizeof(int32_t) // |capacity_num_bytes|. 750 }; 751 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 752 MojoHandleSignalsState hss; 753 754 // It should be writable. 755 hss = GetSignalsState(producer_); 756 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 757 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 758 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 759 hss.satisfiable_signals); 760 761 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); 762 void* write_ptr = nullptr; 763 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 764 EXPECT_TRUE(write_ptr); 765 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 766 767 // At this point, it shouldn't be writable. 768 hss = GetSignalsState(producer_); 769 ASSERT_EQ(0u, hss.satisfied_signals); 770 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 771 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 772 hss.satisfiable_signals); 773 774 // It shouldn't be readable yet either (we'll wait later). 775 hss = GetSignalsState(consumer_); 776 ASSERT_EQ(0u, hss.satisfied_signals); 777 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 778 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 779 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 780 hss.satisfiable_signals); 781 782 static_cast<int32_t*>(write_ptr)[0] = 123; 783 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t))); 784 785 // It should immediately be writable again. 786 hss = GetSignalsState(producer_); 787 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 788 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 789 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 790 hss.satisfiable_signals); 791 792 // It should become readable. 793 hss = MojoHandleSignalsState(); 794 ASSERT_EQ(MOJO_RESULT_OK, 795 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 796 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 797 hss.satisfied_signals); 798 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 799 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 800 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 801 hss.satisfiable_signals); 802 803 // Start another two-phase write and check that it's readable even in the 804 // middle of it. 805 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); 806 write_ptr = nullptr; 807 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 808 EXPECT_TRUE(write_ptr); 809 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 810 811 // It should be readable. 812 hss = MojoHandleSignalsState(); 813 ASSERT_EQ(MOJO_RESULT_OK, 814 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 815 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 816 hss.satisfied_signals); 817 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 818 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 819 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 820 hss.satisfiable_signals); 821 822 // End the two-phase write without writing anything. 823 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u)); 824 825 // Start a two-phase read. 826 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); 827 const void* read_ptr = nullptr; 828 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes)); 829 EXPECT_TRUE(read_ptr); 830 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes); 831 832 // At this point, it should still be writable. 833 hss = GetSignalsState(producer_); 834 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 835 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 836 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 837 hss.satisfiable_signals); 838 839 // But not readable. 840 hss = GetSignalsState(consumer_); 841 ASSERT_EQ(0u, hss.satisfied_signals); 842 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 843 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 844 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 845 hss.satisfiable_signals); 846 847 // End the two-phase read without reading anything. 848 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u)); 849 850 // It should be readable again. 851 hss = GetSignalsState(consumer_); 852 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 853 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 854 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 855 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 856 hss.satisfiable_signals); 857 } 858 859 void Seq(int32_t start, size_t count, int32_t* out) { 860 for (size_t i = 0; i < count; i++) 861 out[i] = start + static_cast<int32_t>(i); 862 } 863 864 TEST_F(DataPipeTest, AllOrNone) { 865 const MojoCreateDataPipeOptions options = { 866 kSizeOfOptions, // |struct_size|. 867 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 868 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 869 10 * sizeof(int32_t) // |capacity_num_bytes|. 870 }; 871 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 872 MojoHandleSignalsState hss; 873 874 // Try writing more than the total capacity of the pipe. 875 uint32_t num_bytes = 20u * sizeof(int32_t); 876 int32_t buffer[100]; 877 Seq(0, arraysize(buffer), buffer); 878 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true)); 879 880 // Should still be empty. 881 num_bytes = ~0u; 882 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 883 ASSERT_EQ(0u, num_bytes); 884 885 // Write some data. 886 num_bytes = 5u * sizeof(int32_t); 887 Seq(100, arraysize(buffer), buffer); 888 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true)); 889 ASSERT_EQ(5u * sizeof(int32_t), num_bytes); 890 891 // Wait for data. 892 // TODO(vtl): There's no real guarantee that all the data will become 893 // available at once (except that in current implementations, with reasonable 894 // limits, it will). Eventually, we'll be able to wait for a specified amount 895 // of data to become available. 896 hss = MojoHandleSignalsState(); 897 ASSERT_EQ(MOJO_RESULT_OK, 898 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 899 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 900 hss.satisfied_signals); 901 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 902 MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE, 903 hss.satisfiable_signals); 904 905 // Half full. 906 num_bytes = 0u; 907 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 908 ASSERT_EQ(5u * sizeof(int32_t), num_bytes); 909 910 // Try writing more than the available capacity of the pipe, but less than the 911 // total capacity. 912 num_bytes = 6u * sizeof(int32_t); 913 Seq(200, arraysize(buffer), buffer); 914 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true)); 915 916 // Try reading too much. 917 num_bytes = 11u * sizeof(int32_t); 918 memset(buffer, 0xab, sizeof(buffer)); 919 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true)); 920 int32_t expected_buffer[100]; 921 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 922 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 923 924 // Try discarding too much. 925 num_bytes = 11u * sizeof(int32_t); 926 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true)); 927 928 // Just a little. 929 num_bytes = 2u * sizeof(int32_t); 930 Seq(300, arraysize(buffer), buffer); 931 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true)); 932 ASSERT_EQ(2u * sizeof(int32_t), num_bytes); 933 934 // Just right. 935 num_bytes = 3u * sizeof(int32_t); 936 Seq(400, arraysize(buffer), buffer); 937 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true)); 938 ASSERT_EQ(3u * sizeof(int32_t), num_bytes); 939 940 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a 941 // specified amount of data to be available, so poll. 942 for (size_t i = 0; i < kMaxPoll; i++) { 943 num_bytes = 0u; 944 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 945 if (num_bytes >= 10u * sizeof(int32_t)) 946 break; 947 948 test::Sleep(test::EpsilonDeadline()); 949 } 950 ASSERT_EQ(10u * sizeof(int32_t), num_bytes); 951 952 // Read half. 953 num_bytes = 5u * sizeof(int32_t); 954 memset(buffer, 0xab, sizeof(buffer)); 955 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true)); 956 ASSERT_EQ(5u * sizeof(int32_t), num_bytes); 957 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 958 Seq(100, 5, expected_buffer); 959 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 960 961 // Try reading too much again. 962 num_bytes = 6u * sizeof(int32_t); 963 memset(buffer, 0xab, sizeof(buffer)); 964 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true)); 965 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 966 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 967 968 // Try discarding too much again. 969 num_bytes = 6u * sizeof(int32_t); 970 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true)); 971 972 // Discard a little. 973 num_bytes = 2u * sizeof(int32_t); 974 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true)); 975 ASSERT_EQ(2u * sizeof(int32_t), num_bytes); 976 977 // Three left. 978 num_bytes = 0u; 979 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 980 ASSERT_EQ(3u * sizeof(int32_t), num_bytes); 981 982 // Close the producer, then test producer-closed cases. 983 CloseProducer(); 984 985 // Wait. 986 hss = MojoHandleSignalsState(); 987 ASSERT_EQ(MOJO_RESULT_OK, 988 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 989 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 990 hss.satisfied_signals); 991 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 992 hss.satisfiable_signals); 993 994 // Try reading too much; "failed precondition" since the producer is closed. 995 num_bytes = 4u * sizeof(int32_t); 996 memset(buffer, 0xab, sizeof(buffer)); 997 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 998 ReadData(buffer, &num_bytes, true)); 999 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 1000 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 1001 1002 // Try discarding too much; "failed precondition" again. 1003 num_bytes = 4u * sizeof(int32_t); 1004 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true)); 1005 1006 // Read a little. 1007 num_bytes = 2u * sizeof(int32_t); 1008 memset(buffer, 0xab, sizeof(buffer)); 1009 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true)); 1010 ASSERT_EQ(2u * sizeof(int32_t), num_bytes); 1011 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 1012 Seq(400, 2, expected_buffer); 1013 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 1014 1015 // Discard the remaining element. 1016 num_bytes = 1u * sizeof(int32_t); 1017 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true)); 1018 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1019 1020 // Empty again. 1021 num_bytes = ~0u; 1022 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1023 ASSERT_EQ(0u, num_bytes); 1024 } 1025 1026 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads, 1027 // respectively, as much as possible, even if it may have to "wrap around" the 1028 // internal circular buffer. (Note that the two-phase write and read need not do 1029 // this.) 1030 TEST_F(DataPipeTest, WrapAround) { 1031 unsigned char test_data[1000]; 1032 for (size_t i = 0; i < arraysize(test_data); i++) 1033 test_data[i] = static_cast<unsigned char>(i); 1034 1035 const MojoCreateDataPipeOptions options = { 1036 kSizeOfOptions, // |struct_size|. 1037 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1038 1u, // |element_num_bytes|. 1039 100u // |capacity_num_bytes|. 1040 }; 1041 1042 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1043 MojoHandleSignalsState hss; 1044 1045 // Write 20 bytes. 1046 uint32_t num_bytes = 20u; 1047 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true)); 1048 ASSERT_EQ(20u, num_bytes); 1049 1050 // Wait for data. 1051 ASSERT_EQ(MOJO_RESULT_OK, 1052 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1053 EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); 1054 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1055 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 1056 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1057 hss.satisfiable_signals); 1058 1059 // Read 10 bytes. 1060 unsigned char read_buffer[1000] = {0}; 1061 num_bytes = 10u; 1062 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true)); 1063 ASSERT_EQ(10u, num_bytes); 1064 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u)); 1065 1066 // Check that a two-phase write can now only write (at most) 80 bytes. (This 1067 // checks an implementation detail; this behavior is not guaranteed.) 1068 void* write_buffer_ptr = nullptr; 1069 num_bytes = 0u; 1070 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes)); 1071 EXPECT_TRUE(write_buffer_ptr); 1072 ASSERT_EQ(80u, num_bytes); 1073 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0)); 1074 1075 size_t total_num_bytes = 0; 1076 while (total_num_bytes < 90) { 1077 // Wait to write. 1078 ASSERT_EQ(MOJO_RESULT_OK, 1079 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss)); 1080 ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE); 1081 ASSERT_EQ(hss.satisfiable_signals, MOJO_HANDLE_SIGNAL_WRITABLE | 1082 MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1083 MOJO_HANDLE_SIGNAL_PEER_REMOTE); 1084 1085 // Write as much as we can. 1086 num_bytes = 100; 1087 ASSERT_EQ(MOJO_RESULT_OK, 1088 WriteData(&test_data[20 + total_num_bytes], &num_bytes, false)); 1089 total_num_bytes += num_bytes; 1090 } 1091 1092 ASSERT_EQ(90u, total_num_bytes); 1093 1094 num_bytes = 0; 1095 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1096 ASSERT_EQ(100u, num_bytes); 1097 1098 // Check that a two-phase read can now only read (at most) 90 bytes. (This 1099 // checks an implementation detail; this behavior is not guaranteed.) 1100 const void* read_buffer_ptr = nullptr; 1101 num_bytes = 0; 1102 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes)); 1103 EXPECT_TRUE(read_buffer_ptr); 1104 ASSERT_EQ(90u, num_bytes); 1105 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0)); 1106 1107 // Read as much as possible. We should read 100 bytes. 1108 num_bytes = 1109 static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0])); 1110 memset(read_buffer, 0, num_bytes); 1111 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes)); 1112 ASSERT_EQ(100u, num_bytes); 1113 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u)); 1114 } 1115 1116 // Tests the behavior of writing (simple and two-phase), closing the producer, 1117 // then reading (simple and two-phase). 1118 TEST_F(DataPipeTest, WriteCloseProducerRead) { 1119 const char kTestData[] = "hello world"; 1120 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1121 1122 const MojoCreateDataPipeOptions options = { 1123 kSizeOfOptions, // |struct_size|. 1124 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1125 1u, // |element_num_bytes|. 1126 1000u // |capacity_num_bytes|. 1127 }; 1128 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1129 1130 // Write some data, so we'll have something to read. 1131 uint32_t num_bytes = kTestDataSize; 1132 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false)); 1133 ASSERT_EQ(kTestDataSize, num_bytes); 1134 1135 // Write it again, so we'll have something left over. 1136 num_bytes = kTestDataSize; 1137 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false)); 1138 ASSERT_EQ(kTestDataSize, num_bytes); 1139 1140 // Start two-phase write. 1141 void* write_buffer_ptr = nullptr; 1142 num_bytes = 0u; 1143 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes)); 1144 EXPECT_TRUE(write_buffer_ptr); 1145 EXPECT_GT(num_bytes, 0u); 1146 1147 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.) 1148 for (size_t i = 0; i < kMaxPoll; i++) { 1149 num_bytes = 0u; 1150 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1151 if (num_bytes >= 2u * kTestDataSize) 1152 break; 1153 1154 test::Sleep(test::EpsilonDeadline()); 1155 } 1156 ASSERT_EQ(2u * kTestDataSize, num_bytes); 1157 1158 // Start two-phase read. 1159 const void* read_buffer_ptr = nullptr; 1160 num_bytes = 0u; 1161 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes)); 1162 EXPECT_TRUE(read_buffer_ptr); 1163 ASSERT_EQ(2u * kTestDataSize, num_bytes); 1164 1165 // Close the producer. 1166 CloseProducer(); 1167 1168 // The consumer can finish its two-phase read. 1169 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize)); 1170 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize)); 1171 1172 // And start another. 1173 read_buffer_ptr = nullptr; 1174 num_bytes = 0u; 1175 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes)); 1176 EXPECT_TRUE(read_buffer_ptr); 1177 ASSERT_EQ(kTestDataSize, num_bytes); 1178 } 1179 1180 // Tests the behavior of interrupting a two-phase read and write by closing the 1181 // consumer. 1182 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) { 1183 const char kTestData[] = "hello world"; 1184 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1185 1186 const MojoCreateDataPipeOptions options = { 1187 kSizeOfOptions, // |struct_size|. 1188 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1189 1u, // |element_num_bytes|. 1190 1000u // |capacity_num_bytes|. 1191 }; 1192 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1193 MojoHandleSignalsState hss; 1194 1195 // Write some data, so we'll have something to read. 1196 uint32_t num_bytes = kTestDataSize; 1197 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1198 ASSERT_EQ(kTestDataSize, num_bytes); 1199 1200 // Start two-phase write. 1201 void* write_buffer_ptr = nullptr; 1202 num_bytes = 0u; 1203 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes)); 1204 EXPECT_TRUE(write_buffer_ptr); 1205 ASSERT_GT(num_bytes, kTestDataSize); 1206 1207 // Wait for data. 1208 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1209 hss = MojoHandleSignalsState(); 1210 ASSERT_EQ(MOJO_RESULT_OK, 1211 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1212 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1213 hss.satisfied_signals); 1214 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1215 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 1216 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1217 hss.satisfiable_signals); 1218 1219 // Start two-phase read. 1220 const void* read_buffer_ptr = nullptr; 1221 num_bytes = 0u; 1222 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes)); 1223 EXPECT_TRUE(read_buffer_ptr); 1224 ASSERT_EQ(kTestDataSize, num_bytes); 1225 1226 // Close the consumer. 1227 CloseConsumer(); 1228 1229 // Wait for producer to know that the consumer is closed. 1230 hss = MojoHandleSignalsState(); 1231 ASSERT_EQ(MOJO_RESULT_OK, 1232 WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 1233 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1234 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1235 1236 // Actually write some data. (Note: Premature freeing of the buffer would 1237 // probably only be detected under ASAN or similar.) 1238 memcpy(write_buffer_ptr, kTestData, kTestDataSize); 1239 // Note: Even though the consumer has been closed, ending the two-phase 1240 // write will report success. 1241 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize)); 1242 1243 // But trying to write should result in failure. 1244 num_bytes = kTestDataSize; 1245 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes)); 1246 1247 // As will trying to start another two-phase write. 1248 write_buffer_ptr = nullptr; 1249 num_bytes = 0u; 1250 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1251 BeginWriteData(&write_buffer_ptr, &num_bytes)); 1252 } 1253 1254 // Tests the behavior of "interrupting" a two-phase write by closing both the 1255 // producer and the consumer. 1256 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) { 1257 const uint32_t kTestDataSize = 15u; 1258 1259 const MojoCreateDataPipeOptions options = { 1260 kSizeOfOptions, // |struct_size|. 1261 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1262 1u, // |element_num_bytes|. 1263 1000u // |capacity_num_bytes|. 1264 }; 1265 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1266 1267 // Start two-phase write. 1268 void* write_buffer_ptr = nullptr; 1269 uint32_t num_bytes = 0u; 1270 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes)); 1271 EXPECT_TRUE(write_buffer_ptr); 1272 ASSERT_GT(num_bytes, kTestDataSize); 1273 } 1274 1275 // Tests the behavior of writing, closing the producer, and then reading (with 1276 // and without data remaining). 1277 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) { 1278 const char kTestData[] = "hello world"; 1279 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1280 1281 const MojoCreateDataPipeOptions options = { 1282 kSizeOfOptions, // |struct_size|. 1283 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1284 1u, // |element_num_bytes|. 1285 1000u // |capacity_num_bytes|. 1286 }; 1287 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1288 MojoHandleSignalsState hss; 1289 1290 // Write some data, so we'll have something to read. 1291 uint32_t num_bytes = kTestDataSize; 1292 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1293 ASSERT_EQ(kTestDataSize, num_bytes); 1294 1295 // Close the producer. 1296 CloseProducer(); 1297 1298 // Wait. (Note that once the consumer knows that the producer is closed, it 1299 // must also know about all the data that was sent.) 1300 hss = MojoHandleSignalsState(); 1301 ASSERT_EQ(MOJO_RESULT_OK, 1302 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 1303 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1304 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1305 hss.satisfied_signals); 1306 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1307 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1308 hss.satisfiable_signals); 1309 1310 // Peek that data. 1311 char buffer[1000]; 1312 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 1313 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true)); 1314 ASSERT_EQ(kTestDataSize, num_bytes); 1315 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize)); 1316 1317 // Read that data. 1318 memset(buffer, 0, 1000); 1319 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 1320 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes)); 1321 ASSERT_EQ(kTestDataSize, num_bytes); 1322 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize)); 1323 1324 // A second read should fail. 1325 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 1326 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes)); 1327 1328 // A two-phase read should also fail. 1329 const void* read_buffer_ptr = nullptr; 1330 num_bytes = 0u; 1331 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1332 BeginReadData(&read_buffer_ptr, &num_bytes)); 1333 1334 // Ditto for discard. 1335 num_bytes = 10u; 1336 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes)); 1337 } 1338 1339 // Test that during a two phase read the memory stays valid even if more data 1340 // comes in. 1341 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) { 1342 const char kTestData[] = "hello world"; 1343 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1344 1345 const MojoCreateDataPipeOptions options = { 1346 kSizeOfOptions, // |struct_size|. 1347 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1348 1u, // |element_num_bytes|. 1349 1000u // |capacity_num_bytes|. 1350 }; 1351 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1352 MojoHandleSignalsState hss; 1353 1354 // Write some data. 1355 uint32_t num_bytes = kTestDataSize; 1356 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1357 ASSERT_EQ(kTestDataSize, num_bytes); 1358 1359 // Wait for the data. 1360 hss = MojoHandleSignalsState(); 1361 ASSERT_EQ(MOJO_RESULT_OK, 1362 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1363 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1364 hss.satisfied_signals); 1365 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1366 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 1367 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1368 hss.satisfiable_signals); 1369 1370 // Begin a two-phase read. 1371 const void* read_buffer_ptr = nullptr; 1372 uint32_t read_buffer_size = 0u; 1373 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size)); 1374 1375 // Write more data. 1376 const char kExtraData[] = "bye world"; 1377 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData)); 1378 num_bytes = kExtraDataSize; 1379 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes)); 1380 ASSERT_EQ(kExtraDataSize, num_bytes); 1381 1382 // Close the producer. 1383 CloseProducer(); 1384 1385 // Wait. (Note that once the consumer knows that the producer is closed, it 1386 // must also have received the extra data). 1387 hss = MojoHandleSignalsState(); 1388 ASSERT_EQ(MOJO_RESULT_OK, 1389 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss)); 1390 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1391 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1392 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1393 hss.satisfiable_signals); 1394 1395 // Read the two phase memory to check it's still valid. 1396 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize)); 1397 EndReadData(read_buffer_size); 1398 } 1399 1400 // Test that two-phase reads/writes behave correctly when given invalid 1401 // arguments. 1402 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) { 1403 const MojoCreateDataPipeOptions options = { 1404 kSizeOfOptions, // |struct_size|. 1405 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1406 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1407 10 * sizeof(int32_t) // |capacity_num_bytes|. 1408 }; 1409 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1410 MojoHandleSignalsState hss; 1411 1412 // No data. 1413 uint32_t num_bytes = 1000u; 1414 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1415 ASSERT_EQ(0u, num_bytes); 1416 1417 // Try "ending" a two-phase write when one isn't active. 1418 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1419 EndWriteData(1u * sizeof(int32_t))); 1420 1421 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd 1422 // have time to propagate. 1423 test::Sleep(test::EpsilonDeadline()); 1424 1425 // Still no data. 1426 num_bytes = 1000u; 1427 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1428 ASSERT_EQ(0u, num_bytes); 1429 1430 // Try ending a two-phase write with an invalid amount (too much). 1431 num_bytes = 0u; 1432 void* write_ptr = nullptr; 1433 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 1434 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 1435 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t)))); 1436 1437 // But the two-phase write still ended. 1438 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u)); 1439 1440 // Wait a bit (as above). 1441 test::Sleep(test::EpsilonDeadline()); 1442 1443 // Still no data. 1444 num_bytes = 1000u; 1445 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1446 ASSERT_EQ(0u, num_bytes); 1447 1448 // Try ending a two-phase write with an invalid amount (not a multiple of the 1449 // element size). 1450 num_bytes = 0u; 1451 write_ptr = nullptr; 1452 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 1453 EXPECT_GE(num_bytes, 1u); 1454 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u)); 1455 1456 // But the two-phase write still ended. 1457 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u)); 1458 1459 // Wait a bit (as above). 1460 test::Sleep(test::EpsilonDeadline()); 1461 1462 // Still no data. 1463 num_bytes = 1000u; 1464 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1465 ASSERT_EQ(0u, num_bytes); 1466 1467 // Now write some data, so we'll be able to try reading. 1468 int32_t element = 123; 1469 num_bytes = 1u * sizeof(int32_t); 1470 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes)); 1471 1472 // Wait for data. 1473 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1474 hss = MojoHandleSignalsState(); 1475 ASSERT_EQ(MOJO_RESULT_OK, 1476 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1477 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1478 hss.satisfied_signals); 1479 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1480 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 1481 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1482 hss.satisfiable_signals); 1483 1484 // One element available. 1485 num_bytes = 0u; 1486 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1487 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1488 1489 // Try "ending" a two-phase read when one isn't active. 1490 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t))); 1491 1492 // Still one element available. 1493 num_bytes = 0u; 1494 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1495 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1496 1497 // Try ending a two-phase read with an invalid amount (too much). 1498 num_bytes = 0u; 1499 const void* read_ptr = nullptr; 1500 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes)); 1501 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 1502 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t)))); 1503 1504 // Still one element available. 1505 num_bytes = 0u; 1506 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1507 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1508 1509 // Try ending a two-phase read with an invalid amount (not a multiple of the 1510 // element size). 1511 num_bytes = 0u; 1512 read_ptr = nullptr; 1513 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes)); 1514 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1515 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]); 1516 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u)); 1517 1518 // Still one element available. 1519 num_bytes = 0u; 1520 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1521 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1522 } 1523 1524 // Test that a producer can be sent over a MP. 1525 TEST_F(DataPipeTest, SendProducer) { 1526 const char kTestData[] = "hello world"; 1527 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1528 1529 const MojoCreateDataPipeOptions options = { 1530 kSizeOfOptions, // |struct_size|. 1531 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1532 1u, // |element_num_bytes|. 1533 1000u // |capacity_num_bytes|. 1534 }; 1535 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1536 MojoHandleSignalsState hss; 1537 1538 // Write some data. 1539 uint32_t num_bytes = kTestDataSize; 1540 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1541 ASSERT_EQ(kTestDataSize, num_bytes); 1542 1543 // Wait for the data. 1544 hss = MojoHandleSignalsState(); 1545 ASSERT_EQ(MOJO_RESULT_OK, 1546 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1547 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1548 hss.satisfied_signals); 1549 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1550 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 1551 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1552 hss.satisfiable_signals); 1553 1554 // Check the data. 1555 const void* read_buffer = nullptr; 1556 num_bytes = 0u; 1557 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes)); 1558 ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize)); 1559 EndReadData(num_bytes); 1560 1561 // Now send the producer over a MP so that it's serialized. 1562 MojoHandle pipe0, pipe1; 1563 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1)); 1564 1565 ASSERT_EQ(MOJO_RESULT_OK, 1566 WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &producer_, 1, 1567 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1568 producer_ = MOJO_HANDLE_INVALID; 1569 ASSERT_EQ(MOJO_RESULT_OK, 1570 WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1571 ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &producer_, 1)); 1572 1573 // Write more data. 1574 const char kExtraData[] = "bye world"; 1575 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData)); 1576 num_bytes = kExtraDataSize; 1577 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes)); 1578 ASSERT_EQ(kExtraDataSize, num_bytes); 1579 1580 // Wait for it. 1581 hss = MojoHandleSignalsState(); 1582 ASSERT_EQ(MOJO_RESULT_OK, 1583 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1584 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1585 hss.satisfied_signals); 1586 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1587 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | 1588 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1589 hss.satisfiable_signals); 1590 1591 // Check the second write. 1592 num_bytes = 0u; 1593 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes)); 1594 ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize)); 1595 EndReadData(num_bytes); 1596 1597 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0)); 1598 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1)); 1599 } 1600 1601 // Ensures that if a data pipe consumer whose producer has closed is passed over 1602 // a message pipe, the deserialized dispatcher is also marked as having a closed 1603 // peer. 1604 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) { 1605 const MojoCreateDataPipeOptions options = { 1606 kSizeOfOptions, // |struct_size|. 1607 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1608 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1609 1000 * sizeof(int32_t) // |capacity_num_bytes|. 1610 }; 1611 1612 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1613 1614 // We can write to a data pipe handle immediately. 1615 int32_t data = 123; 1616 uint32_t num_bytes = sizeof(data); 1617 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes)); 1618 ASSERT_EQ(MOJO_RESULT_OK, CloseProducer()); 1619 1620 // Now wait for the other side to become readable and to see the peer closed. 1621 MojoHandleSignalsState state; 1622 ASSERT_EQ(MOJO_RESULT_OK, 1623 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state)); 1624 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1625 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1626 state.satisfied_signals); 1627 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1628 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1629 state.satisfiable_signals); 1630 1631 // Now send the consumer over a MP so that it's serialized. 1632 MojoHandle pipe0, pipe1; 1633 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1)); 1634 1635 ASSERT_EQ(MOJO_RESULT_OK, 1636 WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &consumer_, 1, 1637 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1638 consumer_ = MOJO_HANDLE_INVALID; 1639 ASSERT_EQ(MOJO_RESULT_OK, 1640 WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &state)); 1641 ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &consumer_, 1)); 1642 1643 ASSERT_EQ(MOJO_RESULT_OK, 1644 WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state)); 1645 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1646 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1647 state.satisfied_signals); 1648 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1649 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE, 1650 state.satisfiable_signals); 1651 1652 int32_t read_data; 1653 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes)); 1654 ASSERT_EQ(sizeof(read_data), num_bytes); 1655 ASSERT_EQ(data, read_data); 1656 1657 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0)); 1658 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1)); 1659 } 1660 1661 bool WriteAllData(MojoHandle producer, 1662 const void* elements, 1663 uint32_t num_bytes) { 1664 for (size_t i = 0; i < kMaxPoll; i++) { 1665 // Write as much data as we can. 1666 uint32_t write_bytes = num_bytes; 1667 MojoResult result = 1668 MojoWriteData(producer, elements, &write_bytes, nullptr); 1669 if (result == MOJO_RESULT_OK) { 1670 num_bytes -= write_bytes; 1671 elements = static_cast<const uint8_t*>(elements) + write_bytes; 1672 if (num_bytes == 0) 1673 return true; 1674 } else { 1675 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result); 1676 } 1677 1678 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1679 EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals( 1680 producer, MOJO_HANDLE_SIGNAL_WRITABLE, &hss)); 1681 EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE); 1682 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED | 1683 MOJO_HANDLE_SIGNAL_PEER_REMOTE, 1684 hss.satisfiable_signals); 1685 } 1686 1687 return false; 1688 } 1689 1690 // If |expect_empty| is true, expect |consumer| to be empty after reading. 1691 bool ReadAllData(MojoHandle consumer, 1692 void* elements, 1693 uint32_t num_bytes, 1694 bool expect_empty) { 1695 for (size_t i = 0; i < kMaxPoll; i++) { 1696 // Read as much data as we can. 1697 uint32_t read_bytes = num_bytes; 1698 MojoResult result = MojoReadData(consumer, nullptr, elements, &read_bytes); 1699 if (result == MOJO_RESULT_OK) { 1700 num_bytes -= read_bytes; 1701 elements = static_cast<uint8_t*>(elements) + read_bytes; 1702 if (num_bytes == 0) { 1703 if (expect_empty) { 1704 // Expect no more data. 1705 test::Sleep(test::TinyDeadline()); 1706 MojoReadDataOptions options; 1707 options.struct_size = sizeof(options); 1708 options.flags = MOJO_READ_DATA_FLAG_QUERY; 1709 MojoReadData(consumer, &options, nullptr, &num_bytes); 1710 EXPECT_EQ(0u, num_bytes); 1711 } 1712 return true; 1713 } 1714 } else { 1715 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result); 1716 } 1717 1718 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1719 EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals( 1720 consumer, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1721 // Peer could have become closed while we're still waiting for data. 1722 EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals); 1723 EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE); 1724 EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED); 1725 } 1726 1727 return num_bytes == 0; 1728 } 1729 1730 #if !defined(OS_IOS) 1731 1732 TEST_F(DataPipeTest, Multiprocess) { 1733 const uint32_t kTestDataSize = 1734 static_cast<uint32_t>(sizeof(kMultiprocessTestData)); 1735 const MojoCreateDataPipeOptions options = { 1736 kSizeOfOptions, // |struct_size|. 1737 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1738 1, // |element_num_bytes|. 1739 kMultiprocessCapacity // |capacity_num_bytes|. 1740 }; 1741 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1742 1743 RunTestClient("MultiprocessClient", [&](MojoHandle server_mp) { 1744 // Send some data before serialising and sending the data pipe over. 1745 // This is the first write so we don't need to use WriteAllData. 1746 uint32_t num_bytes = kTestDataSize; 1747 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes, 1748 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)); 1749 ASSERT_EQ(kTestDataSize, num_bytes); 1750 1751 // Send child process the data pipe. 1752 ASSERT_EQ(MOJO_RESULT_OK, 1753 WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0, 1754 &consumer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); 1755 1756 // Send a bunch of data of varying sizes. 1757 uint8_t buffer[100]; 1758 int seq = 0; 1759 for (int i = 0; i < kMultiprocessMaxIter; ++i) { 1760 for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) { 1761 for (unsigned int j = 0; j < size; ++j) 1762 buffer[j] = seq + j; 1763 EXPECT_TRUE(WriteAllData(producer_, buffer, size)); 1764 seq += size; 1765 } 1766 } 1767 1768 // Write the test string in again. 1769 ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize)); 1770 1771 // Swap ends. 1772 ASSERT_EQ(MOJO_RESULT_OK, 1773 WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0, 1774 &producer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); 1775 1776 // Receive the consumer from the other side. 1777 producer_ = MOJO_HANDLE_INVALID; 1778 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1779 ASSERT_EQ(MOJO_RESULT_OK, 1780 WaitForSignals(server_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1781 ASSERT_EQ(MOJO_RESULT_OK, 1782 ReadEmptyMessageWithHandles(server_mp, &consumer_, 1)); 1783 1784 // Read the test string twice. Once for when we sent it, and once for the 1785 // other end sending it. 1786 for (int i = 0; i < 2; ++i) { 1787 EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1)); 1788 EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize)); 1789 } 1790 1791 WriteMessage(server_mp, "quit"); 1792 1793 // Don't have to close the consumer here because it will be done for us. 1794 }); 1795 } 1796 1797 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) { 1798 const uint32_t kTestDataSize = 1799 static_cast<uint32_t>(sizeof(kMultiprocessTestData)); 1800 1801 // Receive the data pipe from the other side. 1802 MojoHandle consumer = MOJO_HANDLE_INVALID; 1803 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1804 ASSERT_EQ(MOJO_RESULT_OK, 1805 WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1806 ASSERT_EQ(MOJO_RESULT_OK, 1807 ReadEmptyMessageWithHandles(client_mp, &consumer, 1)); 1808 1809 // Read the initial string that was sent. 1810 int32_t buffer[100]; 1811 EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false)); 1812 EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize)); 1813 1814 // Receive the main data and check it is correct. 1815 int seq = 0; 1816 uint8_t expected_buffer[100]; 1817 for (int i = 0; i < kMultiprocessMaxIter; ++i) { 1818 for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) { 1819 for (unsigned int j = 0; j < size; ++j) 1820 expected_buffer[j] = seq + j; 1821 EXPECT_TRUE(ReadAllData(consumer, buffer, size, false)); 1822 EXPECT_EQ(0, memcmp(buffer, expected_buffer, size)); 1823 1824 seq += size; 1825 } 1826 } 1827 1828 // Swap ends. 1829 ASSERT_EQ(MOJO_RESULT_OK, 1830 WriteMessageRaw(MessagePipeHandle(client_mp), nullptr, 0, &consumer, 1831 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); 1832 1833 // Receive the producer from the other side. 1834 MojoHandle producer = MOJO_HANDLE_INVALID; 1835 hss = MojoHandleSignalsState(); 1836 ASSERT_EQ(MOJO_RESULT_OK, 1837 WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss)); 1838 ASSERT_EQ(MOJO_RESULT_OK, 1839 ReadEmptyMessageWithHandles(client_mp, &producer, 1)); 1840 1841 // Write the test string one more time. 1842 EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize)); 1843 1844 // We swapped ends, so close the producer. 1845 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer)); 1846 1847 // Wait to receive a "quit" message before exiting. 1848 EXPECT_EQ("quit", ReadMessage(client_mp)); 1849 } 1850 1851 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) { 1852 MojoHandle p; 1853 std::string message = ReadMessageWithHandles(h, &p, 1); 1854 1855 // Write some data to the producer and close it. 1856 uint32_t num_bytes = static_cast<uint32_t>(message.size()); 1857 EXPECT_EQ(MOJO_RESULT_OK, 1858 MojoWriteData(p, message.data(), &num_bytes, nullptr)); 1859 EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size())); 1860 1861 // Close the producer before quitting. 1862 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p)); 1863 1864 // Wait for a quit message. 1865 EXPECT_EQ("quit", ReadMessage(h)); 1866 } 1867 1868 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) { 1869 MojoHandle c; 1870 std::string expected_message = ReadMessageWithHandles(h, &c, 1); 1871 1872 // Wait for the consumer to become readable. 1873 EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE)); 1874 1875 // Drain the consumer and expect to find the given message. 1876 uint32_t num_bytes = static_cast<uint32_t>(expected_message.size()); 1877 std::vector<char> bytes(expected_message.size()); 1878 EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, nullptr, bytes.data(), &num_bytes)); 1879 EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size())); 1880 1881 std::string message(bytes.data(), bytes.size()); 1882 EXPECT_EQ(expected_message, message); 1883 1884 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c)); 1885 1886 // Wait for a quit message. 1887 EXPECT_EQ("quit", ReadMessage(h)); 1888 } 1889 1890 TEST_F(DataPipeTest, SendConsumerAndCloseProducer) { 1891 // Create a new data pipe. 1892 MojoHandle p, c; 1893 EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c)); 1894 1895 RunTestClient("WriteAndCloseProducer", [&](MojoHandle producer_client) { 1896 RunTestClient("ReadAndCloseConsumer", [&](MojoHandle consumer_client) { 1897 const std::string kMessage = "Hello, world!"; 1898 WriteMessageWithHandles(producer_client, kMessage, &p, 1); 1899 WriteMessageWithHandles(consumer_client, kMessage, &c, 1); 1900 1901 WriteMessage(consumer_client, "quit"); 1902 }); 1903 1904 WriteMessage(producer_client, "quit"); 1905 }); 1906 } 1907 1908 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) { 1909 const MojoCreateDataPipeOptions options = { 1910 kSizeOfOptions, // |struct_size|. 1911 MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|. 1912 1, // |element_num_bytes|. 1913 kMultiprocessCapacity // |capacity_num_bytes|. 1914 }; 1915 1916 MojoHandle p, c; 1917 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c)); 1918 1919 const std::string kMessage = "Hello, world!"; 1920 WriteMessageWithHandles(h, kMessage, &c, 1); 1921 1922 // Write some data to the producer and close it. 1923 uint32_t num_bytes = static_cast<uint32_t>(kMessage.size()); 1924 EXPECT_EQ(MOJO_RESULT_OK, 1925 MojoWriteData(p, kMessage.data(), &num_bytes, nullptr)); 1926 EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size())); 1927 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p)); 1928 1929 // Wait for a quit message. 1930 EXPECT_EQ("quit", ReadMessage(h)); 1931 } 1932 1933 TEST_F(DataPipeTest, CreateInChild) { 1934 RunTestClient("CreateAndWrite", [&](MojoHandle child) { 1935 MojoHandle c; 1936 std::string expected_message = ReadMessageWithHandles(child, &c, 1); 1937 1938 // Wait for the consumer to become readable. 1939 EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE)); 1940 1941 // Drain the consumer and expect to find the given message. 1942 uint32_t num_bytes = static_cast<uint32_t>(expected_message.size()); 1943 std::vector<char> bytes(expected_message.size()); 1944 EXPECT_EQ(MOJO_RESULT_OK, 1945 MojoReadData(c, nullptr, bytes.data(), &num_bytes)); 1946 EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size())); 1947 1948 std::string message(bytes.data(), bytes.size()); 1949 EXPECT_EQ(expected_message, message); 1950 1951 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c)); 1952 WriteMessage(child, "quit"); 1953 }); 1954 } 1955 1956 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient, 1957 DataPipeTest, 1958 parent) { 1959 // This test verifies that peer closure is detectable through various 1960 // mechanisms when it races with handle transfer. 1961 1962 MojoHandle handles[6]; 1963 EXPECT_EQ("o_O", ReadMessageWithHandles(parent, handles, 6)); 1964 MojoHandle* producers = &handles[0]; 1965 MojoHandle* consumers = &handles[3]; 1966 1967 // Wait on producer 0 1968 EXPECT_EQ(MOJO_RESULT_OK, 1969 WaitForSignals(producers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED)); 1970 1971 // Wait on consumer 0 1972 EXPECT_EQ(MOJO_RESULT_OK, 1973 WaitForSignals(consumers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED)); 1974 1975 base::MessageLoop message_loop; 1976 1977 // Wait on producer 1 and consumer 1 using SimpleWatchers. 1978 { 1979 base::RunLoop run_loop; 1980 int count = 0; 1981 auto callback = base::Bind( 1982 [](base::RunLoop* loop, int* count, MojoResult result) { 1983 EXPECT_EQ(MOJO_RESULT_OK, result); 1984 if (++*count == 2) 1985 loop->Quit(); 1986 }, 1987 &run_loop, &count); 1988 SimpleWatcher producer_watcher(FROM_HERE, 1989 SimpleWatcher::ArmingPolicy::AUTOMATIC, 1990 base::SequencedTaskRunnerHandle::Get()); 1991 SimpleWatcher consumer_watcher(FROM_HERE, 1992 SimpleWatcher::ArmingPolicy::AUTOMATIC, 1993 base::SequencedTaskRunnerHandle::Get()); 1994 producer_watcher.Watch(Handle(producers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1995 callback); 1996 consumer_watcher.Watch(Handle(consumers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1997 callback); 1998 run_loop.Run(); 1999 EXPECT_EQ(2, count); 2000 } 2001 2002 // Wait on producer 2 by polling with MojoWriteData. 2003 MojoResult result; 2004 do { 2005 uint32_t num_bytes = 0; 2006 result = MojoWriteData(producers[2], nullptr, &num_bytes, nullptr); 2007 } while (result == MOJO_RESULT_OK); 2008 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); 2009 2010 // Wait on consumer 2 by polling with MojoReadData. 2011 do { 2012 char byte; 2013 uint32_t num_bytes = 1; 2014 result = MojoReadData(consumers[2], nullptr, &byte, &num_bytes); 2015 } while (result == MOJO_RESULT_SHOULD_WAIT); 2016 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); 2017 2018 for (size_t i = 0; i < 6; ++i) 2019 CloseHandle(handles[i]); 2020 } 2021 2022 TEST_F(DataPipeTest, StatusChangeInTransit) { 2023 MojoHandle producers[6]; 2024 MojoHandle consumers[6]; 2025 for (size_t i = 0; i < 6; ++i) 2026 CreateDataPipe(&producers[i], &consumers[i], 1); 2027 2028 RunTestClient("DataPipeStatusChangeInTransitClient", [&](MojoHandle child) { 2029 MojoHandle handles[] = {producers[0], producers[1], producers[2], 2030 consumers[3], consumers[4], consumers[5]}; 2031 2032 // Send 3 producers and 3 consumers, and let their transfer race with their 2033 // peers' closure. 2034 WriteMessageWithHandles(child, "o_O", handles, 6); 2035 2036 for (size_t i = 0; i < 3; ++i) 2037 CloseHandle(consumers[i]); 2038 for (size_t i = 3; i < 6; ++i) 2039 CloseHandle(producers[i]); 2040 }); 2041 } 2042 2043 #endif // !defined(OS_IOS) 2044 2045 } // namespace 2046 } // namespace core 2047 } // namespace mojo 2048