Home | History | Annotate | Download | only in system
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
      6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
      7 // increase tolerance and reduce observed flakiness (though doing so reduces the
      8 // meaningfulness of the test).
      9 
     10 #include "mojo/system/message_pipe_dispatcher.h"
     11 
     12 #include <string.h>
     13 
     14 #include <limits>
     15 
     16 #include "base/memory/ref_counted.h"
     17 #include "base/memory/scoped_vector.h"
     18 #include "base/rand_util.h"
     19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
     20 #include "base/threading/simple_thread.h"
     21 #include "base/time/time.h"
     22 #include "mojo/system/message_pipe.h"
     23 #include "mojo/system/test_utils.h"
     24 #include "mojo/system/waiter.h"
     25 #include "mojo/system/waiter_test_utils.h"
     26 #include "testing/gtest/include/gtest/gtest.h"
     27 
     28 namespace mojo {
     29 namespace system {
     30 namespace {
     31 
     32 TEST(MessagePipeDispatcherTest, Basic) {
          // Single-threaded walk through waiter behavior on a local message pipe:
          // adding a waiter for an already-satisfied signal, waking a waiter by
          // writing to the peer, and timing out (with zero and finite deadlines)
          // once the pending message has been read. All timing expectations are
          // expressed relative to |test::EpsilonTimeout()|.
     33   test::Stopwatch stopwatch;
     34   int32_t buffer[1];
     35   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
     36   uint32_t buffer_size;
     37 
     38   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
     39   for (unsigned i = 0; i < 2; i++) {
     40     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
     41         MessagePipeDispatcher::kDefaultCreateOptions));
     42     EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
     43     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
     44         MessagePipeDispatcher::kDefaultCreateOptions));
     45     {
              // Both dispatchers are attached to the same pipe, on opposite ports.
     46       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
     47       d0->Init(mp, i);      // 0, 1.
     48       d1->Init(mp, i ^ 1);  // 1, 0.
     49     }
     50     Waiter w;
     51     uint32_t context = 0;
     52     HandleSignalsState hss;
     53 
     54     // Try adding a writable waiter when already writable.
     55     w.Init();
     56     hss = HandleSignalsState();
     57     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
     58               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
     59     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
     60     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
     61               hss.satisfiable_signals);
     62     // Shouldn't need to remove the waiter (it was not added).
     63 
     64     // Add a readable waiter to |d0|, then make it readable (by writing to
     65     // |d1|), then wait.
            // The context value passed to |AddWaiter()| (1) is the value that
            // |Wait()| is expected to hand back through |context| below.
     66     w.Init();
     67     ASSERT_EQ(MOJO_RESULT_OK,
     68               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
     69     buffer[0] = 123456789;
     70     EXPECT_EQ(MOJO_RESULT_OK,
     71               d1->WriteMessage(UserPointer<const void>(buffer),
     72                                kBufferSize,
     73                                nullptr,
     74                                MOJO_WRITE_MESSAGE_FLAG_NONE));
     75     stopwatch.Start();
     76     EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
     77     EXPECT_EQ(1u, context);
     78     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
            // |RemoveWaiter()| fills in |hss|; it is reset first so that stale
            // values from an earlier step cannot satisfy the checks.
     79     hss = HandleSignalsState();
     80     d0->RemoveWaiter(&w, &hss);
     81     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
     82               hss.satisfied_signals);
     83     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
     84               hss.satisfiable_signals);
     85 
     86     // Try adding a readable waiter when already readable (from above).
     87     w.Init();
     88     hss = HandleSignalsState();
     89     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
     90               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
     91     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
     92               hss.satisfied_signals);
     93     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
     94               hss.satisfiable_signals);
     95     // Shouldn't need to remove the waiter (it was not added).
     96 
     97     // Make |d0| no longer readable (by reading from it).
            // |buffer| is zeroed first so the value check below proves the read
            // actually copied the message.
     98     buffer[0] = 0;
     99     buffer_size = kBufferSize;
    100     EXPECT_EQ(MOJO_RESULT_OK,
    101               d0->ReadMessage(UserPointer<void>(buffer),
    102                               MakeUserPointer(&buffer_size),
    103                               0,
    104                               nullptr,
    105                               MOJO_READ_MESSAGE_FLAG_NONE));
    106     EXPECT_EQ(kBufferSize, buffer_size);
    107     EXPECT_EQ(123456789, buffer[0]);
    108 
    109     // Wait for zero time for readability on |d0| (will time out).
    110     w.Init();
    111     ASSERT_EQ(MOJO_RESULT_OK,
    112               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
    113     stopwatch.Start();
    114     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
    115     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
    116     hss = HandleSignalsState();
    117     d0->RemoveWaiter(&w, &hss);
    118     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    119     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    120               hss.satisfiable_signals);
    121 
    122     // Wait for non-zero, finite time for readability on |d0| (will time out).
            // The deadline is passed as a microsecond count equal to two epsilons;
            // the elapsed time is then expected to fall between one and three
            // epsilons.
    123     w.Init();
    124     ASSERT_EQ(MOJO_RESULT_OK,
    125               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
    126     stopwatch.Start();
    127     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
    128               w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr));
    129     base::TimeDelta elapsed = stopwatch.Elapsed();
    130     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
    131     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
    132     hss = HandleSignalsState();
    133     d0->RemoveWaiter(&w, &hss);
    134     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
    135     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    136               hss.satisfiable_signals);
    137 
    138     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    139     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    140   }
    141 }
    142 
    143 TEST(MessagePipeDispatcherTest, InvalidParams) {
          // Checks that |WriteMessage()| with an oversized byte count is expected
          // to return |MOJO_RESULT_RESOURCE_EXHAUSTED|.
          // NOTE(review): |buffer| is only one byte long, so the call presumably
          // rejects the size before touching the buffer -- confirm against the
          // implementation.
    144   char buffer[1];
    145 
    146   scoped_refptr<MessagePipeDispatcher> d0(
    147       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    148   scoped_refptr<MessagePipeDispatcher> d1(
    149       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    150   {
    151     scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
    152     d0->Init(mp, 0);
    153     d1->Init(mp, 1);
    154   }
    155 
    156   // |WriteMessage|:
    157   // Huge buffer size.
          // The size used is the largest value representable in |uint32_t|.
    158   EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
    159             d0->WriteMessage(UserPointer<const void>(buffer),
    160                              std::numeric_limits<uint32_t>::max(),
    161                              nullptr,
    162                              MOJO_WRITE_MESSAGE_FLAG_NONE));
    163 
          // Both ends are closed explicitly, and each close is expected to succeed.
    164   EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    165   EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    166 }
    167 
    168 // These test invalid arguments that should cause death if we're being paranoid
    169 // about checking arguments (which we would want to do if, e.g., we were in a
    170 // true "kernel" situation, but we might not want to do otherwise for
    171 // performance reasons). Probably blatant errors like passing in null pointers
    172 // (for required pointer arguments) will still cause death, but perhaps not
    173 // predictably.
    174 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) {
          // Both checks below pass a null user pointer together with a nonzero
          // size and expect the call to die with output matching
          // |kMemoryCheckFailedRegex|. They only run where death tests are
          // supported (|EXPECT_DEATH_IF_SUPPORTED|).
    175   const char kMemoryCheckFailedRegex[] = "Check failed";
    176 
    177   scoped_refptr<MessagePipeDispatcher> d0(
    178       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    179   scoped_refptr<MessagePipeDispatcher> d1(
    180       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    181   {
    182     scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
    183     d0->Init(mp, 0);
    184     d1->Init(mp, 1);
    185   }
    186 
    187   // |WriteMessage|:
    188   // Null buffer with nonzero buffer size.
    189   EXPECT_DEATH_IF_SUPPORTED(
    190       d0->WriteMessage(
    191           NullUserPointer(), 1, nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE),
    192       kMemoryCheckFailedRegex);
    193 
    194   // |ReadMessage|:
    195   // Null buffer with nonzero buffer size.
    196   // First write something so that we actually have something to read.
    197   EXPECT_EQ(MOJO_RESULT_OK,
    198             d1->WriteMessage(UserPointer<const void>("x"),
    199                              1,
    200                              nullptr,
    201                              MOJO_WRITE_MESSAGE_FLAG_NONE));
          // |buffer_size| claims one byte of capacity while the buffer pointer
          // itself is null.
    202   uint32_t buffer_size = 1;
    203   EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(),
    204                                             MakeUserPointer(&buffer_size),
    205                                             0,
    206                                             nullptr,
    207                                             MOJO_READ_MESSAGE_FLAG_NONE),
    208                             kMemoryCheckFailedRegex);
    209 
    210   EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    211   EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    212 }
    213 
    214 // Test what happens when one end is closed (single-threaded test).
    215 TEST(MessagePipeDispatcherTest, BasicClosed) {
          // Scenario: two messages are written through |d1|, then |d1| is closed.
          // |d0| must still be able to read both queued messages in order; once
          // they are drained, neither readable nor writable is satisfiable on |d0|
          // and further reads and writes are expected to return
          // |MOJO_RESULT_FAILED_PRECONDITION|.
    216   int32_t buffer[1];
    217   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
    218   uint32_t buffer_size;
    219 
    220   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
    221   for (unsigned i = 0; i < 2; i++) {
    222     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    223         MessagePipeDispatcher::kDefaultCreateOptions));
    224     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    225         MessagePipeDispatcher::kDefaultCreateOptions));
    226     {
    227       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
    228       d0->Init(mp, i);      // 0, 1.
    229       d1->Init(mp, i ^ 1);  // 1, 0.
    230     }
    231     Waiter w;
    232     HandleSignalsState hss;
    233 
    234     // Write (twice) to |d1|.
            // The two payloads differ so the reads below can verify ordering.
    235     buffer[0] = 123456789;
    236     EXPECT_EQ(MOJO_RESULT_OK,
    237               d1->WriteMessage(UserPointer<const void>(buffer),
    238                                kBufferSize,
    239                                nullptr,
    240                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    241     buffer[0] = 234567890;
    242     EXPECT_EQ(MOJO_RESULT_OK,
    243               d1->WriteMessage(UserPointer<const void>(buffer),
    244                                kBufferSize,
    245                                nullptr,
    246                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    247 
    248     // Try waiting for readable on |d0|; should fail (already satisfied).
    249     w.Init();
    250     hss = HandleSignalsState();
    251     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
    252               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
    253     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    254               hss.satisfied_signals);
    255     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    256               hss.satisfiable_signals);
    257 
    258     // Try reading from |d1|; should fail (nothing to read).
    259     buffer[0] = 0;
    260     buffer_size = kBufferSize;
    261     EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
    262               d1->ReadMessage(UserPointer<void>(buffer),
    263                               MakeUserPointer(&buffer_size),
    264                               0,
    265                               nullptr,
    266                               MOJO_READ_MESSAGE_FLAG_NONE));
    267 
    268     // Close |d1|.
    269     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    270 
    271     // Try waiting for readable on |d0|; should fail (already satisfied).
            // With the peer closed, only readable is expected to remain satisfied
            // and satisfiable (writable is no longer reported).
    272     w.Init();
    273     hss = HandleSignalsState();
    274     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
    275               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss));
    276     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    277     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
    278 
    279     // Read from |d0|.
    280     buffer[0] = 0;
    281     buffer_size = kBufferSize;
    282     EXPECT_EQ(MOJO_RESULT_OK,
    283               d0->ReadMessage(UserPointer<void>(buffer),
    284                               MakeUserPointer(&buffer_size),
    285                               0,
    286                               nullptr,
    287                               MOJO_READ_MESSAGE_FLAG_NONE));
    288     EXPECT_EQ(kBufferSize, buffer_size);
    289     EXPECT_EQ(123456789, buffer[0]);
    290 
    291     // Try waiting for readable on |d0|; should fail (already satisfied).
            // One message is still queued at this point.
    292     w.Init();
    293     hss = HandleSignalsState();
    294     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
    295               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
    296     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
    297     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
    298 
    299     // Read again from |d0|.
    300     buffer[0] = 0;
    301     buffer_size = kBufferSize;
    302     EXPECT_EQ(MOJO_RESULT_OK,
    303               d0->ReadMessage(UserPointer<void>(buffer),
    304                               MakeUserPointer(&buffer_size),
    305                               0,
    306                               nullptr,
    307                               MOJO_READ_MESSAGE_FLAG_NONE));
    308     EXPECT_EQ(kBufferSize, buffer_size);
    309     EXPECT_EQ(234567890, buffer[0]);
    310 
    311     // Try waiting for readable on |d0|; should fail (unsatisfiable).
            // The queue is now empty and the peer is closed, so no signal is
            // expected to be satisfied or satisfiable.
    312     w.Init();
    313     hss = HandleSignalsState();
    314     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    315               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
    316     EXPECT_EQ(0u, hss.satisfied_signals);
    317     EXPECT_EQ(0u, hss.satisfiable_signals);
    318 
    319     // Try waiting for writable on |d0|; should fail (unsatisfiable).
    320     w.Init();
    321     hss = HandleSignalsState();
    322     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    323               d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
    324     EXPECT_EQ(0u, hss.satisfied_signals);
    325     EXPECT_EQ(0u, hss.satisfiable_signals);
    326 
    327     // Try reading from |d0|; should fail (nothing to read and other end
    328     // closed).
    329     buffer[0] = 0;
    330     buffer_size = kBufferSize;
    331     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    332               d0->ReadMessage(UserPointer<void>(buffer),
    333                               MakeUserPointer(&buffer_size),
    334                               0,
    335                               nullptr,
    336                               MOJO_READ_MESSAGE_FLAG_NONE));
    337 
    338     // Try writing to |d0|; should fail (other end closed).
    339     buffer[0] = 345678901;
    340     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
    341               d0->WriteMessage(UserPointer<const void>(buffer),
    342                                kBufferSize,
    343                                nullptr,
    344                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    345 
            // |d1| was already closed above, so only |d0| remains to be closed.
    346     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    347   }
    348 }
    349 
    350 #if defined(OS_WIN)
    351 // http://crbug.com/396386
    352 #define MAYBE_BasicThreaded DISABLED_BasicThreaded
    353 #else
    354 #define MAYBE_BasicThreaded BasicThreaded
    355 #endif
        // Multi-threaded counterpart of the basic tests (the test name is given the
        // DISABLED_ prefix on Windows by the macro above). A |test::WaiterThread|
        // waits on |d1| while the main thread sleeps for two epsilons and then
        // either writes to |d0|, closes |d0|, or closes |d1|; the waiter's reported
        // result, context and signal state are checked after the thread is joined.
    356 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) {
    357   test::Stopwatch stopwatch;
    358   int32_t buffer[1];
    359   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
    360   uint32_t buffer_size;
    361   base::TimeDelta elapsed;
          // The next four variables are outputs handed to each |WaiterThread| by
          // pointer; they are only inspected after the thread's scope has ended.
    362   bool did_wait;
    363   MojoResult result;
    364   uint32_t context;
    365   HandleSignalsState hss;
    366 
    367   // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
    368   for (unsigned i = 0; i < 2; i++) {
    369     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    370         MessagePipeDispatcher::kDefaultCreateOptions));
    371     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    372         MessagePipeDispatcher::kDefaultCreateOptions));
    373     {
    374       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
    375       d0->Init(mp, i);      // 0, 1.
    376       d1->Init(mp, i ^ 1);  // 1, 0.
    377     }
    378 
    379     // Wait for readable on |d1|, which will become readable after some time.
    380     {
    381       test::WaiterThread thread(d1,
    382                                 MOJO_HANDLE_SIGNAL_READABLE,
    383                                 MOJO_DEADLINE_INDEFINITE,
    384                                 1,
    385                                 &did_wait,
    386                                 &result,
    387                                 &context,
    388                                 &hss);
              // The stopwatch is started before the thread so that the measured
              // time covers the whole wait, including the sleep below.
    389       stopwatch.Start();
    390       thread.Start();
    391       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
    392       // Wake it up by writing to |d0|.
    393       buffer[0] = 123456789;
    394       EXPECT_EQ(MOJO_RESULT_OK,
    395                 d0->WriteMessage(UserPointer<const void>(buffer),
    396                                  kBufferSize,
    397                                  nullptr,
    398                                  MOJO_WRITE_MESSAGE_FLAG_NONE));
    399     }  // Joins the thread.
            // The wait should have lasted roughly the two-epsilon sleep: more than
            // one epsilon and less than three.
    400     elapsed = stopwatch.Elapsed();
    401     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
    402     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
    403     EXPECT_TRUE(did_wait);
    404     EXPECT_EQ(MOJO_RESULT_OK, result);
    405     EXPECT_EQ(1u, context);
    406     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    407               hss.satisfied_signals);
    408     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    409               hss.satisfiable_signals);
    410 
    411     // Now |d1| is already readable. Try waiting for it again.
            // No actual wait is expected (|did_wait| false), so |context| is not
            // checked for this step.
    412     {
    413       test::WaiterThread thread(d1,
    414                                 MOJO_HANDLE_SIGNAL_READABLE,
    415                                 MOJO_DEADLINE_INDEFINITE,
    416                                 2,
    417                                 &did_wait,
    418                                 &result,
    419                                 &context,
    420                                 &hss);
    421       stopwatch.Start();
    422       thread.Start();
    423     }  // Joins the thread.
    424     EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
    425     EXPECT_FALSE(did_wait);
    426     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
    427     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    428               hss.satisfied_signals);
    429     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
    430               hss.satisfiable_signals);
    431 
    432     // Consume what we wrote to |d0|.
    433     buffer[0] = 0;
    434     buffer_size = kBufferSize;
    435     EXPECT_EQ(MOJO_RESULT_OK,
    436               d1->ReadMessage(UserPointer<void>(buffer),
    437                               MakeUserPointer(&buffer_size),
    438                               0,
    439                               nullptr,
    440                               MOJO_READ_MESSAGE_FLAG_NONE));
    441     EXPECT_EQ(kBufferSize, buffer_size);
    442     EXPECT_EQ(123456789, buffer[0]);
    443 
    444     // Wait for readable on |d1| and close |d0| after some time, which should
    445     // cancel that wait.
            // Closing the peer is expected to surface as
            // |MOJO_RESULT_FAILED_PRECONDITION| with no satisfied or satisfiable
            // signals.
    446     {
    447       test::WaiterThread thread(d1,
    448                                 MOJO_HANDLE_SIGNAL_READABLE,
    449                                 MOJO_DEADLINE_INDEFINITE,
    450                                 3,
    451                                 &did_wait,
    452                                 &result,
    453                                 &context,
    454                                 &hss);
    455       stopwatch.Start();
    456       thread.Start();
    457       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
    458       EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    459     }  // Joins the thread.
    460     elapsed = stopwatch.Elapsed();
    461     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
    462     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
    463     EXPECT_TRUE(did_wait);
    464     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
    465     EXPECT_EQ(3u, context);
    466     EXPECT_EQ(0u, hss.satisfied_signals);
    467     EXPECT_EQ(0u, hss.satisfiable_signals);
    468 
    469     EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    470   }
    471 
          // Second scenario, again run with both port assignments: the dispatcher
          // being waited on (|d1|) is itself closed while the wait is pending.
    472   for (unsigned i = 0; i < 2; i++) {
    473     scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
    474         MessagePipeDispatcher::kDefaultCreateOptions));
    475     scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
    476         MessagePipeDispatcher::kDefaultCreateOptions));
    477     {
    478       scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
    479       d0->Init(mp, i);      // 0, 1.
    480       d1->Init(mp, i ^ 1);  // 1, 0.
    481     }
    482 
    483     // Wait for readable on |d1| and close |d1| after some time, which should
    484     // cancel that wait.
            // Unlike closing the peer, closing the waited-on dispatcher is expected
            // to report |MOJO_RESULT_CANCELLED|.
    485     {
    486       test::WaiterThread thread(d1,
    487                                 MOJO_HANDLE_SIGNAL_READABLE,
    488                                 MOJO_DEADLINE_INDEFINITE,
    489                                 4,
    490                                 &did_wait,
    491                                 &result,
    492                                 &context,
    493                                 &hss);
    494       stopwatch.Start();
    495       thread.Start();
    496       base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
    497       EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
    498     }  // Joins the thread.
    499     elapsed = stopwatch.Elapsed();
    500     EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
    501     EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
    502     EXPECT_TRUE(did_wait);
    503     EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
    504     EXPECT_EQ(4u, context);
    505     EXPECT_EQ(0u, hss.satisfied_signals);
    506     EXPECT_EQ(0u, hss.satisfiable_signals);
    507 
    508     EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
    509   }
    510 }
    511 
    512 // Stress test -----------------------------------------------------------------
    513 
    514 const size_t kMaxMessageSize = 2000;  // Max payload bytes; sizes thread buffers.
    515 
    516 class WriterThread : public base::SimpleThread {
          // Stress-test helper thread. When run, it writes a random number
          // (1000 to 6000) of messages to |write_dispatcher|, each a random-length
          // (1 to |kMaxMessageSize| bytes) prefix of a fixed byte pattern, and then
          // writes one final 4-byte "quit" message. The destructor joins the
          // thread.
    517  public:
    518   // |*messages_written| and |*bytes_written| belong to the thread while it's
    519   // alive.
          // Both counters are reset to zero by the constructor.
    520   WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
    521                size_t* messages_written,
    522                size_t* bytes_written)
    523       : base::SimpleThread("writer_thread"),
    524         write_dispatcher_(write_dispatcher),
    525         messages_written_(messages_written),
    526         bytes_written_(bytes_written) {
    527     *messages_written_ = 0;
    528     *bytes_written_ = 0;
    529   }
    530 
    531   virtual ~WriterThread() { Join(); }
    532 
    533  private:
    534   virtual void Run() OVERRIDE {
    535     // Make some data to write.
            // Byte |i| holds |i| truncated to |unsigned char|, so any prefix of the
            // buffer can be validated by position alone.
    536     unsigned char buffer[kMaxMessageSize];
    537     for (size_t i = 0; i < kMaxMessageSize; i++)
    538       buffer[i] = static_cast<unsigned char>(i);
    539 
    540     // Number of messages to write.
            // Note that the counter is set to the planned total up front rather
            // than being incremented per successful write.
    541     *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
    542 
    543     // Write messages.
    544     for (size_t i = 0; i < *messages_written_; i++) {
    545       uint32_t bytes_to_write = static_cast<uint32_t>(
    546           base::RandInt(1, static_cast<int>(kMaxMessageSize)));
    547       EXPECT_EQ(MOJO_RESULT_OK,
    548                 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer),
    549                                                 bytes_to_write,
    550                                                 nullptr,
    551                                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
    552       *bytes_written_ += bytes_to_write;
    553     }
    554 
    555     // Write one last "quit" message.
            // This message is not included in either counter.
    556     EXPECT_EQ(MOJO_RESULT_OK,
    557               write_dispatcher_->WriteMessage(UserPointer<const void>("quit"),
    558                                               4,
    559                                               nullptr,
    560                                               MOJO_WRITE_MESSAGE_FLAG_NONE));
    561   }
    562 
    563   const scoped_refptr<Dispatcher> write_dispatcher_;
    564   size_t* const messages_written_;
    565   size_t* const bytes_written_;
    566 
    567   DISALLOW_COPY_AND_ASSIGN(WriterThread);
    568 };
    569 
    570 class ReaderThread : public base::SimpleThread {
          // Stress-test helper thread. When run, it repeatedly waits for
          // |read_dispatcher| to become readable and reads one message at a time,
          // validating each payload and tallying message and byte counts, until it
          // reads a 4-byte "quit" message. The destructor joins the thread.
    571  public:
    572   // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
          // Both counters are reset to zero by the constructor.
    573   ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
    574                size_t* messages_read,
    575                size_t* bytes_read)
    576       : base::SimpleThread("reader_thread"),
    577         read_dispatcher_(read_dispatcher),
    578         messages_read_(messages_read),
    579         bytes_read_(bytes_read) {
    580     *messages_read_ = 0;
    581     *bytes_read_ = 0;
    582   }
    583 
    584   virtual ~ReaderThread() { Join(); }
    585 
    586  private:
    587   virtual void Run() OVERRIDE {
    588     unsigned char buffer[kMaxMessageSize];
    589     Waiter w;
    590     HandleSignalsState hss;
    591     MojoResult result;
    592 
    593     // Read messages.
    594     for (;;) {
    595       // Wait for it to be readable.
              // |hss| is filled in by |AddWaiter()| when no wait is needed, and by
              // |RemoveWaiter()| after an actual wait.
    596       w.Init();
    597       hss = HandleSignalsState();
    598       result =
    599           read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss);
    600       EXPECT_TRUE(result == MOJO_RESULT_OK ||
    601                   result == MOJO_RESULT_ALREADY_EXISTS)
    602           << "result: " << result;
    603       if (result == MOJO_RESULT_OK) {
    604         // Actually need to wait.
    605         EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr));
    606         read_dispatcher_->RemoveWaiter(&w, &hss);
    607       }
    608       // We may not actually be readable, since we're racing with other threads.
              // Hence only "readable is still satisfiable" is asserted here, not
              // "readable is satisfied".
    609       EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
    610 
    611       // Now, try to do the read.
    612       // Clear the buffer so that we can check the result.
    613       memset(buffer, 0, sizeof(buffer));
    614       uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
    615       result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer),
    616                                              MakeUserPointer(&buffer_size),
    617                                              0,
    618                                              nullptr,
    619                                              MOJO_READ_MESSAGE_FLAG_NONE);
    620       EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT)
    621           << "result: " << result;
    622       // We're racing with others to read, so maybe we failed.
    623       if (result == MOJO_RESULT_SHOULD_WAIT)
    624         continue;  // In which case, try again.
    625       // Check for quit.
              // The "quit" message ends this thread and is not counted in the
              // tallies below.
    626       if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
    627         return;
    628       EXPECT_GE(buffer_size, 1u);
    629       EXPECT_LE(buffer_size, kMaxMessageSize);
    630       EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
    631 
    632       (*messages_read_)++;
    633       *bytes_read_ += buffer_size;
    634     }
    635   }
    636 
          // Returns true if the first |message_size| bytes of |buffer| each equal
          // their own index truncated to |unsigned char|, and every remaining byte
          // up to |kMaxMessageSize| is zero. |buffer| must therefore hold at least
          // |kMaxMessageSize| bytes.
    637   static bool IsValidMessage(const unsigned char* buffer,
    638                              uint32_t message_size) {
    639     size_t i;
    640     for (i = 0; i < message_size; i++) {
    641       if (buffer[i] != static_cast<unsigned char>(i))
    642         return false;
    643     }
    644     // Check that the remaining bytes weren't stomped on.
    645     for (; i < kMaxMessageSize; i++) {
    646       if (buffer[i] != 0)
    647         return false;
    648     }
    649     return true;
    650   }
    651 
    652   const scoped_refptr<Dispatcher> read_dispatcher_;
    653   size_t* const messages_read_;
    654   size_t* const bytes_read_;
    655 
    656   DISALLOW_COPY_AND_ASSIGN(ReaderThread);
    657 };
    658 
    659 TEST(MessagePipeDispatcherTest, Stress) {
          // Runs many |WriterThread|s and |ReaderThread|s concurrently against the
          // two ends of a single message pipe, then checks that the total number of
          // messages and bytes read equals the totals written.
          //
          // The reader count equals the writer count: each writer emits exactly one
          // "quit" message and each reader stops after consuming one, so the counts
          // must match for every reader to terminate.
    660   static const size_t kNumWriters = 30;
    661   static const size_t kNumReaders = kNumWriters;
    662 
    663   scoped_refptr<MessagePipeDispatcher> d_write(
    664       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    665   scoped_refptr<MessagePipeDispatcher> d_read(
    666       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
    667   {
    668     scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
    669     d_write->Init(mp, 0);
    670     d_read->Init(mp, 1);
    671   }
    672 
          // Per-thread tallies; each slot is handed to exactly one thread, whose
          // constructor zeroes it.
    673   size_t messages_written[kNumWriters];
    674   size_t bytes_written[kNumWriters];
    675   size_t messages_read[kNumReaders];
    676   size_t bytes_read[kNumReaders];
    677   {
    678     // Make writers.
    679     ScopedVector<WriterThread> writers;
    680     for (size_t i = 0; i < kNumWriters; i++) {
    681       writers.push_back(
    682           new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
    683     }
    684 
    685     // Make readers.
    686     ScopedVector<ReaderThread> readers;
    687     for (size_t i = 0; i < kNumReaders; i++) {
    688       readers.push_back(
    689           new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
    690     }
    691 
    692     // Start writers.
    693     for (size_t i = 0; i < kNumWriters; i++)
    694       writers[i]->Start();
    695 
    696     // Start readers.
    697     for (size_t i = 0; i < kNumReaders; i++)
    698       readers[i]->Start();
    699 
    700     // TODO(vtl): Maybe I should have an event that triggers all the threads to
    701     // start doing stuff for real (so that the first ones created/started aren't
    702     // advantaged).
            // NOTE(review): the join below presumably happens because the
            // |ScopedVector|s delete their elements on scope exit, running each
            // thread's joining destructor -- confirm against |ScopedVector|.
    703   }  // Joins all the threads.
    704 
          // The tallies are only read after the scope above has ended.
    705   size_t total_messages_written = 0;
    706   size_t total_bytes_written = 0;
    707   for (size_t i = 0; i < kNumWriters; i++) {
    708     total_messages_written += messages_written[i];
    709     total_bytes_written += bytes_written[i];
    710   }
    711   size_t total_messages_read = 0;
    712   size_t total_bytes_read = 0;
    713   for (size_t i = 0; i < kNumReaders; i++) {
    714     total_messages_read += messages_read[i];
    715     total_bytes_read += bytes_read[i];
    716     // We'd have to be really unlucky to have read no messages on a thread.
    717     EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
            // Every counted message carries at least one byte.
    718     EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
    719   }
    720   EXPECT_EQ(total_messages_written, total_messages_read);
    721   EXPECT_EQ(total_bytes_written, total_bytes_read);
    722 
    723   EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
    724   EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
    725 }
    726 
    727 }  // namespace
    728 }  // namespace system
    729 }  // namespace mojo
    730