Home | History | Annotate | Download | only in message_pump
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/message_pump/handle_watcher.h"
      6 
      7 #include <memory>
      8 #include <string>
      9 
     10 #include "base/at_exit.h"
     11 #include "base/auto_reset.h"
     12 #include "base/bind.h"
     13 #include "base/macros.h"
     14 #include "base/memory/scoped_vector.h"
     15 #include "base/run_loop.h"
     16 #include "base/test/simple_test_tick_clock.h"
     17 #include "base/threading/thread.h"
     18 #include "mojo/message_pump/message_pump_mojo.h"
     19 #include "mojo/message_pump/time_helper.h"
     20 #include "mojo/public/cpp/system/core.h"
     21 #include "mojo/public/cpp/test_support/test_utils.h"
     22 #include "testing/gtest/include/gtest/gtest.h"
     23 
     24 namespace mojo {
     25 namespace common {
     26 namespace test {
     27 
     // Selects which kind of message loop a parameterized test runs on. See
     // CreateMessageLoop() for how each value is interpreted.
     enum MessageLoopConfig {
       MESSAGE_LOOP_CONFIG_DEFAULT,  // Value 0: plain base::MessageLoop.
       MESSAGE_LOOP_CONFIG_MOJO      // Value 1: loop built on MessagePumpMojo.
     };
     32 
     33 void ObserveCallback(bool* was_signaled,
     34                      MojoResult* result_observed,
     35                      MojoResult result) {
     36   *was_signaled = true;
     37   *result_observed = result;
     38 }
     39 
     40 void RunUntilIdle() {
     41   base::RunLoop run_loop;
     42   run_loop.RunUntilIdle();
     43 }
     44 
     45 void DeleteWatcherAndForwardResult(
     46     HandleWatcher* watcher,
     47     base::Callback<void(MojoResult)> next_callback,
     48     MojoResult result) {
     49   delete watcher;
     50   next_callback.Run(result);
     51 }
     52 
     53 std::unique_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) {
     54   std::unique_ptr<base::MessageLoop> loop;
     55   if (config == MESSAGE_LOOP_CONFIG_DEFAULT)
     56     loop.reset(new base::MessageLoop());
     57   else
     58     loop.reset(new base::MessageLoop(MessagePumpMojo::Create()));
     59   return loop;
     60 }
     61 
     62 // Helper class to manage the callback and running the message loop waiting for
     63 // message to be received. Typical usage is something like:
     64 //   Schedule callback returned from GetCallback().
     65 //   RunUntilGotCallback();
     66 //   EXPECT_TRUE(got_callback());
     67 //   clear_callback();
     68 class CallbackHelper {
     69  public:
     70   CallbackHelper()
     71       : got_callback_(false),
     72         run_loop_(NULL),
     73         weak_factory_(this) {}
     74   ~CallbackHelper() {}
     75 
     76   // See description above |got_callback_|.
     77   bool got_callback() const { return got_callback_; }
     78   void clear_callback() { got_callback_ = false; }
     79 
     80   // Runs the current MessageLoop until the callback returned from GetCallback()
     81   // is notified.
     82   void RunUntilGotCallback() {
     83     ASSERT_TRUE(run_loop_ == NULL);
     84     base::RunLoop run_loop;
     85     base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
     86     run_loop.Run();
     87   }
     88 
     89   base::Callback<void(MojoResult)> GetCallback() {
     90     return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
     91   }
     92 
     93   void Start(HandleWatcher* watcher, const MessagePipeHandle& handle) {
     94     StartWithCallback(watcher, handle, GetCallback());
     95   }
     96 
     97   void StartWithCallback(HandleWatcher* watcher,
     98                          const MessagePipeHandle& handle,
     99                          const base::Callback<void(MojoResult)>& callback) {
    100     watcher->Start(handle, MOJO_HANDLE_SIGNAL_READABLE,
    101                    MOJO_DEADLINE_INDEFINITE, callback);
    102   }
    103 
    104  private:
    105   void OnCallback(MojoResult result) {
    106     got_callback_ = true;
    107     if (run_loop_)
    108       run_loop_->Quit();
    109   }
    110 
    111   // Set to true when the callback is called.
    112   bool got_callback_;
    113 
    114   // If non-NULL we're in RunUntilGotCallback().
    115   base::RunLoop* run_loop_;
    116 
    117   base::WeakPtrFactory<CallbackHelper> weak_factory_;
    118 
    119  private:
    120   DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
    121 };
    122 
    123 class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> {
    124  public:
    125   HandleWatcherTest()
    126       : at_exit_(new base::ShadowingAtExitManager),
    127         message_loop_(CreateMessageLoop(GetParam())) {}
    128   virtual ~HandleWatcherTest() {
    129     // By explicitly destroying |at_exit_| before resetting the tick clock, it
    130     // ensures that the handle watcher thread (if there is one) is shut down,
    131     // preventing a race with users of the tick clock in MessagePumpMojo.
    132     at_exit_.reset();
    133     test::SetTickClockForTest(NULL);
    134   }
    135 
    136  protected:
    137   void TearDownMessageLoop() {
    138     message_loop_.reset();
    139   }
    140 
    141   // This should be called at the beginning of any test that needs it, so that
    142   // it is installed before the handle watcher thread starts.
    143   void InstallTickClock() {
    144     test::SetTickClockForTest(&tick_clock_);
    145   }
    146 
    147   base::SimpleTestTickClock tick_clock_;
    148 
    149  private:
    150   std::unique_ptr<base::ShadowingAtExitManager> at_exit_;
    151   std::unique_ptr<base::MessageLoop> message_loop_;
    152 
    153   DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
    154 };
    155 
    156 INSTANTIATE_TEST_CASE_P(
    157     MultipleMessageLoopConfigs, HandleWatcherTest,
    158     testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO));
    159 
    160 // Trivial test case with a single handle to watch.
    161 TEST_P(HandleWatcherTest, SingleHandler) {
    162   MessagePipe test_pipe;
    163   ASSERT_TRUE(test_pipe.handle0.is_valid());
    164   CallbackHelper callback_helper;
    165   HandleWatcher watcher;
    166   callback_helper.Start(&watcher, test_pipe.handle0.get());
    167   RunUntilIdle();
    168   EXPECT_FALSE(callback_helper.got_callback());
    169   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
    170                                            std::string()));
    171   callback_helper.RunUntilGotCallback();
    172   EXPECT_TRUE(callback_helper.got_callback());
    173 }
    174 
    175 // Creates three handles and notfies them in reverse order ensuring each one is
    176 // notified appropriately.
    177 TEST_P(HandleWatcherTest, ThreeHandles) {
    178   MessagePipe test_pipe1;
    179   MessagePipe test_pipe2;
    180   MessagePipe test_pipe3;
    181   CallbackHelper callback_helper1;
    182   CallbackHelper callback_helper2;
    183   CallbackHelper callback_helper3;
    184   ASSERT_TRUE(test_pipe1.handle0.is_valid());
    185   ASSERT_TRUE(test_pipe2.handle0.is_valid());
    186   ASSERT_TRUE(test_pipe3.handle0.is_valid());
    187 
    188   HandleWatcher watcher1;
    189   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
    190   RunUntilIdle();
    191   EXPECT_FALSE(callback_helper1.got_callback());
    192   EXPECT_FALSE(callback_helper2.got_callback());
    193   EXPECT_FALSE(callback_helper3.got_callback());
    194 
    195   HandleWatcher watcher2;
    196   callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
    197   RunUntilIdle();
    198   EXPECT_FALSE(callback_helper1.got_callback());
    199   EXPECT_FALSE(callback_helper2.got_callback());
    200   EXPECT_FALSE(callback_helper3.got_callback());
    201 
    202   HandleWatcher watcher3;
    203   callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
    204   RunUntilIdle();
    205   EXPECT_FALSE(callback_helper1.got_callback());
    206   EXPECT_FALSE(callback_helper2.got_callback());
    207   EXPECT_FALSE(callback_helper3.got_callback());
    208 
    209   // Write to 3 and make sure it's notified.
    210   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
    211                                            std::string()));
    212   callback_helper3.RunUntilGotCallback();
    213   EXPECT_FALSE(callback_helper1.got_callback());
    214   EXPECT_FALSE(callback_helper2.got_callback());
    215   EXPECT_TRUE(callback_helper3.got_callback());
    216   callback_helper3.clear_callback();
    217 
    218   // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
    219   // running.
    220   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
    221                                            std::string()));
    222   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
    223                                            std::string()));
    224   callback_helper1.RunUntilGotCallback();
    225   EXPECT_TRUE(callback_helper1.got_callback());
    226   EXPECT_FALSE(callback_helper2.got_callback());
    227   EXPECT_FALSE(callback_helper3.got_callback());
    228   callback_helper1.clear_callback();
    229 
    230   // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
    231   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
    232                                            std::string()));
    233   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
    234                                            std::string()));
    235   callback_helper2.RunUntilGotCallback();
    236   EXPECT_FALSE(callback_helper1.got_callback());
    237   EXPECT_TRUE(callback_helper2.got_callback());
    238   EXPECT_FALSE(callback_helper3.got_callback());
    239 }
    240 
    241 // Verifies Start() invoked a second time works.
    242 TEST_P(HandleWatcherTest, Restart) {
    243   MessagePipe test_pipe1;
    244   MessagePipe test_pipe2;
    245   CallbackHelper callback_helper1;
    246   CallbackHelper callback_helper2;
    247   ASSERT_TRUE(test_pipe1.handle0.is_valid());
    248   ASSERT_TRUE(test_pipe2.handle0.is_valid());
    249 
    250   HandleWatcher watcher1;
    251   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
    252   RunUntilIdle();
    253   EXPECT_FALSE(callback_helper1.got_callback());
    254   EXPECT_FALSE(callback_helper2.got_callback());
    255 
    256   HandleWatcher watcher2;
    257   callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
    258   RunUntilIdle();
    259   EXPECT_FALSE(callback_helper1.got_callback());
    260   EXPECT_FALSE(callback_helper2.got_callback());
    261 
    262   // Write to 1 and make sure it's notified.
    263   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
    264                                            std::string()));
    265   callback_helper1.RunUntilGotCallback();
    266   EXPECT_TRUE(callback_helper1.got_callback());
    267   EXPECT_FALSE(callback_helper2.got_callback());
    268   callback_helper1.clear_callback();
    269   EXPECT_TRUE(mojo::test::DiscardMessage(test_pipe1.handle0.get()));
    270 
    271   // Write to 2 and make sure it's notified.
    272   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
    273                                            std::string()));
    274   callback_helper2.RunUntilGotCallback();
    275   EXPECT_FALSE(callback_helper1.got_callback());
    276   EXPECT_TRUE(callback_helper2.got_callback());
    277   callback_helper2.clear_callback();
    278 
    279   // Listen on 1 again.
    280   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
    281   RunUntilIdle();
    282   EXPECT_FALSE(callback_helper1.got_callback());
    283   EXPECT_FALSE(callback_helper2.got_callback());
    284 
    285   // Write to 1 and make sure it's notified.
    286   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
    287                                            std::string()));
    288   callback_helper1.RunUntilGotCallback();
    289   EXPECT_TRUE(callback_helper1.got_callback());
    290   EXPECT_FALSE(callback_helper2.got_callback());
    291 }
    292 
    293 // Verifies Start() invoked a second time on the same handle works.
    294 TEST_P(HandleWatcherTest, RestartOnSameHandle) {
    295   MessagePipe test_pipe;
    296   CallbackHelper callback_helper;
    297   ASSERT_TRUE(test_pipe.handle0.is_valid());
    298 
    299   HandleWatcher watcher;
    300   callback_helper.Start(&watcher, test_pipe.handle0.get());
    301   RunUntilIdle();
    302   EXPECT_FALSE(callback_helper.got_callback());
    303 
    304   callback_helper.Start(&watcher, test_pipe.handle0.get());
    305   RunUntilIdle();
    306   EXPECT_FALSE(callback_helper.got_callback());
    307 }
    308 
    309 // Verifies deadline is honored.
    310 TEST_P(HandleWatcherTest, Deadline) {
    311   InstallTickClock();
    312 
    313   MessagePipe test_pipe1;
    314   MessagePipe test_pipe2;
    315   MessagePipe test_pipe3;
    316   CallbackHelper callback_helper1;
    317   CallbackHelper callback_helper2;
    318   CallbackHelper callback_helper3;
    319   ASSERT_TRUE(test_pipe1.handle0.is_valid());
    320   ASSERT_TRUE(test_pipe2.handle0.is_valid());
    321   ASSERT_TRUE(test_pipe3.handle0.is_valid());
    322 
    323   // Add a watcher with an infinite timeout.
    324   HandleWatcher watcher1;
    325   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
    326   RunUntilIdle();
    327   EXPECT_FALSE(callback_helper1.got_callback());
    328   EXPECT_FALSE(callback_helper2.got_callback());
    329   EXPECT_FALSE(callback_helper3.got_callback());
    330 
    331   // Add another watcher wth a timeout of 500 microseconds.
    332   HandleWatcher watcher2;
    333   watcher2.Start(test_pipe2.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE, 500,
    334                  callback_helper2.GetCallback());
    335   RunUntilIdle();
    336   EXPECT_FALSE(callback_helper1.got_callback());
    337   EXPECT_FALSE(callback_helper2.got_callback());
    338   EXPECT_FALSE(callback_helper3.got_callback());
    339 
    340   // Advance the clock passed the deadline. We also have to start another
    341   // watcher to wake up the background thread.
    342   tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
    343 
    344   HandleWatcher watcher3;
    345   callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
    346 
    347   callback_helper2.RunUntilGotCallback();
    348   EXPECT_FALSE(callback_helper1.got_callback());
    349   EXPECT_TRUE(callback_helper2.got_callback());
    350   EXPECT_FALSE(callback_helper3.got_callback());
    351 }
    352 
    353 TEST_P(HandleWatcherTest, DeleteInCallback) {
    354   MessagePipe test_pipe;
    355   CallbackHelper callback_helper;
    356 
    357   HandleWatcher* watcher = new HandleWatcher();
    358   callback_helper.StartWithCallback(watcher, test_pipe.handle1.get(),
    359                                     base::Bind(&DeleteWatcherAndForwardResult,
    360                                                watcher,
    361                                                callback_helper.GetCallback()));
    362   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle0.get(),
    363                                            std::string()));
    364   callback_helper.RunUntilGotCallback();
    365   EXPECT_TRUE(callback_helper.got_callback());
    366 }
    367 
    368 TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) {
    369   bool was_signaled = false;
    370   MojoResult result = MOJO_RESULT_OK;
    371 
    372   MessagePipe pipe;
    373   HandleWatcher watcher;
    374   watcher.Start(pipe.handle0.get(),
    375                 MOJO_HANDLE_SIGNAL_READABLE,
    376                 MOJO_DEADLINE_INDEFINITE,
    377                 base::Bind(&ObserveCallback, &was_signaled, &result));
    378 
    379   // Now, let the MessageLoop get torn down. We expect our callback to run.
    380   TearDownMessageLoop();
    381 
    382   EXPECT_TRUE(was_signaled);
    383   EXPECT_EQ(MOJO_RESULT_ABORTED, result);
    384 }
    385 
    386 void NeverReached(MojoResult result) {
    387   FAIL() << "Callback should never be invoked " << result;
    388 }
    389 
    390 // Called on the main thread when a thread is done. Decrements |active_count|
    391 // and if |active_count| is zero quits |run_loop|.
    392 void StressThreadDone(base::RunLoop* run_loop, int* active_count) {
    393   (*active_count)--;
    394   EXPECT_GE(*active_count, 0);
    395   if (*active_count == 0)
    396     run_loop->Quit();
    397 }
    398 
    399 // See description of StressTest. This is called on the background thread.
    400 // |count| is the number of HandleWatchers to create. |active_count| is the
    401 // number of outstanding threads, |task_runner| the task runner for the main
    402 // thread and |run_loop| the run loop that should be quit when there are no more
    403 // threads running. When done StressThreadDone() is invoked on the main thread.
    404 // |active_count| and |run_loop| should only be used on the main thread.
    405 void RunStressTest(int count,
    406                    scoped_refptr<base::TaskRunner> task_runner,
    407                    base::RunLoop* run_loop,
    408                    int* active_count) {
    409   struct TestData {
    410     MessagePipe pipe;
    411     HandleWatcher watcher;
    412   };
    413   ScopedVector<TestData> data_vector;
    414   for (int i = 0; i < count; ++i) {
    415     if (i % 20 == 0) {
    416       // Every so often we wait. This results in some level of thread balancing
    417       // as well as making sure HandleWatcher has time to actually start some
    418       // watches.
    419       MessagePipe test_pipe;
    420       ASSERT_TRUE(test_pipe.handle0.is_valid());
    421       CallbackHelper callback_helper;
    422       HandleWatcher watcher;
    423       callback_helper.Start(&watcher, test_pipe.handle0.get());
    424       RunUntilIdle();
    425       EXPECT_FALSE(callback_helper.got_callback());
    426       EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
    427                                                std::string()));
    428       base::MessageLoop::ScopedNestableTaskAllower scoper(
    429           base::MessageLoop::current());
    430       callback_helper.RunUntilGotCallback();
    431       EXPECT_TRUE(callback_helper.got_callback());
    432     } else {
    433       std::unique_ptr<TestData> test_data(new TestData);
    434       ASSERT_TRUE(test_data->pipe.handle0.is_valid());
    435       test_data->watcher.Start(test_data->pipe.handle0.get(),
    436                     MOJO_HANDLE_SIGNAL_READABLE,
    437                     MOJO_DEADLINE_INDEFINITE,
    438                     base::Bind(&NeverReached));
    439       data_vector.push_back(test_data.release());
    440     }
    441     if (i % 15 == 0)
    442       data_vector.clear();
    443   }
    444   task_runner->PostTask(FROM_HERE,
    445                         base::Bind(&StressThreadDone, run_loop,
    446                                    active_count));
    447 }
    448 
    449 // This test is meant to stress HandleWatcher. It uses from various threads
    450 // repeatedly starting and stopping watches. It spins up kThreadCount
    451 // threads. Each thread creates kWatchCount watches. Every so often each thread
    452 // writes to a pipe and waits for the response.
    453 TEST(HandleWatcherCleanEnvironmentTest, StressTest) {
    454 #if defined(NDEBUG)
    455   const int kThreadCount = 15;
    456   const int kWatchCount = 400;
    457 #else
    458   const int kThreadCount = 10;
    459   const int kWatchCount = 250;
    460 #endif
    461 
    462   base::ShadowingAtExitManager at_exit;
    463   base::MessageLoop message_loop;
    464   base::RunLoop run_loop;
    465   ScopedVector<base::Thread> threads;
    466   int threads_active_counter = kThreadCount;
    467   // Starts the threads first and then post the task in hopes of having more
    468   // threads running at once.
    469   for (int i = 0; i < kThreadCount; ++i) {
    470     std::unique_ptr<base::Thread> thread(new base::Thread("test thread"));
    471     if (i % 2) {
    472       base::Thread::Options thread_options;
    473       thread_options.message_pump_factory =
    474           base::Bind(&MessagePumpMojo::Create);
    475       thread->StartWithOptions(thread_options);
    476     } else {
    477       thread->Start();
    478     }
    479     threads.push_back(thread.release());
    480   }
    481   for (int i = 0; i < kThreadCount; ++i) {
    482     threads[i]->task_runner()->PostTask(
    483         FROM_HERE, base::Bind(&RunStressTest, kWatchCount,
    484                               message_loop.task_runner(),
    485                               &run_loop, &threads_active_counter));
    486   }
    487   run_loop.Run();
    488   ASSERT_EQ(0, threads_active_counter);
    489 }
    490 
    491 }  // namespace test
    492 }  // namespace common
    493 }  // namespace mojo
    494