1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a 6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to 7 // increase tolerance and reduce observed flakiness (though doing so reduces the 8 // meaningfulness of the test). 9 10 #include "mojo/system/message_pipe_dispatcher.h" 11 12 #include <string.h> 13 14 #include <limits> 15 16 #include "base/memory/ref_counted.h" 17 #include "base/memory/scoped_vector.h" 18 #include "base/rand_util.h" 19 #include "base/threading/platform_thread.h" // For |Sleep()|. 20 #include "base/threading/simple_thread.h" 21 #include "base/time/time.h" 22 #include "mojo/system/message_pipe.h" 23 #include "mojo/system/test_utils.h" 24 #include "mojo/system/waiter.h" 25 #include "mojo/system/waiter_test_utils.h" 26 #include "testing/gtest/include/gtest/gtest.h" 27 28 namespace mojo { 29 namespace system { 30 namespace { 31 32 TEST(MessagePipeDispatcherTest, Basic) { 33 test::Stopwatch stopwatch; 34 int32_t buffer[1]; 35 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); 36 uint32_t buffer_size; 37 38 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. 39 for (unsigned i = 0; i < 2; i++) { 40 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 41 MessagePipeDispatcher::kDefaultCreateOptions)); 42 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType()); 43 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 44 MessagePipeDispatcher::kDefaultCreateOptions)); 45 { 46 scoped_refptr<MessagePipe> mp(new MessagePipe()); 47 d0->Init(mp, i); // 0, 1. 48 d1->Init(mp, i ^ 1); // 1, 0. 49 } 50 Waiter w; 51 uint32_t context = 0; 52 53 // Try adding a writable waiter when already writable. 54 w.Init(); 55 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 56 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0)); 57 // Shouldn't need to remove the waiter (it was not added). 58 59 // Add a readable waiter to |d0|, then make it readable (by writing to 60 // |d1|), then wait. 61 w.Init(); 62 EXPECT_EQ(MOJO_RESULT_OK, 63 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1)); 64 buffer[0] = 123456789; 65 EXPECT_EQ(MOJO_RESULT_OK, 66 d1->WriteMessage(buffer, kBufferSize, 67 NULL, 68 MOJO_WRITE_MESSAGE_FLAG_NONE)); 69 stopwatch.Start(); 70 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 71 EXPECT_EQ(1u, context); 72 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); 73 d0->RemoveWaiter(&w); 74 75 // Try adding a readable waiter when already readable (from above). 76 w.Init(); 77 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 78 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2)); 79 // Shouldn't need to remove the waiter (it was not added). 80 81 // Make |d0| no longer readable (by reading from it). 82 buffer[0] = 0; 83 buffer_size = kBufferSize; 84 EXPECT_EQ(MOJO_RESULT_OK, 85 d0->ReadMessage(buffer, &buffer_size, 86 0, NULL, 87 MOJO_READ_MESSAGE_FLAG_NONE)); 88 EXPECT_EQ(kBufferSize, buffer_size); 89 EXPECT_EQ(123456789, buffer[0]); 90 91 // Wait for zero time for readability on |d0| (will time out). 92 w.Init(); 93 EXPECT_EQ(MOJO_RESULT_OK, 94 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3)); 95 stopwatch.Start(); 96 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, NULL)); 97 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); 98 d0->RemoveWaiter(&w); 99 100 // Wait for non-zero, finite time for readability on |d0| (will time out). 101 w.Init(); 102 EXPECT_EQ(MOJO_RESULT_OK, 103 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3)); 104 stopwatch.Start(); 105 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, 106 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL)); 107 base::TimeDelta elapsed = stopwatch.Elapsed(); 108 EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); 109 EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); 110 d0->RemoveWaiter(&w); 111 112 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 113 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 114 } 115 } 116 117 TEST(MessagePipeDispatcherTest, InvalidParams) { 118 char buffer[1]; 119 120 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 121 MessagePipeDispatcher::kDefaultCreateOptions)); 122 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 123 MessagePipeDispatcher::kDefaultCreateOptions)); 124 { 125 scoped_refptr<MessagePipe> mp(new MessagePipe()); 126 d0->Init(mp, 0); 127 d1->Init(mp, 1); 128 } 129 130 // |WriteMessage|: 131 // Null buffer with nonzero buffer size. 132 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 133 d0->WriteMessage(NULL, 1, 134 NULL, 135 MOJO_WRITE_MESSAGE_FLAG_NONE)); 136 // Huge buffer size. 137 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, 138 d0->WriteMessage(buffer, std::numeric_limits<uint32_t>::max(), 139 NULL, 140 MOJO_WRITE_MESSAGE_FLAG_NONE)); 141 142 // |ReadMessage|: 143 // Null buffer with nonzero buffer size. 144 uint32_t buffer_size = 1; 145 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, 146 d0->ReadMessage(NULL, &buffer_size, 147 0, NULL, 148 MOJO_READ_MESSAGE_FLAG_NONE)); 149 150 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 151 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 152 } 153 154 // Test what happens when one end is closed (single-threaded test). 155 TEST(MessagePipeDispatcherTest, BasicClosed) { 156 int32_t buffer[1]; 157 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); 158 uint32_t buffer_size; 159 160 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. 161 for (unsigned i = 0; i < 2; i++) { 162 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 163 MessagePipeDispatcher::kDefaultCreateOptions)); 164 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 165 MessagePipeDispatcher::kDefaultCreateOptions)); 166 { 167 scoped_refptr<MessagePipe> mp(new MessagePipe()); 168 d0->Init(mp, i); // 0, 1. 169 d1->Init(mp, i ^ 1); // 1, 0. 170 } 171 Waiter w; 172 173 // Write (twice) to |d1|. 174 buffer[0] = 123456789; 175 EXPECT_EQ(MOJO_RESULT_OK, 176 d1->WriteMessage(buffer, kBufferSize, 177 NULL, 178 MOJO_WRITE_MESSAGE_FLAG_NONE)); 179 buffer[0] = 234567890; 180 EXPECT_EQ(MOJO_RESULT_OK, 181 d1->WriteMessage(buffer, kBufferSize, 182 NULL, 183 MOJO_WRITE_MESSAGE_FLAG_NONE)); 184 185 // Try waiting for readable on |d0|; should fail (already satisfied). 186 w.Init(); 187 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 188 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0)); 189 190 // Try reading from |d1|; should fail (nothing to read). 191 buffer[0] = 0; 192 buffer_size = kBufferSize; 193 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 194 d1->ReadMessage(buffer, &buffer_size, 195 0, NULL, 196 MOJO_READ_MESSAGE_FLAG_NONE)); 197 198 // Close |d1|. 199 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 200 201 // Try waiting for readable on |d0|; should fail (already satisfied). 202 w.Init(); 203 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 204 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1)); 205 206 // Read from |d0|. 207 buffer[0] = 0; 208 buffer_size = kBufferSize; 209 EXPECT_EQ(MOJO_RESULT_OK, 210 d0->ReadMessage(buffer, &buffer_size, 211 0, NULL, 212 MOJO_READ_MESSAGE_FLAG_NONE)); 213 EXPECT_EQ(kBufferSize, buffer_size); 214 EXPECT_EQ(123456789, buffer[0]); 215 216 // Try waiting for readable on |d0|; should fail (already satisfied). 217 w.Init(); 218 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 219 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2)); 220 221 // Read again from |d0|. 222 buffer[0] = 0; 223 buffer_size = kBufferSize; 224 EXPECT_EQ(MOJO_RESULT_OK, 225 d0->ReadMessage(buffer, &buffer_size, 226 0, NULL, 227 MOJO_READ_MESSAGE_FLAG_NONE)); 228 EXPECT_EQ(kBufferSize, buffer_size); 229 EXPECT_EQ(234567890, buffer[0]); 230 231 // Try waiting for readable on |d0|; should fail (unsatisfiable). 232 w.Init(); 233 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 234 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3)); 235 236 // Try waiting for writable on |d0|; should fail (unsatisfiable). 237 w.Init(); 238 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 239 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4)); 240 241 // Try reading from |d0|; should fail (nothing to read and other end 242 // closed). 243 buffer[0] = 0; 244 buffer_size = kBufferSize; 245 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 246 d0->ReadMessage(buffer, &buffer_size, 247 0, NULL, 248 MOJO_READ_MESSAGE_FLAG_NONE)); 249 250 // Try writing to |d0|; should fail (other end closed). 251 buffer[0] = 345678901; 252 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 253 d0->WriteMessage(buffer, kBufferSize, 254 NULL, 255 MOJO_WRITE_MESSAGE_FLAG_NONE)); 256 257 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 258 } 259 } 260 261 TEST(MessagePipeDispatcherTest, BasicThreaded) { 262 test::Stopwatch stopwatch; 263 int32_t buffer[1]; 264 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); 265 uint32_t buffer_size; 266 base::TimeDelta elapsed; 267 bool did_wait; 268 MojoResult result; 269 uint32_t context; 270 271 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. 272 for (unsigned i = 0; i < 2; i++) { 273 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 274 MessagePipeDispatcher::kDefaultCreateOptions)); 275 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 276 MessagePipeDispatcher::kDefaultCreateOptions)); 277 { 278 scoped_refptr<MessagePipe> mp(new MessagePipe()); 279 d0->Init(mp, i); // 0, 1. 280 d1->Init(mp, i ^ 1); // 1, 0. 281 } 282 283 // Wait for readable on |d1|, which will become readable after some time. 284 { 285 test::WaiterThread thread(d1, 286 MOJO_HANDLE_SIGNAL_READABLE, 287 MOJO_DEADLINE_INDEFINITE, 288 1, 289 &did_wait, &result, &context); 290 stopwatch.Start(); 291 thread.Start(); 292 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); 293 // Wake it up by writing to |d0|. 294 buffer[0] = 123456789; 295 EXPECT_EQ(MOJO_RESULT_OK, 296 d0->WriteMessage(buffer, kBufferSize, 297 NULL, 298 MOJO_WRITE_MESSAGE_FLAG_NONE)); 299 } // Joins the thread. 300 elapsed = stopwatch.Elapsed(); 301 EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); 302 EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); 303 EXPECT_TRUE(did_wait); 304 EXPECT_EQ(MOJO_RESULT_OK, result); 305 EXPECT_EQ(1u, context); 306 307 // Now |d1| is already readable. Try waiting for it again. 308 { 309 test::WaiterThread thread(d1, 310 MOJO_HANDLE_SIGNAL_READABLE, 311 MOJO_DEADLINE_INDEFINITE, 312 2, 313 &did_wait, &result, &context); 314 stopwatch.Start(); 315 thread.Start(); 316 } // Joins the thread. 317 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); 318 EXPECT_FALSE(did_wait); 319 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); 320 321 // Consume what we wrote to |d0|. 322 buffer[0] = 0; 323 buffer_size = kBufferSize; 324 EXPECT_EQ(MOJO_RESULT_OK, 325 d1->ReadMessage(buffer, &buffer_size, 326 0, NULL, 327 MOJO_READ_MESSAGE_FLAG_NONE)); 328 EXPECT_EQ(kBufferSize, buffer_size); 329 EXPECT_EQ(123456789, buffer[0]); 330 331 // Wait for readable on |d1| and close |d0| after some time, which should 332 // cancel that wait. 333 { 334 test::WaiterThread thread(d1, 335 MOJO_HANDLE_SIGNAL_READABLE, 336 MOJO_DEADLINE_INDEFINITE, 337 3, 338 &did_wait, &result, &context); 339 stopwatch.Start(); 340 thread.Start(); 341 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); 342 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 343 } // Joins the thread. 344 elapsed = stopwatch.Elapsed(); 345 EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); 346 EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); 347 EXPECT_TRUE(did_wait); 348 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); 349 EXPECT_EQ(3u, context); 350 351 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 352 } 353 354 for (unsigned i = 0; i < 2; i++) { 355 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 356 MessagePipeDispatcher::kDefaultCreateOptions)); 357 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 358 MessagePipeDispatcher::kDefaultCreateOptions)); 359 { 360 scoped_refptr<MessagePipe> mp(new MessagePipe()); 361 d0->Init(mp, i); // 0, 1. 362 d1->Init(mp, i ^ 1); // 1, 0. 363 } 364 365 // Wait for readable on |d1| and close |d1| after some time, which should 366 // cancel that wait. 367 { 368 test::WaiterThread thread(d1, 369 MOJO_HANDLE_SIGNAL_READABLE, 370 MOJO_DEADLINE_INDEFINITE, 371 4, 372 &did_wait, &result, &context); 373 stopwatch.Start(); 374 thread.Start(); 375 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); 376 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 377 } // Joins the thread. 378 elapsed = stopwatch.Elapsed(); 379 EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); 380 EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); 381 EXPECT_TRUE(did_wait); 382 EXPECT_EQ(MOJO_RESULT_CANCELLED, result); 383 EXPECT_EQ(4u, context); 384 385 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 386 } 387 } 388 389 // Stress test ----------------------------------------------------------------- 390 391 const size_t kMaxMessageSize = 2000; 392 393 class WriterThread : public base::SimpleThread { 394 public: 395 // |*messages_written| and |*bytes_written| belong to the thread while it's 396 // alive. 397 WriterThread(scoped_refptr<Dispatcher> write_dispatcher, 398 size_t* messages_written, size_t* bytes_written) 399 : base::SimpleThread("writer_thread"), 400 write_dispatcher_(write_dispatcher), 401 messages_written_(messages_written), 402 bytes_written_(bytes_written) { 403 *messages_written_ = 0; 404 *bytes_written_ = 0; 405 } 406 407 virtual ~WriterThread() { 408 Join(); 409 } 410 411 private: 412 virtual void Run() OVERRIDE { 413 // Make some data to write. 414 unsigned char buffer[kMaxMessageSize]; 415 for (size_t i = 0; i < kMaxMessageSize; i++) 416 buffer[i] = static_cast<unsigned char>(i); 417 418 // Number of messages to write. 419 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); 420 421 // Write messages. 422 for (size_t i = 0; i < *messages_written_; i++) { 423 uint32_t bytes_to_write = static_cast<uint32_t>( 424 base::RandInt(1, static_cast<int>(kMaxMessageSize))); 425 EXPECT_EQ(MOJO_RESULT_OK, 426 write_dispatcher_->WriteMessage(buffer, bytes_to_write, 427 NULL, 428 MOJO_WRITE_MESSAGE_FLAG_NONE)); 429 *bytes_written_ += bytes_to_write; 430 } 431 432 // Write one last "quit" message. 433 EXPECT_EQ(MOJO_RESULT_OK, 434 write_dispatcher_->WriteMessage("quit", 4, 435 NULL, 436 MOJO_WRITE_MESSAGE_FLAG_NONE)); 437 } 438 439 const scoped_refptr<Dispatcher> write_dispatcher_; 440 size_t* const messages_written_; 441 size_t* const bytes_written_; 442 443 DISALLOW_COPY_AND_ASSIGN(WriterThread); 444 }; 445 446 class ReaderThread : public base::SimpleThread { 447 public: 448 // |*messages_read| and |*bytes_read| belong to the thread while it's alive. 449 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, 450 size_t* messages_read, size_t* bytes_read) 451 : base::SimpleThread("reader_thread"), 452 read_dispatcher_(read_dispatcher), 453 messages_read_(messages_read), 454 bytes_read_(bytes_read) { 455 *messages_read_ = 0; 456 *bytes_read_ = 0; 457 } 458 459 virtual ~ReaderThread() { 460 Join(); 461 } 462 463 private: 464 virtual void Run() OVERRIDE { 465 unsigned char buffer[kMaxMessageSize]; 466 MojoResult result; 467 Waiter w; 468 469 // Read messages. 470 for (;;) { 471 // Wait for it to be readable. 472 w.Init(); 473 result = read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0); 474 EXPECT_TRUE(result == MOJO_RESULT_OK || 475 result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result; 476 if (result == MOJO_RESULT_OK) { 477 // Actually need to wait. 478 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL)); 479 read_dispatcher_->RemoveWaiter(&w); 480 } 481 482 // Now, try to do the read. 483 // Clear the buffer so that we can check the result. 484 memset(buffer, 0, sizeof(buffer)); 485 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 486 result = read_dispatcher_->ReadMessage(buffer, &buffer_size, 487 0, NULL, 488 MOJO_READ_MESSAGE_FLAG_NONE); 489 EXPECT_TRUE(result == MOJO_RESULT_OK || 490 result == MOJO_RESULT_SHOULD_WAIT) << "result: " << result; 491 // We're racing with others to read, so maybe we failed. 492 if (result == MOJO_RESULT_SHOULD_WAIT) 493 continue; // In which case, try again. 494 // Check for quit. 495 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) 496 return; 497 EXPECT_GE(buffer_size, 1u); 498 EXPECT_LE(buffer_size, kMaxMessageSize); 499 EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); 500 501 (*messages_read_)++; 502 *bytes_read_ += buffer_size; 503 } 504 } 505 506 static bool IsValidMessage(const unsigned char* buffer, 507 uint32_t message_size) { 508 size_t i; 509 for (i = 0; i < message_size; i++) { 510 if (buffer[i] != static_cast<unsigned char>(i)) 511 return false; 512 } 513 // Check that the remaining bytes weren't stomped on. 514 for (; i < kMaxMessageSize; i++) { 515 if (buffer[i] != 0) 516 return false; 517 } 518 return true; 519 } 520 521 const scoped_refptr<Dispatcher> read_dispatcher_; 522 size_t* const messages_read_; 523 size_t* const bytes_read_; 524 525 DISALLOW_COPY_AND_ASSIGN(ReaderThread); 526 }; 527 528 TEST(MessagePipeDispatcherTest, Stress) { 529 static const size_t kNumWriters = 30; 530 static const size_t kNumReaders = kNumWriters; 531 532 scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher( 533 MessagePipeDispatcher::kDefaultCreateOptions)); 534 scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher( 535 MessagePipeDispatcher::kDefaultCreateOptions)); 536 { 537 scoped_refptr<MessagePipe> mp(new MessagePipe()); 538 d_write->Init(mp, 0); 539 d_read->Init(mp, 1); 540 } 541 542 size_t messages_written[kNumWriters]; 543 size_t bytes_written[kNumWriters]; 544 size_t messages_read[kNumReaders]; 545 size_t bytes_read[kNumReaders]; 546 { 547 // Make writers. 548 ScopedVector<WriterThread> writers; 549 for (size_t i = 0; i < kNumWriters; i++) { 550 writers.push_back( 551 new WriterThread(d_write, &messages_written[i], &bytes_written[i])); 552 } 553 554 // Make readers. 555 ScopedVector<ReaderThread> readers; 556 for (size_t i = 0; i < kNumReaders; i++) { 557 readers.push_back( 558 new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); 559 } 560 561 // Start writers. 562 for (size_t i = 0; i < kNumWriters; i++) 563 writers[i]->Start(); 564 565 // Start readers. 566 for (size_t i = 0; i < kNumReaders; i++) 567 readers[i]->Start(); 568 569 // TODO(vtl): Maybe I should have an event that triggers all the threads to 570 // start doing stuff for real (so that the first ones created/started aren't 571 // advantaged). 572 } // Joins all the threads. 573 574 size_t total_messages_written = 0; 575 size_t total_bytes_written = 0; 576 for (size_t i = 0; i < kNumWriters; i++) { 577 total_messages_written += messages_written[i]; 578 total_bytes_written += bytes_written[i]; 579 } 580 size_t total_messages_read = 0; 581 size_t total_bytes_read = 0; 582 for (size_t i = 0; i < kNumReaders; i++) { 583 total_messages_read += messages_read[i]; 584 total_bytes_read += bytes_read[i]; 585 // We'd have to be really unlucky to have read no messages on a thread. 586 EXPECT_GT(messages_read[i], 0u) << "reader: " << i; 587 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; 588 } 589 EXPECT_EQ(total_messages_written, total_messages_read); 590 EXPECT_EQ(total_bytes_written, total_bytes_read); 591 592 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); 593 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); 594 } 595 596 } // namespace 597 } // namespace system 598 } // namespace mojo 599