Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
      6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
      7 // increase tolerance and reduce observed flakiness (though doing so reduces the
      8 // meaningfulness of the test).
      9 
     10 #include "mojo/system/message_pipe_dispatcher.h"
     11 
     12 #include <string.h>
     13 
     14 #include <limits>
     15 
     16 #include "base/memory/ref_counted.h"
     17 #include "base/memory/scoped_vector.h"
     18 #include "base/rand_util.h"
     19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
     20 #include "base/threading/simple_thread.h"
     21 #include "base/time/time.h"
     22 #include "mojo/system/message_pipe.h"
     23 #include "mojo/system/test_utils.h"
     24 #include "mojo/system/waiter.h"
     25 #include "mojo/system/waiter_test_utils.h"
     26 #include "testing/gtest/include/gtest/gtest.h"
     27 
     28 namespace mojo {
     29 namespace system {
     30 namespace {
     31 
     32 TEST(MessagePipeDispatcherTest, Basic) {
     33   test::Stopwatch stopwatch;
     34   int32_t buffer[1];
     35   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
     36   uint32_t buffer_size;
     37 
     38   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
     39   for (unsigned i = 0; i < 2; i++) {
     40     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
     41         MessagePipeDispatcher::kDefaultCreateOptions));
     42     EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
     43     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
     44         MessagePipeDispatcher::kDefaultCreateOptions));
     45     {
     46       scoped_refptr<MessagePipe> mp(new MessagePipe());
     47       d0->Init(mp, i);  // 0, 1.
     48       d1->Init(mp, i ^ 1);  // 1, 0.
     49     }
     50     Waiter w;
     51     uint32_t context = 0;
     52 
     53     // Try adding a writable waiter when already writable.
     54     w.Init();
     55     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
     56               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0));
     57     // Shouldn't need to remove the waiter (it was not added).
     58 
     59     // Add a readable waiter to |d0|, then make it readable (by writing to
     60     // |d1|), then wait.
     61     w.Init();
     62     EXPECT_EQ(MOJO_RESULT_OK,
     63               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1));
     64     buffer[0] = 123456789;
     65     EXPECT_EQ(MOJO_RESULT_OK,
     66               d1->WriteMessage(buffer, kBufferSize,
     67                                NULL,
     68                                MOJO_WRITE_MESSAGE_FLAG_NONE));
     69     stopwatch.Start();
     70     EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
     71     EXPECT_EQ(1u, context);
     72     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
     73     d0->RemoveWaiter(&w);
     74 
     75     // Try adding a readable waiter when already readable (from above).
     76     w.Init();
     77     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
     78               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2));
     79     // Shouldn't need to remove the waiter (it was not added).
     80 
     81     // Make |d0| no longer readable (by reading from it).
     82     buffer[0] = 0;
     83     buffer_size = kBufferSize;
     84     EXPECT_EQ(MOJO_RESULT_OK,
     85               d0->ReadMessage(buffer, &buffer_size,
     86                               0, NULL,
     87                               MOJO_READ_MESSAGE_FLAG_NONE));
     88     EXPECT_EQ(kBufferSize, buffer_size);
     89     EXPECT_EQ(123456789, buffer[0]);
     90 
     91     // Wait for zero time for readability on |d0| (will time out).
     92     w.Init();
     93     EXPECT_EQ(MOJO_RESULT_OK,
     94               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
     95     stopwatch.Start();
     96     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, NULL));
     97     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
     98     d0->RemoveWaiter(&w);
     99 
    100     // Wait for non-zero, finite time for readability on |d0| (will time out).
    101     w.Init();
    102     EXPECT_EQ(MOJO_RESULT_OK,
    103               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
    104     stopwatch.Start();
    105     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
    106               w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL));
    107     base::TimeDelta elapsed = stopwatch.Elapsed();
    108     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
    109     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
    110     d0->RemoveWaiter(&w);
    111 
    112     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    113     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    114   }
    115 }
    116 
    117 TEST(MessagePipeDispatcherTest, InvalidParams) {
    118   char buffer[1];
    119 
    120   scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    121         MessagePipeDispatcher::kDefaultCreateOptions));
    122   scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    123         MessagePipeDispatcher::kDefaultCreateOptions));
    124   {
    125     scoped_refptr<MessagePipe> mp(new MessagePipe());
    126     d0->Init(mp, 0);
    127     d1->Init(mp, 1);
    128   }
    129 
    130   // |WriteMessage|:
    131   // Null buffer with nonzero buffer size.
    132   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
    133             d0->WriteMessage(NULL, 1,
    134                              NULL,
    135                              MOJO_WRITE_MESSAGE_FLAG_NONE));
    136   // Huge buffer size.
    137   EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
    138             d0->WriteMessage(buffer, std::numeric_limits<uint32_t>::max(),
    139                              NULL,
    140                              MOJO_WRITE_MESSAGE_FLAG_NONE));
    141 
    142   // |ReadMessage|:
    143   // Null buffer with nonzero buffer size.
    144   uint32_t buffer_size = 1;
    145   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
    146             d0->ReadMessage(NULL, &buffer_size,
    147                             0, NULL,
    148                             MOJO_READ_MESSAGE_FLAG_NONE));
    149 
    150   EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    151   EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    152 }
    153 
    154 // Test what happens when one end is closed (single-threaded test).
    155 TEST(MessagePipeDispatcherTest, BasicClosed) {
    156   int32_t buffer[1];
    157   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
    158   uint32_t buffer_size;
    159 
    160   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
    161   for (unsigned i = 0; i < 2; i++) {
    162     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    163         MessagePipeDispatcher::kDefaultCreateOptions));
    164     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    165         MessagePipeDispatcher::kDefaultCreateOptions));
    166     {
    167       scoped_refptr<MessagePipe> mp(new MessagePipe());
    168       d0->Init(mp, i);  // 0, 1.
    169       d1->Init(mp, i ^ 1);  // 1, 0.
    170     }
    171     Waiter w;
    172 
    173     // Write (twice) to |d1|.
    174     buffer[0] = 123456789;
    175     EXPECT_EQ(MOJO_RESULT_OK,
    176               d1->WriteMessage(buffer, kBufferSize,
    177                                NULL,
    178                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    179     buffer[0] = 234567890;
    180     EXPECT_EQ(MOJO_RESULT_OK,
    181               d1->WriteMessage(buffer, kBufferSize,
    182                                NULL,
    183                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    184 
    185     // Try waiting for readable on |d0|; should fail (already satisfied).
    186     w.Init();
    187     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
    188               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0));
    189 
    190     // Try reading from |d1|; should fail (nothing to read).
    191     buffer[0] = 0;
    192     buffer_size = kBufferSize;
    193     EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    194               d1->ReadMessage(buffer, &buffer_size,
    195                               0, NULL,
    196                               MOJO_READ_MESSAGE_FLAG_NONE));
    197 
    198     // Close |d1|.
    199     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    200 
    201     // Try waiting for readable on |d0|; should fail (already satisfied).
    202     w.Init();
    203     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
    204               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1));
    205 
    206     // Read from |d0|.
    207     buffer[0] = 0;
    208     buffer_size = kBufferSize;
    209     EXPECT_EQ(MOJO_RESULT_OK,
    210               d0->ReadMessage(buffer, &buffer_size,
    211                               0, NULL,
    212                               MOJO_READ_MESSAGE_FLAG_NONE));
    213     EXPECT_EQ(kBufferSize, buffer_size);
    214     EXPECT_EQ(123456789, buffer[0]);
    215 
    216     // Try waiting for readable on |d0|; should fail (already satisfied).
    217     w.Init();
    218     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
    219               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2));
    220 
    221     // Read again from |d0|.
    222     buffer[0] = 0;
    223     buffer_size = kBufferSize;
    224     EXPECT_EQ(MOJO_RESULT_OK,
    225               d0->ReadMessage(buffer, &buffer_size,
    226                               0, NULL,
    227                               MOJO_READ_MESSAGE_FLAG_NONE));
    228     EXPECT_EQ(kBufferSize, buffer_size);
    229     EXPECT_EQ(234567890, buffer[0]);
    230 
    231     // Try waiting for readable on |d0|; should fail (unsatisfiable).
    232     w.Init();
    233     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    234               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3));
    235 
    236     // Try waiting for writable on |d0|; should fail (unsatisfiable).
    237     w.Init();
    238     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    239               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4));
    240 
    241     // Try reading from |d0|; should fail (nothing to read and other end
    242     // closed).
    243     buffer[0] = 0;
    244     buffer_size = kBufferSize;
    245     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    246               d0->ReadMessage(buffer, &buffer_size,
    247                               0, NULL,
    248                               MOJO_READ_MESSAGE_FLAG_NONE));
    249 
    250     // Try writing to |d0|; should fail (other end closed).
    251     buffer[0] = 345678901;
    252     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    253               d0->WriteMessage(buffer, kBufferSize,
    254                                NULL,
    255                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    256 
    257     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    258   }
    259 }
    260 
    261 TEST(MessagePipeDispatcherTest, BasicThreaded) {
    262   test::Stopwatch stopwatch;
    263   int32_t buffer[1];
    264   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
    265   uint32_t buffer_size;
    266   base::TimeDelta elapsed;
    267   bool did_wait;
    268   MojoResult result;
    269   uint32_t context;
    270 
    271   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
    272   for (unsigned i = 0; i < 2; i++) {
    273     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    274         MessagePipeDispatcher::kDefaultCreateOptions));
    275     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    276         MessagePipeDispatcher::kDefaultCreateOptions));
    277     {
    278       scoped_refptr<MessagePipe> mp(new MessagePipe());
    279       d0->Init(mp, i);  // 0, 1.
    280       d1->Init(mp, i ^ 1);  // 1, 0.
    281     }
    282 
    283     // Wait for readable on |d1|, which will become readable after some time.
    284     {
    285       test::WaiterThread thread(d1,
    286                                MOJO_HANDLE_SIGNAL_READABLE,
    287                                MOJO_DEADLINE_INDEFINITE,
    288                                1,
    289                                &did_wait, &result, &context);
    290       stopwatch.Start();
    291       thread.Start();
    292       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
    293       // Wake it up by writing to |d0|.
    294       buffer[0] = 123456789;
    295       EXPECT_EQ(MOJO_RESULT_OK,
    296                 d0->WriteMessage(buffer, kBufferSize,
    297                                  NULL,
    298                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
    299     }  // Joins the thread.
    300     elapsed = stopwatch.Elapsed();
    301     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
    302     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
    303     EXPECT_TRUE(did_wait);
    304     EXPECT_EQ(MOJO_RESULT_OK, result);
    305     EXPECT_EQ(1u, context);
    306 
    307     // Now |d1| is already readable. Try waiting for it again.
    308     {
    309       test::WaiterThread thread(d1,
    310                                 MOJO_HANDLE_SIGNAL_READABLE,
    311                                 MOJO_DEADLINE_INDEFINITE,
    312                                 2,
    313                                 &did_wait, &result, &context);
    314       stopwatch.Start();
    315       thread.Start();
    316     }  // Joins the thread.
    317     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
    318     EXPECT_FALSE(did_wait);
    319     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
    320 
    321     // Consume what we wrote to |d0|.
    322     buffer[0] = 0;
    323     buffer_size = kBufferSize;
    324     EXPECT_EQ(MOJO_RESULT_OK,
    325               d1->ReadMessage(buffer, &buffer_size,
    326                               0, NULL,
    327                               MOJO_READ_MESSAGE_FLAG_NONE));
    328     EXPECT_EQ(kBufferSize, buffer_size);
    329     EXPECT_EQ(123456789, buffer[0]);
    330 
    331     // Wait for readable on |d1| and close |d0| after some time, which should
    332     // cancel that wait.
    333     {
    334       test::WaiterThread thread(d1,
    335                                 MOJO_HANDLE_SIGNAL_READABLE,
    336                                 MOJO_DEADLINE_INDEFINITE,
    337                                 3,
    338                                 &did_wait, &result, &context);
    339       stopwatch.Start();
    340       thread.Start();
    341       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
    342       EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    343     }  // Joins the thread.
    344     elapsed = stopwatch.Elapsed();
    345     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
    346     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
    347     EXPECT_TRUE(did_wait);
    348     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
    349     EXPECT_EQ(3u, context);
    350 
    351     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    352   }
    353 
    354   for (unsigned i = 0; i < 2; i++) {
    355     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    356         MessagePipeDispatcher::kDefaultCreateOptions));
    357     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    358         MessagePipeDispatcher::kDefaultCreateOptions));
    359     {
    360       scoped_refptr<MessagePipe> mp(new MessagePipe());
    361       d0->Init(mp, i);  // 0, 1.
    362       d1->Init(mp, i ^ 1);  // 1, 0.
    363     }
    364 
    365     // Wait for readable on |d1| and close |d1| after some time, which should
    366     // cancel that wait.
    367     {
    368       test::WaiterThread thread(d1,
    369                                 MOJO_HANDLE_SIGNAL_READABLE,
    370                                 MOJO_DEADLINE_INDEFINITE,
    371                                 4,
    372                                 &did_wait, &result, &context);
    373       stopwatch.Start();
    374       thread.Start();
    375       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
    376       EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    377     }  // Joins the thread.
    378     elapsed = stopwatch.Elapsed();
    379     EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout());
    380     EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout());
    381     EXPECT_TRUE(did_wait);
    382     EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
    383     EXPECT_EQ(4u, context);
    384 
    385     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    386   }
    387 }
    388 
    389 // Stress test -----------------------------------------------------------------
    390 
    391 const size_t kMaxMessageSize = 2000;
    392 
    393 class WriterThread : public base::SimpleThread {
    394  public:
    395   // |*messages_written| and |*bytes_written| belong to the thread while it's
    396   // alive.
    397   WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
    398                size_t* messages_written, size_t* bytes_written)
    399       : base::SimpleThread("writer_thread"),
    400         write_dispatcher_(write_dispatcher),
    401         messages_written_(messages_written),
    402         bytes_written_(bytes_written) {
    403     *messages_written_ = 0;
    404     *bytes_written_ = 0;
    405   }
    406 
    407   virtual ~WriterThread() {
    408     Join();
    409   }
    410 
    411  private:
    412   virtual void Run() OVERRIDE {
    413     // Make some data to write.
    414     unsigned char buffer[kMaxMessageSize];
    415     for (size_t i = 0; i < kMaxMessageSize; i++)
    416       buffer[i] = static_cast<unsigned char>(i);
    417 
    418     // Number of messages to write.
    419     *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
    420 
    421     // Write messages.
    422     for (size_t i = 0; i < *messages_written_; i++) {
    423       uint32_t bytes_to_write = static_cast<uint32_t>(
    424           base::RandInt(1, static_cast<int>(kMaxMessageSize)));
    425       EXPECT_EQ(MOJO_RESULT_OK,
    426                 write_dispatcher_->WriteMessage(buffer, bytes_to_write,
    427                                                 NULL,
    428                                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    429       *bytes_written_ += bytes_to_write;
    430     }
    431 
    432     // Write one last "quit" message.
    433     EXPECT_EQ(MOJO_RESULT_OK,
    434               write_dispatcher_->WriteMessage("quit", 4,
    435                                               NULL,
    436                                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    437   }
    438 
    439   const scoped_refptr<Dispatcher> write_dispatcher_;
    440   size_t* const messages_written_;
    441   size_t* const bytes_written_;
    442 
    443   DISALLOW_COPY_AND_ASSIGN(WriterThread);
    444 };
    445 
    446 class ReaderThread : public base::SimpleThread {
    447  public:
    448   // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
    449   ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
    450                size_t* messages_read, size_t* bytes_read)
    451       : base::SimpleThread("reader_thread"),
    452         read_dispatcher_(read_dispatcher),
    453         messages_read_(messages_read),
    454         bytes_read_(bytes_read) {
    455     *messages_read_ = 0;
    456     *bytes_read_ = 0;
    457   }
    458 
    459   virtual ~ReaderThread() {
    460     Join();
    461   }
    462 
    463  private:
    464   virtual void Run() OVERRIDE {
    465     unsigned char buffer[kMaxMessageSize];
    466     MojoResult result;
    467     Waiter w;
    468 
    469     // Read messages.
    470     for (;;) {
    471       // Wait for it to be readable.
    472       w.Init();
    473       result = read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0);
    474       EXPECT_TRUE(result == MOJO_RESULT_OK ||
    475                   result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
    476       if (result == MOJO_RESULT_OK) {
    477         // Actually need to wait.
    478         EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL));
    479         read_dispatcher_->RemoveWaiter(&w);
    480       }
    481 
    482       // Now, try to do the read.
    483       // Clear the buffer so that we can check the result.
    484       memset(buffer, 0, sizeof(buffer));
    485       uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    486       result = read_dispatcher_->ReadMessage(buffer, &buffer_size,
    487                                              0, NULL,
    488                                              MOJO_READ_MESSAGE_FLAG_NONE);
    489       EXPECT_TRUE(result == MOJO_RESULT_OK ||
    490                   result == MOJO_RESULT_SHOULD_WAIT) << "result: " << result;
    491       // We're racing with others to read, so maybe we failed.
    492       if (result == MOJO_RESULT_SHOULD_WAIT)
    493         continue;  // In which case, try again.
    494       // Check for quit.
    495       if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
    496         return;
    497       EXPECT_GE(buffer_size, 1u);
    498       EXPECT_LE(buffer_size, kMaxMessageSize);
    499       EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
    500 
    501       (*messages_read_)++;
    502       *bytes_read_ += buffer_size;
    503     }
    504   }
    505 
    506   static bool IsValidMessage(const unsigned char* buffer,
    507                              uint32_t message_size) {
    508     size_t i;
    509     for (i = 0; i < message_size; i++) {
    510       if (buffer[i] != static_cast<unsigned char>(i))
    511         return false;
    512     }
    513     // Check that the remaining bytes weren't stomped on.
    514     for (; i < kMaxMessageSize; i++) {
    515       if (buffer[i] != 0)
    516         return false;
    517     }
    518     return true;
    519   }
    520 
    521   const scoped_refptr<Dispatcher> read_dispatcher_;
    522   size_t* const messages_read_;
    523   size_t* const bytes_read_;
    524 
    525   DISALLOW_COPY_AND_ASSIGN(ReaderThread);
    526 };
    527 
    528 TEST(MessagePipeDispatcherTest, Stress) {
    529   static const size_t kNumWriters = 30;
    530   static const size_t kNumReaders = kNumWriters;
    531 
    532   scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher(
    533         MessagePipeDispatcher::kDefaultCreateOptions));
    534   scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher(
    535         MessagePipeDispatcher::kDefaultCreateOptions));
    536   {
    537     scoped_refptr<MessagePipe> mp(new MessagePipe());
    538     d_write->Init(mp, 0);
    539     d_read->Init(mp, 1);
    540   }
    541 
    542   size_t messages_written[kNumWriters];
    543   size_t bytes_written[kNumWriters];
    544   size_t messages_read[kNumReaders];
    545   size_t bytes_read[kNumReaders];
    546   {
    547     // Make writers.
    548     ScopedVector<WriterThread> writers;
    549     for (size_t i = 0; i < kNumWriters; i++) {
    550       writers.push_back(
    551           new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
    552     }
    553 
    554     // Make readers.
    555     ScopedVector<ReaderThread> readers;
    556     for (size_t i = 0; i < kNumReaders; i++) {
    557       readers.push_back(
    558           new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
    559     }
    560 
    561     // Start writers.
    562     for (size_t i = 0; i < kNumWriters; i++)
    563       writers[i]->Start();
    564 
    565     // Start readers.
    566     for (size_t i = 0; i < kNumReaders; i++)
    567       readers[i]->Start();
    568 
    569     // TODO(vtl): Maybe I should have an event that triggers all the threads to
    570     // start doing stuff for real (so that the first ones created/started aren't
    571     // advantaged).
    572   }  // Joins all the threads.
    573 
    574   size_t total_messages_written = 0;
    575   size_t total_bytes_written = 0;
    576   for (size_t i = 0; i < kNumWriters; i++) {
    577     total_messages_written += messages_written[i];
    578     total_bytes_written += bytes_written[i];
    579   }
    580   size_t total_messages_read = 0;
    581   size_t total_bytes_read = 0;
    582   for (size_t i = 0; i < kNumReaders; i++) {
    583     total_messages_read += messages_read[i];
    584     total_bytes_read += bytes_read[i];
    585     // We'd have to be really unlucky to have read no messages on a thread.
    586     EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
    587     EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
    588   }
    589   EXPECT_EQ(total_messages_written, total_messages_read);
    590   EXPECT_EQ(total_bytes_written, total_bytes_read);
    591 
    592   EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
    593   EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
    594 }
    595 
    596 }  // namespace
    597 }  // namespace system
    598 }  // namespace mojo
    599