1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <stddef.h> 6 #include <stdint.h> 7 8 #include <memory> 9 10 #include "base/bind.h" 11 #include "base/location.h" 12 #include "base/logging.h" 13 #include "base/macros.h" 14 #include "base/message_loop/message_loop.h" 15 #include "mojo/edk/embedder/embedder.h" 16 #include "mojo/edk/embedder/platform_channel_pair.h" 17 #include "mojo/edk/system/test_utils.h" 18 #include "mojo/edk/system/waiter.h" 19 #include "mojo/edk/test/mojo_test_base.h" 20 #include "mojo/public/c/system/data_pipe.h" 21 #include "mojo/public/c/system/functions.h" 22 #include "mojo/public/c/system/message_pipe.h" 23 #include "testing/gtest/include/gtest/gtest.h" 24 25 namespace mojo { 26 namespace edk { 27 namespace { 28 29 const uint32_t kSizeOfOptions = 30 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)); 31 32 // In various places, we have to poll (since, e.g., we can't yet wait for a 33 // certain amount of data to be available). This is the maximum number of 34 // iterations (separated by a short sleep). 35 // TODO(vtl): Get rid of this. 36 const size_t kMaxPoll = 100; 37 38 // Used in Multiprocess test. 39 const size_t kMultiprocessCapacity = 37; 40 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes"; 41 const int kMultiprocessMaxIter = 5; 42 43 class DataPipeTest : public test::MojoTestBase { 44 public: 45 DataPipeTest() : producer_(MOJO_HANDLE_INVALID), 46 consumer_(MOJO_HANDLE_INVALID) {} 47 48 ~DataPipeTest() override { 49 if (producer_ != MOJO_HANDLE_INVALID) 50 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_)); 51 if (consumer_ != MOJO_HANDLE_INVALID) 52 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_)); 53 } 54 55 MojoResult Create(const MojoCreateDataPipeOptions* options) { 56 return MojoCreateDataPipe(options, &producer_, &consumer_); 57 } 58 59 MojoResult WriteData(const void* elements, 60 uint32_t* num_bytes, 61 bool all_or_none = false) { 62 return MojoWriteData(producer_, elements, num_bytes, 63 all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE 64 : MOJO_WRITE_DATA_FLAG_NONE); 65 } 66 67 MojoResult ReadData(void* elements, 68 uint32_t* num_bytes, 69 bool all_or_none = false, 70 bool peek = false) { 71 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE; 72 if (all_or_none) 73 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE; 74 if (peek) 75 flags |= MOJO_READ_DATA_FLAG_PEEK; 76 return MojoReadData(consumer_, elements, num_bytes, flags); 77 } 78 79 MojoResult QueryData(uint32_t* num_bytes) { 80 return MojoReadData(consumer_, nullptr, num_bytes, 81 MOJO_READ_DATA_FLAG_QUERY); 82 } 83 84 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) { 85 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD; 86 if (all_or_none) 87 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE; 88 return MojoReadData(consumer_, nullptr, num_bytes, flags); 89 } 90 91 MojoResult BeginReadData(const void** elements, 92 uint32_t* num_bytes, 93 bool all_or_none = false) { 94 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE; 95 if (all_or_none) 96 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE; 97 return MojoBeginReadData(consumer_, elements, num_bytes, flags); 98 } 99 100 MojoResult EndReadData(uint32_t num_bytes_read) { 101 return MojoEndReadData(consumer_, num_bytes_read); 102 } 103 104 MojoResult BeginWriteData(void** elements, 105 uint32_t* num_bytes, 106 bool all_or_none = false) { 107 MojoReadDataFlags flags = MOJO_WRITE_DATA_FLAG_NONE; 108 if (all_or_none) 109 flags |= MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; 110 return MojoBeginWriteData(producer_, elements, num_bytes, flags); 111 } 112 113 MojoResult EndWriteData(uint32_t num_bytes_written) { 114 return MojoEndWriteData(producer_, num_bytes_written); 115 } 116 117 MojoResult CloseProducer() { 118 MojoResult rv = MojoClose(producer_); 119 producer_ = MOJO_HANDLE_INVALID; 120 return rv; 121 } 122 123 MojoResult CloseConsumer() { 124 MojoResult rv = MojoClose(consumer_); 125 consumer_ = MOJO_HANDLE_INVALID; 126 return rv; 127 } 128 129 MojoHandle producer_, consumer_; 130 131 private: 132 DISALLOW_COPY_AND_ASSIGN(DataPipeTest); 133 }; 134 135 TEST_F(DataPipeTest, Basic) { 136 const MojoCreateDataPipeOptions options = { 137 kSizeOfOptions, // |struct_size|. 138 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 139 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 140 1000 * sizeof(int32_t) // |capacity_num_bytes|. 141 }; 142 143 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 144 145 // We can write to a data pipe handle immediately. 146 int32_t elements[10] = {}; 147 uint32_t num_bytes = 0; 148 149 num_bytes = 150 static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0])); 151 152 elements[0] = 123; 153 elements[1] = 456; 154 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 155 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes)); 156 157 // Now wait for the other side to become readable. 158 MojoHandleSignalsState state; 159 ASSERT_EQ(MOJO_RESULT_OK, 160 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 161 MOJO_DEADLINE_INDEFINITE, &state)); 162 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals); 163 164 elements[0] = -1; 165 elements[1] = -1; 166 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes)); 167 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 168 ASSERT_EQ(elements[0], 123); 169 ASSERT_EQ(elements[1], 456); 170 } 171 172 // Tests creation of data pipes with various (valid) options. 173 TEST_F(DataPipeTest, CreateAndMaybeTransfer) { 174 MojoCreateDataPipeOptions test_options[] = { 175 // Default options. 176 {}, 177 // Trivial element size, non-default capacity. 178 {kSizeOfOptions, // |struct_size|. 179 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 180 1, // |element_num_bytes|. 181 1000}, // |capacity_num_bytes|. 182 // Nontrivial element size, non-default capacity. 183 {kSizeOfOptions, // |struct_size|. 184 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 185 4, // |element_num_bytes|. 186 4000}, // |capacity_num_bytes|. 187 // Nontrivial element size, default capacity. 188 {kSizeOfOptions, // |struct_size|. 189 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 190 100, // |element_num_bytes|. 191 0} // |capacity_num_bytes|. 192 }; 193 for (size_t i = 0; i < arraysize(test_options); i++) { 194 MojoHandle producer_handle, consumer_handle; 195 MojoCreateDataPipeOptions* options = 196 i ? &test_options[i] : nullptr; 197 ASSERT_EQ(MOJO_RESULT_OK, 198 MojoCreateDataPipe(options, &producer_handle, &consumer_handle)); 199 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle)); 200 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle)); 201 } 202 } 203 204 TEST_F(DataPipeTest, SimpleReadWrite) { 205 const MojoCreateDataPipeOptions options = { 206 kSizeOfOptions, // |struct_size|. 207 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 208 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 209 1000 * sizeof(int32_t) // |capacity_num_bytes|. 210 }; 211 212 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 213 MojoHandleSignalsState hss; 214 215 int32_t elements[10] = {}; 216 uint32_t num_bytes = 0; 217 218 // Try reading; nothing there yet. 219 num_bytes = 220 static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0])); 221 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes)); 222 223 // Query; nothing there yet. 224 num_bytes = 0; 225 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 226 ASSERT_EQ(0u, num_bytes); 227 228 // Discard; nothing there yet. 229 num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0])); 230 ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes)); 231 232 // Read with invalid |num_bytes|. 233 num_bytes = sizeof(elements[0]) + 1; 234 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes)); 235 236 // Write two elements. 237 elements[0] = 123; 238 elements[1] = 456; 239 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 240 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes)); 241 // It should have written everything (even without "all or none"). 242 ASSERT_EQ(2u * sizeof(elements[0]), num_bytes); 243 244 // Wait. 245 ASSERT_EQ(MOJO_RESULT_OK, 246 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 247 MOJO_DEADLINE_INDEFINITE, &hss)); 248 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 249 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 250 hss.satisfiable_signals); 251 252 // Query. 253 // TODO(vtl): It's theoretically possible (though not with the current 254 // implementation/configured limits) that not all the data has arrived yet. 255 // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...| 256 // or |2 * ...|.) 257 num_bytes = 0; 258 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 259 ASSERT_EQ(2 * sizeof(elements[0]), num_bytes); 260 261 // Read one element. 262 elements[0] = -1; 263 elements[1] = -1; 264 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 265 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes)); 266 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes); 267 ASSERT_EQ(123, elements[0]); 268 ASSERT_EQ(-1, elements[1]); 269 270 // Query. 271 // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we 272 // should get 1 here.) 273 num_bytes = 0; 274 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 275 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes); 276 277 // Peek one element. 278 elements[0] = -1; 279 elements[1] = -1; 280 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 281 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true)); 282 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes); 283 ASSERT_EQ(456, elements[0]); 284 ASSERT_EQ(-1, elements[1]); 285 286 // Query. Still has 1 element remaining. 287 num_bytes = 0; 288 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 289 ASSERT_EQ(1 * sizeof(elements[0]), num_bytes); 290 291 // Try to read two elements, with "all or none". 292 elements[0] = -1; 293 elements[1] = -1; 294 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 295 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, 296 ReadData(elements, &num_bytes, true, false)); 297 ASSERT_EQ(-1, elements[0]); 298 ASSERT_EQ(-1, elements[1]); 299 300 // Try to read two elements, without "all or none". 301 elements[0] = -1; 302 elements[1] = -1; 303 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 304 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false)); 305 ASSERT_EQ(1u * sizeof(elements[0]), num_bytes); 306 ASSERT_EQ(456, elements[0]); 307 ASSERT_EQ(-1, elements[1]); 308 309 // Query. 310 num_bytes = 0; 311 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 312 ASSERT_EQ(0u, num_bytes); 313 } 314 315 // Note: The "basic" waiting tests test that the "wait states" are correct in 316 // various situations; they don't test that waiters are properly awoken on state 317 // changes. (For that, we need to use multiple threads.) 318 TEST_F(DataPipeTest, BasicProducerWaiting) { 319 // Note: We take advantage of the fact that current for current 320 // implementations capacities are strict maximums. This is not guaranteed by 321 // the API. 322 323 const MojoCreateDataPipeOptions options = { 324 kSizeOfOptions, // |struct_size|. 325 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 326 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 327 2 * sizeof(int32_t) // |capacity_num_bytes|. 328 }; 329 Create(&options); 330 MojoHandleSignalsState hss; 331 332 // Never readable. 333 hss = MojoHandleSignalsState(); 334 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 335 MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); 336 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 337 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 338 hss.satisfiable_signals); 339 340 // Already writable. 341 hss = MojoHandleSignalsState(); 342 ASSERT_EQ(MOJO_RESULT_OK, 343 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 344 345 // Write two elements. 346 int32_t elements[2] = {123, 456}; 347 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 348 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 349 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 350 351 // Wait for data to become available to the consumer. 352 ASSERT_EQ(MOJO_RESULT_OK, 353 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 354 MOJO_DEADLINE_INDEFINITE, &hss)); 355 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 356 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 357 hss.satisfiable_signals); 358 359 // Peek one element. 360 elements[0] = -1; 361 elements[1] = -1; 362 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 363 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true)); 364 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 365 ASSERT_EQ(123, elements[0]); 366 ASSERT_EQ(-1, elements[1]); 367 368 // Read one element. 369 elements[0] = -1; 370 elements[1] = -1; 371 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 372 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false)); 373 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 374 ASSERT_EQ(123, elements[0]); 375 ASSERT_EQ(-1, elements[1]); 376 377 // Try writing, using a two-phase write. 378 void* buffer = nullptr; 379 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 380 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes)); 381 EXPECT_TRUE(buffer); 382 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0]))); 383 384 static_cast<int32_t*>(buffer)[0] = 789; 385 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>( 386 1u * sizeof(elements[0])))); 387 388 // Read one element, using a two-phase read. 389 const void* read_buffer = nullptr; 390 num_bytes = 0u; 391 ASSERT_EQ(MOJO_RESULT_OK, 392 BeginReadData(&read_buffer, &num_bytes, false)); 393 EXPECT_TRUE(read_buffer); 394 // The two-phase read should be able to read at least one element. 395 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0]))); 396 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]); 397 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>( 398 1u * sizeof(elements[0])))); 399 400 // Write one element. 401 elements[0] = 123; 402 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 403 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes)); 404 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 405 406 // Close the consumer. 407 CloseConsumer(); 408 409 // It should now be never-writable. 410 hss = MojoHandleSignalsState(); 411 ASSERT_EQ(MOJO_RESULT_OK, 412 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 413 MOJO_DEADLINE_INDEFINITE, &hss)); 414 hss = MojoHandleSignalsState(); 415 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 416 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 417 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 418 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 419 } 420 421 TEST_F(DataPipeTest, PeerClosedProducerWaiting) { 422 const MojoCreateDataPipeOptions options = { 423 kSizeOfOptions, // |struct_size|. 424 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 425 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 426 2 * sizeof(int32_t) // |capacity_num_bytes|. 427 }; 428 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 429 MojoHandleSignalsState hss; 430 431 // Close the consumer. 432 CloseConsumer(); 433 434 // It should be signaled. 435 hss = MojoHandleSignalsState(); 436 ASSERT_EQ(MOJO_RESULT_OK, 437 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 438 MOJO_DEADLINE_INDEFINITE, &hss)); 439 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 440 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 441 } 442 443 TEST_F(DataPipeTest, PeerClosedConsumerWaiting) { 444 const MojoCreateDataPipeOptions options = { 445 kSizeOfOptions, // |struct_size|. 446 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 447 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 448 2 * sizeof(int32_t) // |capacity_num_bytes|. 449 }; 450 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 451 MojoHandleSignalsState hss; 452 453 // Close the producer. 454 CloseProducer(); 455 456 // It should be signaled. 457 hss = MojoHandleSignalsState(); 458 ASSERT_EQ(MOJO_RESULT_OK, 459 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 460 MOJO_DEADLINE_INDEFINITE, &hss)); 461 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 462 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 463 } 464 465 TEST_F(DataPipeTest, BasicConsumerWaiting) { 466 const MojoCreateDataPipeOptions options = { 467 kSizeOfOptions, // |struct_size|. 468 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 469 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 470 1000 * sizeof(int32_t) // |capacity_num_bytes|. 471 }; 472 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 473 MojoHandleSignalsState hss; 474 475 // Never writable. 476 hss = MojoHandleSignalsState(); 477 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 478 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, 479 MOJO_DEADLINE_INDEFINITE, &hss)); 480 ASSERT_EQ(0u, hss.satisfied_signals); 481 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 482 hss.satisfiable_signals); 483 484 // Write two elements. 485 int32_t elements[2] = {123, 456}; 486 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 487 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 488 489 // Wait for readability. 490 hss = MojoHandleSignalsState(); 491 ASSERT_EQ(MOJO_RESULT_OK, 492 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 493 MOJO_DEADLINE_INDEFINITE, &hss)); 494 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 495 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 496 hss.satisfiable_signals); 497 498 // Discard one element. 499 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 500 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true)); 501 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 502 503 // Should still be readable. 504 hss = MojoHandleSignalsState(); 505 ASSERT_EQ(MOJO_RESULT_OK, 506 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 507 MOJO_DEADLINE_INDEFINITE, &hss)); 508 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 509 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 510 hss.satisfiable_signals); 511 512 // Peek one element. 513 elements[0] = -1; 514 elements[1] = -1; 515 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 516 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true)); 517 ASSERT_EQ(456, elements[0]); 518 ASSERT_EQ(-1, elements[1]); 519 520 // Should still be readable. 521 hss = MojoHandleSignalsState(); 522 ASSERT_EQ(MOJO_RESULT_OK, 523 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 524 MOJO_DEADLINE_INDEFINITE, &hss)); 525 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 526 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 527 hss.satisfiable_signals); 528 529 // Read one element. 530 elements[0] = -1; 531 elements[1] = -1; 532 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 533 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true)); 534 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 535 ASSERT_EQ(456, elements[0]); 536 ASSERT_EQ(-1, elements[1]); 537 538 // Write one element. 539 elements[0] = 789; 540 elements[1] = -1; 541 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 542 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true)); 543 544 // Waiting should now succeed. 545 hss = MojoHandleSignalsState(); 546 ASSERT_EQ(MOJO_RESULT_OK, 547 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 548 MOJO_DEADLINE_INDEFINITE, &hss)); 549 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 550 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 551 hss.satisfiable_signals); 552 553 // Close the producer. 554 CloseProducer(); 555 556 // Should still be readable. 557 hss = MojoHandleSignalsState(); 558 ASSERT_EQ(MOJO_RESULT_OK, 559 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 560 MOJO_DEADLINE_INDEFINITE, &hss)); 561 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0); 562 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 563 hss.satisfiable_signals); 564 565 // Wait for the peer closed signal. 566 hss = MojoHandleSignalsState(); 567 ASSERT_EQ(MOJO_RESULT_OK, 568 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 569 MOJO_DEADLINE_INDEFINITE, &hss)); 570 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0); 571 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 572 hss.satisfiable_signals); 573 574 // Read one element. 575 elements[0] = -1; 576 elements[1] = -1; 577 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); 578 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true)); 579 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 580 ASSERT_EQ(789, elements[0]); 581 ASSERT_EQ(-1, elements[1]); 582 583 // Should be never-readable. 584 hss = MojoHandleSignalsState(); 585 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 586 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 587 MOJO_DEADLINE_INDEFINITE, &hss)); 588 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 589 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 590 } 591 592 // Test with two-phase APIs and also closing the producer with an active 593 // consumer waiter. 594 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) { 595 const MojoCreateDataPipeOptions options = { 596 kSizeOfOptions, // |struct_size|. 597 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 598 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 599 1000 * sizeof(int32_t) // |capacity_num_bytes|. 600 }; 601 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 602 MojoHandleSignalsState hss; 603 604 // Write two elements. 605 int32_t* elements = nullptr; 606 void* buffer = nullptr; 607 // Request room for three (but we'll only write two). 608 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 609 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true)); 610 EXPECT_TRUE(buffer); 611 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0]))); 612 elements = static_cast<int32_t*>(buffer); 613 elements[0] = 123; 614 elements[1] = 456; 615 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0]))); 616 617 // Wait for readability. 618 hss = MojoHandleSignalsState(); 619 ASSERT_EQ(MOJO_RESULT_OK, 620 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 621 MOJO_DEADLINE_INDEFINITE, &hss)); 622 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 623 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 624 hss.satisfiable_signals); 625 626 // Read one element. 627 // Request two in all-or-none mode, but only read one. 628 const void* read_buffer = nullptr; 629 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); 630 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true)); 631 EXPECT_TRUE(read_buffer); 632 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); 633 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer); 634 ASSERT_EQ(123, read_elements[0]); 635 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0]))); 636 637 // Should still be readable. 638 hss = MojoHandleSignalsState(); 639 ASSERT_EQ(MOJO_RESULT_OK, 640 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 641 MOJO_DEADLINE_INDEFINITE, &hss)); 642 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 643 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 644 hss.satisfiable_signals); 645 646 // Read one element. 647 // Request three, but not in all-or-none mode. 648 read_buffer = nullptr; 649 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); 650 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes)); 651 EXPECT_TRUE(read_buffer); 652 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); 653 read_elements = static_cast<const int32_t*>(read_buffer); 654 ASSERT_EQ(456, read_elements[0]); 655 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0]))); 656 657 // Close the producer. 658 CloseProducer(); 659 660 // Should be never-readable. 661 hss = MojoHandleSignalsState(); 662 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 663 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 664 MOJO_DEADLINE_INDEFINITE, &hss)); 665 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 666 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 667 } 668 669 // Tests that data pipes aren't writable/readable during two-phase writes/reads. 670 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) { 671 const MojoCreateDataPipeOptions options = { 672 kSizeOfOptions, // |struct_size|. 673 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 674 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 675 1000 * sizeof(int32_t) // |capacity_num_bytes|. 676 }; 677 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 678 MojoHandleSignalsState hss; 679 680 // It should be writable. 681 hss = MojoHandleSignalsState(); 682 ASSERT_EQ(MOJO_RESULT_OK, 683 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 684 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 685 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 686 hss.satisfiable_signals); 687 688 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); 689 void* write_ptr = nullptr; 690 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 691 EXPECT_TRUE(write_ptr); 692 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 693 694 // At this point, it shouldn't be writable. 695 hss = MojoHandleSignalsState(); 696 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, 697 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 698 ASSERT_EQ(0u, hss.satisfied_signals); 699 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 700 hss.satisfiable_signals); 701 702 // It shouldn't be readable yet either (we'll wait later). 703 hss = MojoHandleSignalsState(); 704 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, 705 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); 706 ASSERT_EQ(0u, hss.satisfied_signals); 707 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 708 hss.satisfiable_signals); 709 710 static_cast<int32_t*>(write_ptr)[0] = 123; 711 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t))); 712 713 // It should immediately be writable again. 714 hss = MojoHandleSignalsState(); 715 ASSERT_EQ(MOJO_RESULT_OK, 716 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 717 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 718 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 719 hss.satisfiable_signals); 720 721 // It should become readable. 722 hss = MojoHandleSignalsState(); 723 ASSERT_EQ(MOJO_RESULT_OK, 724 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 725 MOJO_DEADLINE_INDEFINITE, &hss)); 726 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 727 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 728 hss.satisfiable_signals); 729 730 // Start another two-phase write and check that it's readable even in the 731 // middle of it. 732 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); 733 write_ptr = nullptr; 734 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 735 EXPECT_TRUE(write_ptr); 736 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); 737 738 // It should be readable. 739 hss = MojoHandleSignalsState(); 740 ASSERT_EQ(MOJO_RESULT_OK, 741 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 742 MOJO_DEADLINE_INDEFINITE, &hss)); 743 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 744 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 745 hss.satisfiable_signals); 746 747 // End the two-phase write without writing anything. 748 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u)); 749 750 // Start a two-phase read. 751 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); 752 const void* read_ptr = nullptr; 753 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes)); 754 EXPECT_TRUE(read_ptr); 755 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes); 756 757 // At this point, it should still be writable. 758 hss = MojoHandleSignalsState(); 759 ASSERT_EQ(MOJO_RESULT_OK, 760 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 761 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 762 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 763 hss.satisfiable_signals); 764 765 // But not readable. 766 hss = MojoHandleSignalsState(); 767 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, 768 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); 769 ASSERT_EQ(0u, hss.satisfied_signals); 770 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 771 hss.satisfiable_signals); 772 773 // End the two-phase read without reading anything. 774 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u)); 775 776 // It should be readable again. 777 hss = MojoHandleSignalsState(); 778 ASSERT_EQ(MOJO_RESULT_OK, 779 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); 780 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 781 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 782 hss.satisfiable_signals); 783 } 784 785 void Seq(int32_t start, size_t count, int32_t* out) { 786 for (size_t i = 0; i < count; i++) 787 out[i] = start + static_cast<int32_t>(i); 788 } 789 790 TEST_F(DataPipeTest, AllOrNone) { 791 const MojoCreateDataPipeOptions options = { 792 kSizeOfOptions, // |struct_size|. 793 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 794 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 795 10 * sizeof(int32_t) // |capacity_num_bytes|. 796 }; 797 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 798 MojoHandleSignalsState hss; 799 800 // Try writing way too much. 801 uint32_t num_bytes = 20u * sizeof(int32_t); 802 int32_t buffer[100]; 803 Seq(0, arraysize(buffer), buffer); 804 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true)); 805 806 // Should still be empty. 807 num_bytes = ~0u; 808 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 809 ASSERT_EQ(0u, num_bytes); 810 811 // Write some data. 812 num_bytes = 5u * sizeof(int32_t); 813 Seq(100, arraysize(buffer), buffer); 814 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true)); 815 ASSERT_EQ(5u * sizeof(int32_t), num_bytes); 816 817 // Wait for data. 818 // TODO(vtl): There's no real guarantee that all the data will become 819 // available at once (except that in current implementations, with reasonable 820 // limits, it will). Eventually, we'll be able to wait for a specified amount 821 // of data to become available. 822 hss = MojoHandleSignalsState(); 823 ASSERT_EQ(MOJO_RESULT_OK, 824 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 825 MOJO_DEADLINE_INDEFINITE, &hss)); 826 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 827 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 828 hss.satisfiable_signals); 829 830 // Half full. 831 num_bytes = 0u; 832 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 833 ASSERT_EQ(5u * sizeof(int32_t), num_bytes); 834 835 /* TODO(jam): enable if we end up observing max capacity 836 // Too much. 837 num_bytes = 6u * sizeof(int32_t); 838 Seq(200, arraysize(buffer), buffer); 839 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true)); 840 */ 841 842 // Try reading too much. 843 num_bytes = 11u * sizeof(int32_t); 844 memset(buffer, 0xab, sizeof(buffer)); 845 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true)); 846 int32_t expected_buffer[100]; 847 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 848 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 849 850 // Try discarding too much. 851 num_bytes = 11u * sizeof(int32_t); 852 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true)); 853 854 // Just a little. 855 num_bytes = 2u * sizeof(int32_t); 856 Seq(300, arraysize(buffer), buffer); 857 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true)); 858 ASSERT_EQ(2u * sizeof(int32_t), num_bytes); 859 860 // Just right. 861 num_bytes = 3u * sizeof(int32_t); 862 Seq(400, arraysize(buffer), buffer); 863 ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true)); 864 ASSERT_EQ(3u * sizeof(int32_t), num_bytes); 865 866 // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a 867 // specified amount of data to be available, so poll. 868 for (size_t i = 0; i < kMaxPoll; i++) { 869 num_bytes = 0u; 870 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 871 if (num_bytes >= 10u * sizeof(int32_t)) 872 break; 873 874 test::Sleep(test::EpsilonDeadline()); 875 } 876 ASSERT_EQ(10u * sizeof(int32_t), num_bytes); 877 878 // Read half. 879 num_bytes = 5u * sizeof(int32_t); 880 memset(buffer, 0xab, sizeof(buffer)); 881 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true)); 882 ASSERT_EQ(5u * sizeof(int32_t), num_bytes); 883 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 884 Seq(100, 5, expected_buffer); 885 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 886 887 // Try reading too much again. 888 num_bytes = 6u * sizeof(int32_t); 889 memset(buffer, 0xab, sizeof(buffer)); 890 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true)); 891 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 892 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 893 894 // Try discarding too much again. 895 num_bytes = 6u * sizeof(int32_t); 896 ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true)); 897 898 // Discard a little. 899 num_bytes = 2u * sizeof(int32_t); 900 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true)); 901 ASSERT_EQ(2u * sizeof(int32_t), num_bytes); 902 903 // Three left. 904 num_bytes = 0u; 905 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 906 ASSERT_EQ(3u * sizeof(int32_t), num_bytes); 907 908 // Close the producer, then test producer-closed cases. 909 CloseProducer(); 910 911 // Wait. 912 hss = MojoHandleSignalsState(); 913 ASSERT_EQ(MOJO_RESULT_OK, 914 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 915 MOJO_DEADLINE_INDEFINITE, &hss)); 916 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 917 hss.satisfied_signals); 918 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 919 hss.satisfiable_signals); 920 921 // Try reading too much; "failed precondition" since the producer is closed. 922 num_bytes = 4u * sizeof(int32_t); 923 memset(buffer, 0xab, sizeof(buffer)); 924 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 925 ReadData(buffer, &num_bytes, true)); 926 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 927 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 928 929 // Try discarding too much; "failed precondition" again. 930 num_bytes = 4u * sizeof(int32_t); 931 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true)); 932 933 // Read a little. 934 num_bytes = 2u * sizeof(int32_t); 935 memset(buffer, 0xab, sizeof(buffer)); 936 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true)); 937 ASSERT_EQ(2u * sizeof(int32_t), num_bytes); 938 memset(expected_buffer, 0xab, sizeof(expected_buffer)); 939 Seq(400, 2, expected_buffer); 940 ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); 941 942 // Discard the remaining element. 943 num_bytes = 1u * sizeof(int32_t); 944 ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true)); 945 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 946 947 // Empty again. 948 num_bytes = ~0u; 949 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 950 ASSERT_EQ(0u, num_bytes); 951 } 952 953 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads, 954 // respectively, as much as possible, even if it may have to "wrap around" the 955 // internal circular buffer. (Note that the two-phase write and read need not do 956 // this.) 957 TEST_F(DataPipeTest, WrapAround) { 958 unsigned char test_data[1000]; 959 for (size_t i = 0; i < arraysize(test_data); i++) 960 test_data[i] = static_cast<unsigned char>(i); 961 962 const MojoCreateDataPipeOptions options = { 963 kSizeOfOptions, // |struct_size|. 964 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 965 1u, // |element_num_bytes|. 966 100u // |capacity_num_bytes|. 967 }; 968 969 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 970 MojoHandleSignalsState hss; 971 972 // Write 20 bytes. 973 uint32_t num_bytes = 20u; 974 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true)); 975 ASSERT_EQ(20u, num_bytes); 976 977 // Wait for data. 978 ASSERT_EQ(MOJO_RESULT_OK, 979 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 980 MOJO_DEADLINE_INDEFINITE, &hss)); 981 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0); 982 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 983 hss.satisfiable_signals); 984 985 // Read 10 bytes. 986 unsigned char read_buffer[1000] = {0}; 987 num_bytes = 10u; 988 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true)); 989 ASSERT_EQ(10u, num_bytes); 990 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u)); 991 992 // Check that a two-phase write can now only write (at most) 80 bytes. (This 993 // checks an implementation detail; this behavior is not guaranteed.) 994 void* write_buffer_ptr = nullptr; 995 num_bytes = 0u; 996 ASSERT_EQ(MOJO_RESULT_OK, 997 BeginWriteData(&write_buffer_ptr, &num_bytes, false)); 998 EXPECT_TRUE(write_buffer_ptr); 999 ASSERT_EQ(80u, num_bytes); 1000 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0)); 1001 1002 size_t total_num_bytes = 0; 1003 while (total_num_bytes < 90) { 1004 // Wait to write. 1005 ASSERT_EQ(MOJO_RESULT_OK, 1006 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 1007 MOJO_DEADLINE_INDEFINITE, &hss)); 1008 ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE); 1009 ASSERT_EQ(hss.satisfiable_signals, 1010 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED); 1011 1012 // Write as much as we can. 1013 num_bytes = 100; 1014 ASSERT_EQ(MOJO_RESULT_OK, 1015 WriteData(&test_data[20 + total_num_bytes], &num_bytes, false)); 1016 total_num_bytes += num_bytes; 1017 } 1018 1019 ASSERT_EQ(90u, total_num_bytes); 1020 1021 num_bytes = 0; 1022 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1023 ASSERT_EQ(100u, num_bytes); 1024 1025 // Check that a two-phase read can now only read (at most) 90 bytes. (This 1026 // checks an implementation detail; this behavior is not guaranteed.) 1027 const void* read_buffer_ptr = nullptr; 1028 num_bytes = 0; 1029 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes, false)); 1030 EXPECT_TRUE(read_buffer_ptr); 1031 ASSERT_EQ(90u, num_bytes); 1032 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0)); 1033 1034 // Read as much as possible. We should read 100 bytes. 1035 num_bytes = static_cast<uint32_t>(arraysize(read_buffer) * 1036 sizeof(read_buffer[0])); 1037 memset(read_buffer, 0, num_bytes); 1038 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes)); 1039 ASSERT_EQ(100u, num_bytes); 1040 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u)); 1041 } 1042 1043 // Tests the behavior of writing (simple and two-phase), closing the producer, 1044 // then reading (simple and two-phase). 1045 TEST_F(DataPipeTest, WriteCloseProducerRead) { 1046 const char kTestData[] = "hello world"; 1047 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1048 1049 const MojoCreateDataPipeOptions options = { 1050 kSizeOfOptions, // |struct_size|. 1051 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1052 1u, // |element_num_bytes|. 1053 1000u // |capacity_num_bytes|. 1054 }; 1055 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1056 1057 // Write some data, so we'll have something to read. 1058 uint32_t num_bytes = kTestDataSize; 1059 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false)); 1060 ASSERT_EQ(kTestDataSize, num_bytes); 1061 1062 // Write it again, so we'll have something left over. 1063 num_bytes = kTestDataSize; 1064 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false)); 1065 ASSERT_EQ(kTestDataSize, num_bytes); 1066 1067 // Start two-phase write. 1068 void* write_buffer_ptr = nullptr; 1069 num_bytes = 0u; 1070 ASSERT_EQ(MOJO_RESULT_OK, 1071 BeginWriteData(&write_buffer_ptr, &num_bytes, false)); 1072 EXPECT_TRUE(write_buffer_ptr); 1073 EXPECT_GT(num_bytes, 0u); 1074 1075 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.) 1076 for (size_t i = 0; i < kMaxPoll; i++) { 1077 num_bytes = 0u; 1078 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1079 if (num_bytes >= 2u * kTestDataSize) 1080 break; 1081 1082 test::Sleep(test::EpsilonDeadline()); 1083 } 1084 ASSERT_EQ(2u * kTestDataSize, num_bytes); 1085 1086 // Start two-phase read. 1087 const void* read_buffer_ptr = nullptr; 1088 num_bytes = 0u; 1089 ASSERT_EQ(MOJO_RESULT_OK, 1090 BeginReadData(&read_buffer_ptr, &num_bytes)); 1091 EXPECT_TRUE(read_buffer_ptr); 1092 ASSERT_EQ(2u * kTestDataSize, num_bytes); 1093 1094 // Close the producer. 1095 CloseProducer(); 1096 1097 // The consumer can finish its two-phase read. 1098 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize)); 1099 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize)); 1100 1101 // And start another. 1102 read_buffer_ptr = nullptr; 1103 num_bytes = 0u; 1104 ASSERT_EQ(MOJO_RESULT_OK, 1105 BeginReadData(&read_buffer_ptr, &num_bytes)); 1106 EXPECT_TRUE(read_buffer_ptr); 1107 ASSERT_EQ(kTestDataSize, num_bytes); 1108 } 1109 1110 1111 // Tests the behavior of interrupting a two-phase read and write by closing the 1112 // consumer. 1113 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) { 1114 const char kTestData[] = "hello world"; 1115 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1116 1117 const MojoCreateDataPipeOptions options = { 1118 kSizeOfOptions, // |struct_size|. 1119 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1120 1u, // |element_num_bytes|. 1121 1000u // |capacity_num_bytes|. 1122 }; 1123 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1124 MojoHandleSignalsState hss; 1125 1126 // Write some data, so we'll have something to read. 1127 uint32_t num_bytes = kTestDataSize; 1128 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1129 ASSERT_EQ(kTestDataSize, num_bytes); 1130 1131 // Start two-phase write. 1132 void* write_buffer_ptr = nullptr; 1133 num_bytes = 0u; 1134 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes)); 1135 EXPECT_TRUE(write_buffer_ptr); 1136 ASSERT_GT(num_bytes, kTestDataSize); 1137 1138 // Wait for data. 1139 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1140 hss = MojoHandleSignalsState(); 1141 ASSERT_EQ(MOJO_RESULT_OK, 1142 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 1143 MOJO_DEADLINE_INDEFINITE, &hss)); 1144 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1145 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1146 hss.satisfiable_signals); 1147 1148 // Start two-phase read. 1149 const void* read_buffer_ptr = nullptr; 1150 num_bytes = 0u; 1151 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes)); 1152 EXPECT_TRUE(read_buffer_ptr); 1153 ASSERT_EQ(kTestDataSize, num_bytes); 1154 1155 // Close the consumer. 1156 CloseConsumer(); 1157 1158 // Wait for producer to know that the consumer is closed. 1159 hss = MojoHandleSignalsState(); 1160 ASSERT_EQ(MOJO_RESULT_OK, 1161 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1162 MOJO_DEADLINE_INDEFINITE, &hss)); 1163 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1164 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); 1165 1166 // Actually write some data. (Note: Premature freeing of the buffer would 1167 // probably only be detected under ASAN or similar.) 1168 memcpy(write_buffer_ptr, kTestData, kTestDataSize); 1169 // Note: Even though the consumer has been closed, ending the two-phase 1170 // write will report success. 1171 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize)); 1172 1173 // But trying to write should result in failure. 1174 num_bytes = kTestDataSize; 1175 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes)); 1176 1177 // As will trying to start another two-phase write. 1178 write_buffer_ptr = nullptr; 1179 num_bytes = 0u; 1180 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1181 BeginWriteData(&write_buffer_ptr, &num_bytes)); 1182 } 1183 1184 // Tests the behavior of "interrupting" a two-phase write by closing both the 1185 // producer and the consumer. 1186 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) { 1187 const uint32_t kTestDataSize = 15u; 1188 1189 const MojoCreateDataPipeOptions options = { 1190 kSizeOfOptions, // |struct_size|. 1191 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1192 1u, // |element_num_bytes|. 1193 1000u // |capacity_num_bytes|. 1194 }; 1195 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1196 1197 // Start two-phase write. 1198 void* write_buffer_ptr = nullptr; 1199 uint32_t num_bytes = 0u; 1200 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes)); 1201 EXPECT_TRUE(write_buffer_ptr); 1202 ASSERT_GT(num_bytes, kTestDataSize); 1203 } 1204 1205 // Tests the behavior of writing, closing the producer, and then reading (with 1206 // and without data remaining). 1207 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) { 1208 const char kTestData[] = "hello world"; 1209 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1210 1211 const MojoCreateDataPipeOptions options = { 1212 kSizeOfOptions, // |struct_size|. 1213 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1214 1u, // |element_num_bytes|. 1215 1000u // |capacity_num_bytes|. 1216 }; 1217 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1218 MojoHandleSignalsState hss; 1219 1220 // Write some data, so we'll have something to read. 1221 uint32_t num_bytes = kTestDataSize; 1222 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1223 ASSERT_EQ(kTestDataSize, num_bytes); 1224 1225 // Close the producer. 1226 CloseProducer(); 1227 1228 // Wait. (Note that once the consumer knows that the producer is closed, it 1229 // must also know about all the data that was sent.) 1230 hss = MojoHandleSignalsState(); 1231 ASSERT_EQ(MOJO_RESULT_OK, 1232 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1233 MOJO_DEADLINE_INDEFINITE, &hss)); 1234 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1235 hss.satisfied_signals); 1236 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1237 hss.satisfiable_signals); 1238 1239 // Peek that data. 1240 char buffer[1000]; 1241 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 1242 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true)); 1243 ASSERT_EQ(kTestDataSize, num_bytes); 1244 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize)); 1245 1246 // Read that data. 1247 memset(buffer, 0, 1000); 1248 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 1249 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes)); 1250 ASSERT_EQ(kTestDataSize, num_bytes); 1251 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize)); 1252 1253 // A second read should fail. 1254 num_bytes = static_cast<uint32_t>(sizeof(buffer)); 1255 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes)); 1256 1257 // A two-phase read should also fail. 1258 const void* read_buffer_ptr = nullptr; 1259 num_bytes = 0u; 1260 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1261 BeginReadData(&read_buffer_ptr, &num_bytes)); 1262 1263 // Ditto for discard. 1264 num_bytes = 10u; 1265 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes)); 1266 } 1267 1268 // Test that during a two phase read the memory stays valid even if more data 1269 // comes in. 1270 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) { 1271 const char kTestData[] = "hello world"; 1272 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1273 1274 const MojoCreateDataPipeOptions options = { 1275 kSizeOfOptions, // |struct_size|. 1276 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1277 1u, // |element_num_bytes|. 1278 1000u // |capacity_num_bytes|. 1279 }; 1280 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1281 MojoHandleSignalsState hss; 1282 1283 // Write some data. 1284 uint32_t num_bytes = kTestDataSize; 1285 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1286 ASSERT_EQ(kTestDataSize, num_bytes); 1287 1288 // Wait for the data. 1289 hss = MojoHandleSignalsState(); 1290 ASSERT_EQ(MOJO_RESULT_OK, 1291 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 1292 MOJO_DEADLINE_INDEFINITE, &hss)); 1293 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1294 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1295 hss.satisfiable_signals); 1296 1297 // Begin a two-phase read. 1298 const void* read_buffer_ptr = nullptr; 1299 uint32_t read_buffer_size = 0u; 1300 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size)); 1301 1302 // Write more data. 1303 const char kExtraData[] = "bye world"; 1304 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData)); 1305 num_bytes = kExtraDataSize; 1306 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes)); 1307 ASSERT_EQ(kExtraDataSize, num_bytes); 1308 1309 // Close the producer. 1310 CloseProducer(); 1311 1312 // Wait. (Note that once the consumer knows that the producer is closed, it 1313 // must also have received the extra data). 1314 hss = MojoHandleSignalsState(); 1315 ASSERT_EQ(MOJO_RESULT_OK, 1316 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1317 MOJO_DEADLINE_INDEFINITE, &hss)); 1318 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); 1319 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1320 hss.satisfiable_signals); 1321 1322 // Read the two phase memory to check it's still valid. 1323 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize)); 1324 EndReadData(read_buffer_size); 1325 } 1326 1327 // Test that two-phase reads/writes behave correctly when given invalid 1328 // arguments. 1329 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) { 1330 const MojoCreateDataPipeOptions options = { 1331 kSizeOfOptions, // |struct_size|. 1332 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1333 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1334 10 * sizeof(int32_t) // |capacity_num_bytes|. 1335 }; 1336 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1337 MojoHandleSignalsState hss; 1338 1339 // No data. 1340 uint32_t num_bytes = 1000u; 1341 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1342 ASSERT_EQ(0u, num_bytes); 1343 1344 // Try "ending" a two-phase write when one isn't active. 1345 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 1346 EndWriteData(1u * sizeof(int32_t))); 1347 1348 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd 1349 // have time to propagate. 1350 test::Sleep(test::EpsilonDeadline()); 1351 1352 // Still no data. 1353 num_bytes = 1000u; 1354 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1355 ASSERT_EQ(0u, num_bytes); 1356 1357 // Try ending a two-phase write with an invalid amount (too much). 1358 num_bytes = 0u; 1359 void* write_ptr = nullptr; 1360 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 1361 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 1362 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t)))); 1363 1364 // But the two-phase write still ended. 1365 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u)); 1366 1367 // Wait a bit (as above). 1368 test::Sleep(test::EpsilonDeadline()); 1369 1370 // Still no data. 1371 num_bytes = 1000u; 1372 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1373 ASSERT_EQ(0u, num_bytes); 1374 1375 // Try ending a two-phase write with an invalid amount (not a multiple of the 1376 // element size). 1377 num_bytes = 0u; 1378 write_ptr = nullptr; 1379 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes)); 1380 EXPECT_GE(num_bytes, 1u); 1381 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u)); 1382 1383 // But the two-phase write still ended. 1384 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u)); 1385 1386 // Wait a bit (as above). 1387 test::Sleep(test::EpsilonDeadline()); 1388 1389 // Still no data. 1390 num_bytes = 1000u; 1391 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1392 ASSERT_EQ(0u, num_bytes); 1393 1394 // Now write some data, so we'll be able to try reading. 1395 int32_t element = 123; 1396 num_bytes = 1u * sizeof(int32_t); 1397 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes)); 1398 1399 // Wait for data. 1400 // TODO(vtl): (See corresponding TODO in AllOrNone.) 1401 hss = MojoHandleSignalsState(); 1402 ASSERT_EQ(MOJO_RESULT_OK, 1403 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 1404 MOJO_DEADLINE_INDEFINITE, &hss)); 1405 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1406 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1407 hss.satisfiable_signals); 1408 1409 // One element available. 1410 num_bytes = 0u; 1411 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1412 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1413 1414 // Try "ending" a two-phase read when one isn't active. 1415 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t))); 1416 1417 // Still one element available. 1418 num_bytes = 0u; 1419 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1420 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1421 1422 // Try ending a two-phase read with an invalid amount (too much). 1423 num_bytes = 0u; 1424 const void* read_ptr = nullptr; 1425 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes)); 1426 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 1427 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t)))); 1428 1429 // Still one element available. 1430 num_bytes = 0u; 1431 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1432 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1433 1434 // Try ending a two-phase read with an invalid amount (not a multiple of the 1435 // element size). 1436 num_bytes = 0u; 1437 read_ptr = nullptr; 1438 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes)); 1439 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1440 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]); 1441 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u)); 1442 1443 // Still one element available. 1444 num_bytes = 0u; 1445 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes)); 1446 ASSERT_EQ(1u * sizeof(int32_t), num_bytes); 1447 } 1448 1449 // Test that a producer can be sent over a MP. 1450 TEST_F(DataPipeTest, SendProducer) { 1451 const char kTestData[] = "hello world"; 1452 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); 1453 1454 const MojoCreateDataPipeOptions options = { 1455 kSizeOfOptions, // |struct_size|. 1456 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1457 1u, // |element_num_bytes|. 1458 1000u // |capacity_num_bytes|. 1459 }; 1460 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1461 MojoHandleSignalsState hss; 1462 1463 // Write some data. 1464 uint32_t num_bytes = kTestDataSize; 1465 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); 1466 ASSERT_EQ(kTestDataSize, num_bytes); 1467 1468 // Wait for the data. 1469 hss = MojoHandleSignalsState(); 1470 ASSERT_EQ(MOJO_RESULT_OK, 1471 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 1472 MOJO_DEADLINE_INDEFINITE, &hss)); 1473 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1474 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1475 hss.satisfiable_signals); 1476 1477 // Check the data. 1478 const void* read_buffer = nullptr; 1479 num_bytes = 0u; 1480 ASSERT_EQ(MOJO_RESULT_OK, 1481 BeginReadData(&read_buffer, &num_bytes, false)); 1482 ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize)); 1483 EndReadData(num_bytes); 1484 1485 // Now send the producer over a MP so that it's serialized. 1486 MojoHandle pipe0, pipe1; 1487 ASSERT_EQ(MOJO_RESULT_OK, 1488 MojoCreateMessagePipe(nullptr, &pipe0, &pipe1)); 1489 1490 ASSERT_EQ(MOJO_RESULT_OK, 1491 MojoWriteMessage(pipe0, nullptr, 0, &producer_, 1, 1492 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1493 producer_ = MOJO_HANDLE_INVALID; 1494 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE, 1495 MOJO_DEADLINE_INDEFINITE, &hss)); 1496 uint32_t num_handles = 1; 1497 ASSERT_EQ(MOJO_RESULT_OK, 1498 MojoReadMessage(pipe1, nullptr, 0, &producer_, &num_handles, 1499 MOJO_READ_MESSAGE_FLAG_NONE)); 1500 ASSERT_EQ(num_handles, 1u); 1501 1502 // Write more data. 1503 const char kExtraData[] = "bye world"; 1504 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData)); 1505 num_bytes = kExtraDataSize; 1506 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes)); 1507 ASSERT_EQ(kExtraDataSize, num_bytes); 1508 1509 // Wait for it. 1510 hss = MojoHandleSignalsState(); 1511 ASSERT_EQ(MOJO_RESULT_OK, 1512 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 1513 MOJO_DEADLINE_INDEFINITE, &hss)); 1514 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 1515 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1516 hss.satisfiable_signals); 1517 1518 // Check the second write. 1519 num_bytes = 0u; 1520 ASSERT_EQ(MOJO_RESULT_OK, 1521 BeginReadData(&read_buffer, &num_bytes, false)); 1522 ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize)); 1523 EndReadData(num_bytes); 1524 1525 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0)); 1526 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1)); 1527 } 1528 1529 // Ensures that if a data pipe consumer whose producer has closed is passed over 1530 // a message pipe, the deserialized dispatcher is also marked as having a closed 1531 // peer. 1532 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) { 1533 const MojoCreateDataPipeOptions options = { 1534 kSizeOfOptions, // |struct_size|. 1535 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1536 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. 1537 1000 * sizeof(int32_t) // |capacity_num_bytes|. 1538 }; 1539 1540 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1541 1542 // We can write to a data pipe handle immediately. 1543 int32_t data = 123; 1544 uint32_t num_bytes = sizeof(data); 1545 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes)); 1546 ASSERT_EQ(MOJO_RESULT_OK, CloseProducer()); 1547 1548 // Now wait for the other side to become readable and to see the peer closed. 1549 MojoHandleSignalsState state; 1550 ASSERT_EQ(MOJO_RESULT_OK, 1551 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1552 MOJO_DEADLINE_INDEFINITE, &state)); 1553 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1554 state.satisfied_signals); 1555 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1556 state.satisfiable_signals); 1557 1558 // Now send the consumer over a MP so that it's serialized. 1559 MojoHandle pipe0, pipe1; 1560 ASSERT_EQ(MOJO_RESULT_OK, 1561 MojoCreateMessagePipe(nullptr, &pipe0, &pipe1)); 1562 1563 ASSERT_EQ(MOJO_RESULT_OK, 1564 MojoWriteMessage(pipe0, nullptr, 0, &consumer_, 1, 1565 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1566 consumer_ = MOJO_HANDLE_INVALID; 1567 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE, 1568 MOJO_DEADLINE_INDEFINITE, &state)); 1569 uint32_t num_handles = 1; 1570 ASSERT_EQ(MOJO_RESULT_OK, 1571 MojoReadMessage(pipe1, nullptr, 0, &consumer_, &num_handles, 1572 MOJO_READ_MESSAGE_FLAG_NONE)); 1573 ASSERT_EQ(num_handles, 1u); 1574 1575 ASSERT_EQ(MOJO_RESULT_OK, 1576 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1577 MOJO_DEADLINE_INDEFINITE, &state)); 1578 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1579 state.satisfied_signals); 1580 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1581 state.satisfiable_signals); 1582 1583 int32_t read_data; 1584 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes)); 1585 ASSERT_EQ(sizeof(read_data), num_bytes); 1586 ASSERT_EQ(data, read_data); 1587 1588 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0)); 1589 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1)); 1590 } 1591 1592 bool WriteAllData(MojoHandle producer, 1593 const void* elements, 1594 uint32_t num_bytes) { 1595 for (size_t i = 0; i < kMaxPoll; i++) { 1596 // Write as much data as we can. 1597 uint32_t write_bytes = num_bytes; 1598 MojoResult result = MojoWriteData(producer, elements, &write_bytes, 1599 MOJO_WRITE_DATA_FLAG_NONE); 1600 if (result == MOJO_RESULT_OK) { 1601 num_bytes -= write_bytes; 1602 elements = static_cast<const uint8_t*>(elements) + write_bytes; 1603 if (num_bytes == 0) 1604 return true; 1605 } else { 1606 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result); 1607 } 1608 1609 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1610 EXPECT_EQ(MOJO_RESULT_OK, MojoWait(producer, MOJO_HANDLE_SIGNAL_WRITABLE, 1611 MOJO_DEADLINE_INDEFINITE, &hss)); 1612 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 1613 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1614 hss.satisfiable_signals); 1615 } 1616 1617 return false; 1618 } 1619 1620 // If |expect_empty| is true, expect |consumer| to be empty after reading. 1621 bool ReadAllData(MojoHandle consumer, 1622 void* elements, 1623 uint32_t num_bytes, 1624 bool expect_empty) { 1625 for (size_t i = 0; i < kMaxPoll; i++) { 1626 // Read as much data as we can. 1627 uint32_t read_bytes = num_bytes; 1628 MojoResult result = 1629 MojoReadData(consumer, elements, &read_bytes, MOJO_READ_DATA_FLAG_NONE); 1630 if (result == MOJO_RESULT_OK) { 1631 num_bytes -= read_bytes; 1632 elements = static_cast<uint8_t*>(elements) + read_bytes; 1633 if (num_bytes == 0) { 1634 if (expect_empty) { 1635 // Expect no more data. 1636 test::Sleep(test::TinyDeadline()); 1637 MojoReadData(consumer, nullptr, &num_bytes, 1638 MOJO_READ_DATA_FLAG_QUERY); 1639 EXPECT_EQ(0u, num_bytes); 1640 } 1641 return true; 1642 } 1643 } else { 1644 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result); 1645 } 1646 1647 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1648 EXPECT_EQ(MOJO_RESULT_OK, MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE, 1649 MOJO_DEADLINE_INDEFINITE, &hss)); 1650 // Peer could have become closed while we're still waiting for data. 1651 EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals); 1652 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, 1653 hss.satisfiable_signals); 1654 } 1655 1656 return num_bytes == 0; 1657 } 1658 1659 #if !defined(OS_IOS) 1660 1661 TEST_F(DataPipeTest, Multiprocess) { 1662 const uint32_t kTestDataSize = 1663 static_cast<uint32_t>(sizeof(kMultiprocessTestData)); 1664 const MojoCreateDataPipeOptions options = { 1665 kSizeOfOptions, // |struct_size|. 1666 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1667 1, // |element_num_bytes|. 1668 kMultiprocessCapacity // |capacity_num_bytes|. 1669 }; 1670 ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); 1671 1672 RUN_CHILD_ON_PIPE(MultiprocessClient, server_mp) 1673 // Send some data before serialising and sending the data pipe over. 1674 // This is the first write so we don't need to use WriteAllData. 1675 uint32_t num_bytes = kTestDataSize; 1676 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes, 1677 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)); 1678 ASSERT_EQ(kTestDataSize, num_bytes); 1679 1680 // Send child process the data pipe. 1681 ASSERT_EQ(MOJO_RESULT_OK, 1682 MojoWriteMessage(server_mp, nullptr, 0, &consumer_, 1, 1683 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1684 1685 // Send a bunch of data of varying sizes. 1686 uint8_t buffer[100]; 1687 int seq = 0; 1688 for (int i = 0; i < kMultiprocessMaxIter; ++i) { 1689 for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) { 1690 for (unsigned int j = 0; j < size; ++j) 1691 buffer[j] = seq + j; 1692 EXPECT_TRUE(WriteAllData(producer_, buffer, size)); 1693 seq += size; 1694 } 1695 } 1696 1697 // Write the test string in again. 1698 ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize)); 1699 1700 // Swap ends. 1701 ASSERT_EQ(MOJO_RESULT_OK, 1702 MojoWriteMessage(server_mp, nullptr, 0, &producer_, 1, 1703 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1704 1705 // Receive the consumer from the other side. 1706 producer_ = MOJO_HANDLE_INVALID; 1707 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1708 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(server_mp, MOJO_HANDLE_SIGNAL_READABLE, 1709 MOJO_DEADLINE_INDEFINITE, &hss)); 1710 MojoHandle handles[2]; 1711 uint32_t num_handles = arraysize(handles); 1712 ASSERT_EQ(MOJO_RESULT_OK, 1713 MojoReadMessage(server_mp, nullptr, 0, handles, &num_handles, 1714 MOJO_READ_MESSAGE_FLAG_NONE)); 1715 ASSERT_EQ(1u, num_handles); 1716 consumer_ = handles[0]; 1717 1718 // Read the test string twice. Once for when we sent it, and once for the 1719 // other end sending it. 1720 for (int i = 0; i < 2; ++i) { 1721 EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1)); 1722 EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize)); 1723 } 1724 1725 WriteMessage(server_mp, "quit"); 1726 1727 // Don't have to close the consumer here because it will be done for us. 1728 END_CHILD() 1729 } 1730 1731 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) { 1732 const uint32_t kTestDataSize = 1733 static_cast<uint32_t>(sizeof(kMultiprocessTestData)); 1734 1735 // Receive the data pipe from the other side. 1736 MojoHandle consumer = MOJO_HANDLE_INVALID; 1737 MojoHandleSignalsState hss = MojoHandleSignalsState(); 1738 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE, 1739 MOJO_DEADLINE_INDEFINITE, &hss)); 1740 MojoHandle handles[2]; 1741 uint32_t num_handles = arraysize(handles); 1742 ASSERT_EQ(MOJO_RESULT_OK, 1743 MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles, 1744 MOJO_READ_MESSAGE_FLAG_NONE)); 1745 ASSERT_EQ(1u, num_handles); 1746 consumer = handles[0]; 1747 1748 // Read the initial string that was sent. 1749 int32_t buffer[100]; 1750 EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false)); 1751 EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize)); 1752 1753 // Receive the main data and check it is correct. 1754 int seq = 0; 1755 uint8_t expected_buffer[100]; 1756 for (int i = 0; i < kMultiprocessMaxIter; ++i) { 1757 for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) { 1758 for (unsigned int j = 0; j < size; ++j) 1759 expected_buffer[j] = seq + j; 1760 EXPECT_TRUE(ReadAllData(consumer, buffer, size, false)); 1761 EXPECT_EQ(0, memcmp(buffer, expected_buffer, size)); 1762 1763 seq += size; 1764 } 1765 } 1766 1767 // Swap ends. 1768 ASSERT_EQ(MOJO_RESULT_OK, MojoWriteMessage(client_mp, nullptr, 0, &consumer, 1769 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); 1770 1771 // Receive the producer from the other side. 1772 MojoHandle producer = MOJO_HANDLE_INVALID; 1773 hss = MojoHandleSignalsState(); 1774 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE, 1775 MOJO_DEADLINE_INDEFINITE, &hss)); 1776 num_handles = arraysize(handles); 1777 ASSERT_EQ(MOJO_RESULT_OK, 1778 MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles, 1779 MOJO_READ_MESSAGE_FLAG_NONE)); 1780 ASSERT_EQ(1u, num_handles); 1781 producer = handles[0]; 1782 1783 // Write the test string one more time. 1784 EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize)); 1785 1786 // We swapped ends, so close the producer. 1787 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer)); 1788 1789 // Wait to receive a "quit" message before exiting. 1790 EXPECT_EQ("quit", ReadMessage(client_mp)); 1791 } 1792 1793 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) { 1794 MojoHandle p; 1795 std::string message = ReadMessageWithHandles(h, &p, 1); 1796 1797 // Write some data to the producer and close it. 1798 uint32_t num_bytes = static_cast<uint32_t>(message.size()); 1799 EXPECT_EQ(MOJO_RESULT_OK, MojoWriteData(p, message.data(), &num_bytes, 1800 MOJO_WRITE_DATA_FLAG_NONE)); 1801 EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size())); 1802 1803 // Close the producer before quitting. 1804 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p)); 1805 1806 // Wait for a quit message. 1807 EXPECT_EQ("quit", ReadMessage(h)); 1808 } 1809 1810 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) { 1811 MojoHandle c; 1812 std::string expected_message = ReadMessageWithHandles(h, &c, 1); 1813 1814 // Wait for the consumer to become readable. 1815 EXPECT_EQ(MOJO_RESULT_OK, MojoWait(c, MOJO_HANDLE_SIGNAL_READABLE, 1816 MOJO_DEADLINE_INDEFINITE, nullptr)); 1817 1818 // Drain the consumer and expect to find the given message. 1819 uint32_t num_bytes = static_cast<uint32_t>(expected_message.size()); 1820 std::vector<char> bytes(expected_message.size()); 1821 EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, bytes.data(), &num_bytes, 1822 MOJO_READ_DATA_FLAG_NONE)); 1823 EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size())); 1824 1825 std::string message(bytes.data(), bytes.size()); 1826 EXPECT_EQ(expected_message, message); 1827 1828 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c)); 1829 1830 // Wait for a quit message. 1831 EXPECT_EQ("quit", ReadMessage(h)); 1832 } 1833 1834 TEST_F(DataPipeTest, SendConsumerAndCloseProducer) { 1835 // Create a new data pipe. 1836 MojoHandle p, c; 1837 EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p ,&c)); 1838 1839 RUN_CHILD_ON_PIPE(WriteAndCloseProducer, producer_client) 1840 RUN_CHILD_ON_PIPE(ReadAndCloseConsumer, consumer_client) 1841 const std::string kMessage = "Hello, world!"; 1842 WriteMessageWithHandles(producer_client, kMessage, &p, 1); 1843 WriteMessageWithHandles(consumer_client, kMessage, &c, 1); 1844 1845 WriteMessage(consumer_client, "quit"); 1846 END_CHILD() 1847 1848 WriteMessage(producer_client, "quit"); 1849 END_CHILD() 1850 } 1851 1852 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) { 1853 const MojoCreateDataPipeOptions options = { 1854 kSizeOfOptions, // |struct_size|. 1855 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1856 1, // |element_num_bytes|. 1857 kMultiprocessCapacity // |capacity_num_bytes|. 1858 }; 1859 1860 MojoHandle p, c; 1861 ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c)); 1862 1863 const std::string kMessage = "Hello, world!"; 1864 WriteMessageWithHandles(h, kMessage, &c, 1); 1865 1866 // Write some data to the producer and close it. 1867 uint32_t num_bytes = static_cast<uint32_t>(kMessage.size()); 1868 EXPECT_EQ(MOJO_RESULT_OK, MojoWriteData(p, kMessage.data(), &num_bytes, 1869 MOJO_WRITE_DATA_FLAG_NONE)); 1870 EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size())); 1871 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p)); 1872 1873 // Wait for a quit message. 1874 EXPECT_EQ("quit", ReadMessage(h)); 1875 } 1876 1877 TEST_F(DataPipeTest, CreateInChild) { 1878 RUN_CHILD_ON_PIPE(CreateAndWrite, child) 1879 MojoHandle c; 1880 std::string expected_message = ReadMessageWithHandles(child, &c, 1); 1881 1882 // Wait for the consumer to become readable. 1883 EXPECT_EQ(MOJO_RESULT_OK, MojoWait(c, MOJO_HANDLE_SIGNAL_READABLE, 1884 MOJO_DEADLINE_INDEFINITE, nullptr)); 1885 1886 // Drain the consumer and expect to find the given message. 1887 uint32_t num_bytes = static_cast<uint32_t>(expected_message.size()); 1888 std::vector<char> bytes(expected_message.size()); 1889 EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, bytes.data(), &num_bytes, 1890 MOJO_READ_DATA_FLAG_NONE)); 1891 EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size())); 1892 1893 std::string message(bytes.data(), bytes.size()); 1894 EXPECT_EQ(expected_message, message); 1895 1896 EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c)); 1897 WriteMessage(child, "quit"); 1898 END_CHILD() 1899 } 1900 1901 #endif // !defined(OS_IOS) 1902 1903 } // namespace 1904 } // namespace edk 1905 } // namespace mojo 1906