1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/message_pump/handle_watcher.h" 6 7 #include <memory> 8 #include <string> 9 10 #include "base/at_exit.h" 11 #include "base/auto_reset.h" 12 #include "base/bind.h" 13 #include "base/macros.h" 14 #include "base/memory/scoped_vector.h" 15 #include "base/run_loop.h" 16 #include "base/test/simple_test_tick_clock.h" 17 #include "base/threading/thread.h" 18 #include "mojo/message_pump/message_pump_mojo.h" 19 #include "mojo/message_pump/time_helper.h" 20 #include "mojo/public/cpp/system/core.h" 21 #include "mojo/public/cpp/test_support/test_utils.h" 22 #include "testing/gtest/include/gtest/gtest.h" 23 24 namespace mojo { 25 namespace common { 26 namespace test { 27 28 enum MessageLoopConfig { 29 MESSAGE_LOOP_CONFIG_DEFAULT = 0, 30 MESSAGE_LOOP_CONFIG_MOJO = 1 31 }; 32 33 void ObserveCallback(bool* was_signaled, 34 MojoResult* result_observed, 35 MojoResult result) { 36 *was_signaled = true; 37 *result_observed = result; 38 } 39 40 void RunUntilIdle() { 41 base::RunLoop run_loop; 42 run_loop.RunUntilIdle(); 43 } 44 45 void DeleteWatcherAndForwardResult( 46 HandleWatcher* watcher, 47 base::Callback<void(MojoResult)> next_callback, 48 MojoResult result) { 49 delete watcher; 50 next_callback.Run(result); 51 } 52 53 std::unique_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) { 54 std::unique_ptr<base::MessageLoop> loop; 55 if (config == MESSAGE_LOOP_CONFIG_DEFAULT) 56 loop.reset(new base::MessageLoop()); 57 else 58 loop.reset(new base::MessageLoop(MessagePumpMojo::Create())); 59 return loop; 60 } 61 62 // Helper class to manage the callback and running the message loop waiting for 63 // message to be received. Typical usage is something like: 64 // Schedule callback returned from GetCallback(). 65 // RunUntilGotCallback(); 66 // EXPECT_TRUE(got_callback()); 67 // clear_callback(); 68 class CallbackHelper { 69 public: 70 CallbackHelper() 71 : got_callback_(false), 72 run_loop_(NULL), 73 weak_factory_(this) {} 74 ~CallbackHelper() {} 75 76 // See description above |got_callback_|. 77 bool got_callback() const { return got_callback_; } 78 void clear_callback() { got_callback_ = false; } 79 80 // Runs the current MessageLoop until the callback returned from GetCallback() 81 // is notified. 82 void RunUntilGotCallback() { 83 ASSERT_TRUE(run_loop_ == NULL); 84 base::RunLoop run_loop; 85 base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop); 86 run_loop.Run(); 87 } 88 89 base::Callback<void(MojoResult)> GetCallback() { 90 return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr()); 91 } 92 93 void Start(HandleWatcher* watcher, const MessagePipeHandle& handle) { 94 StartWithCallback(watcher, handle, GetCallback()); 95 } 96 97 void StartWithCallback(HandleWatcher* watcher, 98 const MessagePipeHandle& handle, 99 const base::Callback<void(MojoResult)>& callback) { 100 watcher->Start(handle, MOJO_HANDLE_SIGNAL_READABLE, 101 MOJO_DEADLINE_INDEFINITE, callback); 102 } 103 104 private: 105 void OnCallback(MojoResult result) { 106 got_callback_ = true; 107 if (run_loop_) 108 run_loop_->Quit(); 109 } 110 111 // Set to true when the callback is called. 112 bool got_callback_; 113 114 // If non-NULL we're in RunUntilGotCallback(). 115 base::RunLoop* run_loop_; 116 117 base::WeakPtrFactory<CallbackHelper> weak_factory_; 118 119 private: 120 DISALLOW_COPY_AND_ASSIGN(CallbackHelper); 121 }; 122 123 class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> { 124 public: 125 HandleWatcherTest() 126 : at_exit_(new base::ShadowingAtExitManager), 127 message_loop_(CreateMessageLoop(GetParam())) {} 128 virtual ~HandleWatcherTest() { 129 // By explicitly destroying |at_exit_| before resetting the tick clock, it 130 // ensures that the handle watcher thread (if there is one) is shut down, 131 // preventing a race with users of the tick clock in MessagePumpMojo. 132 at_exit_.reset(); 133 test::SetTickClockForTest(NULL); 134 } 135 136 protected: 137 void TearDownMessageLoop() { 138 message_loop_.reset(); 139 } 140 141 // This should be called at the beginning of any test that needs it, so that 142 // it is installed before the handle watcher thread starts. 143 void InstallTickClock() { 144 test::SetTickClockForTest(&tick_clock_); 145 } 146 147 base::SimpleTestTickClock tick_clock_; 148 149 private: 150 std::unique_ptr<base::ShadowingAtExitManager> at_exit_; 151 std::unique_ptr<base::MessageLoop> message_loop_; 152 153 DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest); 154 }; 155 156 INSTANTIATE_TEST_CASE_P( 157 MultipleMessageLoopConfigs, HandleWatcherTest, 158 testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO)); 159 160 // Trivial test case with a single handle to watch. 161 TEST_P(HandleWatcherTest, SingleHandler) { 162 MessagePipe test_pipe; 163 ASSERT_TRUE(test_pipe.handle0.is_valid()); 164 CallbackHelper callback_helper; 165 HandleWatcher watcher; 166 callback_helper.Start(&watcher, test_pipe.handle0.get()); 167 RunUntilIdle(); 168 EXPECT_FALSE(callback_helper.got_callback()); 169 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(), 170 std::string())); 171 callback_helper.RunUntilGotCallback(); 172 EXPECT_TRUE(callback_helper.got_callback()); 173 } 174 175 // Creates three handles and notfies them in reverse order ensuring each one is 176 // notified appropriately. 177 TEST_P(HandleWatcherTest, ThreeHandles) { 178 MessagePipe test_pipe1; 179 MessagePipe test_pipe2; 180 MessagePipe test_pipe3; 181 CallbackHelper callback_helper1; 182 CallbackHelper callback_helper2; 183 CallbackHelper callback_helper3; 184 ASSERT_TRUE(test_pipe1.handle0.is_valid()); 185 ASSERT_TRUE(test_pipe2.handle0.is_valid()); 186 ASSERT_TRUE(test_pipe3.handle0.is_valid()); 187 188 HandleWatcher watcher1; 189 callback_helper1.Start(&watcher1, test_pipe1.handle0.get()); 190 RunUntilIdle(); 191 EXPECT_FALSE(callback_helper1.got_callback()); 192 EXPECT_FALSE(callback_helper2.got_callback()); 193 EXPECT_FALSE(callback_helper3.got_callback()); 194 195 HandleWatcher watcher2; 196 callback_helper2.Start(&watcher2, test_pipe2.handle0.get()); 197 RunUntilIdle(); 198 EXPECT_FALSE(callback_helper1.got_callback()); 199 EXPECT_FALSE(callback_helper2.got_callback()); 200 EXPECT_FALSE(callback_helper3.got_callback()); 201 202 HandleWatcher watcher3; 203 callback_helper3.Start(&watcher3, test_pipe3.handle0.get()); 204 RunUntilIdle(); 205 EXPECT_FALSE(callback_helper1.got_callback()); 206 EXPECT_FALSE(callback_helper2.got_callback()); 207 EXPECT_FALSE(callback_helper3.got_callback()); 208 209 // Write to 3 and make sure it's notified. 210 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(), 211 std::string())); 212 callback_helper3.RunUntilGotCallback(); 213 EXPECT_FALSE(callback_helper1.got_callback()); 214 EXPECT_FALSE(callback_helper2.got_callback()); 215 EXPECT_TRUE(callback_helper3.got_callback()); 216 callback_helper3.clear_callback(); 217 218 // Write to 1 and 3. Only 1 should be notified since 3 was is no longer 219 // running. 220 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(), 221 std::string())); 222 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(), 223 std::string())); 224 callback_helper1.RunUntilGotCallback(); 225 EXPECT_TRUE(callback_helper1.got_callback()); 226 EXPECT_FALSE(callback_helper2.got_callback()); 227 EXPECT_FALSE(callback_helper3.got_callback()); 228 callback_helper1.clear_callback(); 229 230 // Write to 1 and 2. Only 2 should be notified (since 1 was already notified). 231 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(), 232 std::string())); 233 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(), 234 std::string())); 235 callback_helper2.RunUntilGotCallback(); 236 EXPECT_FALSE(callback_helper1.got_callback()); 237 EXPECT_TRUE(callback_helper2.got_callback()); 238 EXPECT_FALSE(callback_helper3.got_callback()); 239 } 240 241 // Verifies Start() invoked a second time works. 242 TEST_P(HandleWatcherTest, Restart) { 243 MessagePipe test_pipe1; 244 MessagePipe test_pipe2; 245 CallbackHelper callback_helper1; 246 CallbackHelper callback_helper2; 247 ASSERT_TRUE(test_pipe1.handle0.is_valid()); 248 ASSERT_TRUE(test_pipe2.handle0.is_valid()); 249 250 HandleWatcher watcher1; 251 callback_helper1.Start(&watcher1, test_pipe1.handle0.get()); 252 RunUntilIdle(); 253 EXPECT_FALSE(callback_helper1.got_callback()); 254 EXPECT_FALSE(callback_helper2.got_callback()); 255 256 HandleWatcher watcher2; 257 callback_helper2.Start(&watcher2, test_pipe2.handle0.get()); 258 RunUntilIdle(); 259 EXPECT_FALSE(callback_helper1.got_callback()); 260 EXPECT_FALSE(callback_helper2.got_callback()); 261 262 // Write to 1 and make sure it's notified. 263 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(), 264 std::string())); 265 callback_helper1.RunUntilGotCallback(); 266 EXPECT_TRUE(callback_helper1.got_callback()); 267 EXPECT_FALSE(callback_helper2.got_callback()); 268 callback_helper1.clear_callback(); 269 EXPECT_TRUE(mojo::test::DiscardMessage(test_pipe1.handle0.get())); 270 271 // Write to 2 and make sure it's notified. 272 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(), 273 std::string())); 274 callback_helper2.RunUntilGotCallback(); 275 EXPECT_FALSE(callback_helper1.got_callback()); 276 EXPECT_TRUE(callback_helper2.got_callback()); 277 callback_helper2.clear_callback(); 278 279 // Listen on 1 again. 280 callback_helper1.Start(&watcher1, test_pipe1.handle0.get()); 281 RunUntilIdle(); 282 EXPECT_FALSE(callback_helper1.got_callback()); 283 EXPECT_FALSE(callback_helper2.got_callback()); 284 285 // Write to 1 and make sure it's notified. 286 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(), 287 std::string())); 288 callback_helper1.RunUntilGotCallback(); 289 EXPECT_TRUE(callback_helper1.got_callback()); 290 EXPECT_FALSE(callback_helper2.got_callback()); 291 } 292 293 // Verifies Start() invoked a second time on the same handle works. 294 TEST_P(HandleWatcherTest, RestartOnSameHandle) { 295 MessagePipe test_pipe; 296 CallbackHelper callback_helper; 297 ASSERT_TRUE(test_pipe.handle0.is_valid()); 298 299 HandleWatcher watcher; 300 callback_helper.Start(&watcher, test_pipe.handle0.get()); 301 RunUntilIdle(); 302 EXPECT_FALSE(callback_helper.got_callback()); 303 304 callback_helper.Start(&watcher, test_pipe.handle0.get()); 305 RunUntilIdle(); 306 EXPECT_FALSE(callback_helper.got_callback()); 307 } 308 309 // Verifies deadline is honored. 310 TEST_P(HandleWatcherTest, Deadline) { 311 InstallTickClock(); 312 313 MessagePipe test_pipe1; 314 MessagePipe test_pipe2; 315 MessagePipe test_pipe3; 316 CallbackHelper callback_helper1; 317 CallbackHelper callback_helper2; 318 CallbackHelper callback_helper3; 319 ASSERT_TRUE(test_pipe1.handle0.is_valid()); 320 ASSERT_TRUE(test_pipe2.handle0.is_valid()); 321 ASSERT_TRUE(test_pipe3.handle0.is_valid()); 322 323 // Add a watcher with an infinite timeout. 324 HandleWatcher watcher1; 325 callback_helper1.Start(&watcher1, test_pipe1.handle0.get()); 326 RunUntilIdle(); 327 EXPECT_FALSE(callback_helper1.got_callback()); 328 EXPECT_FALSE(callback_helper2.got_callback()); 329 EXPECT_FALSE(callback_helper3.got_callback()); 330 331 // Add another watcher wth a timeout of 500 microseconds. 332 HandleWatcher watcher2; 333 watcher2.Start(test_pipe2.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE, 500, 334 callback_helper2.GetCallback()); 335 RunUntilIdle(); 336 EXPECT_FALSE(callback_helper1.got_callback()); 337 EXPECT_FALSE(callback_helper2.got_callback()); 338 EXPECT_FALSE(callback_helper3.got_callback()); 339 340 // Advance the clock passed the deadline. We also have to start another 341 // watcher to wake up the background thread. 342 tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501)); 343 344 HandleWatcher watcher3; 345 callback_helper3.Start(&watcher3, test_pipe3.handle0.get()); 346 347 callback_helper2.RunUntilGotCallback(); 348 EXPECT_FALSE(callback_helper1.got_callback()); 349 EXPECT_TRUE(callback_helper2.got_callback()); 350 EXPECT_FALSE(callback_helper3.got_callback()); 351 } 352 353 TEST_P(HandleWatcherTest, DeleteInCallback) { 354 MessagePipe test_pipe; 355 CallbackHelper callback_helper; 356 357 HandleWatcher* watcher = new HandleWatcher(); 358 callback_helper.StartWithCallback(watcher, test_pipe.handle1.get(), 359 base::Bind(&DeleteWatcherAndForwardResult, 360 watcher, 361 callback_helper.GetCallback())); 362 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle0.get(), 363 std::string())); 364 callback_helper.RunUntilGotCallback(); 365 EXPECT_TRUE(callback_helper.got_callback()); 366 } 367 368 TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) { 369 bool was_signaled = false; 370 MojoResult result = MOJO_RESULT_OK; 371 372 MessagePipe pipe; 373 HandleWatcher watcher; 374 watcher.Start(pipe.handle0.get(), 375 MOJO_HANDLE_SIGNAL_READABLE, 376 MOJO_DEADLINE_INDEFINITE, 377 base::Bind(&ObserveCallback, &was_signaled, &result)); 378 379 // Now, let the MessageLoop get torn down. We expect our callback to run. 380 TearDownMessageLoop(); 381 382 EXPECT_TRUE(was_signaled); 383 EXPECT_EQ(MOJO_RESULT_ABORTED, result); 384 } 385 386 void NeverReached(MojoResult result) { 387 FAIL() << "Callback should never be invoked " << result; 388 } 389 390 // Called on the main thread when a thread is done. Decrements |active_count| 391 // and if |active_count| is zero quits |run_loop|. 392 void StressThreadDone(base::RunLoop* run_loop, int* active_count) { 393 (*active_count)--; 394 EXPECT_GE(*active_count, 0); 395 if (*active_count == 0) 396 run_loop->Quit(); 397 } 398 399 // See description of StressTest. This is called on the background thread. 400 // |count| is the number of HandleWatchers to create. |active_count| is the 401 // number of outstanding threads, |task_runner| the task runner for the main 402 // thread and |run_loop| the run loop that should be quit when there are no more 403 // threads running. When done StressThreadDone() is invoked on the main thread. 404 // |active_count| and |run_loop| should only be used on the main thread. 405 void RunStressTest(int count, 406 scoped_refptr<base::TaskRunner> task_runner, 407 base::RunLoop* run_loop, 408 int* active_count) { 409 struct TestData { 410 MessagePipe pipe; 411 HandleWatcher watcher; 412 }; 413 ScopedVector<TestData> data_vector; 414 for (int i = 0; i < count; ++i) { 415 if (i % 20 == 0) { 416 // Every so often we wait. This results in some level of thread balancing 417 // as well as making sure HandleWatcher has time to actually start some 418 // watches. 419 MessagePipe test_pipe; 420 ASSERT_TRUE(test_pipe.handle0.is_valid()); 421 CallbackHelper callback_helper; 422 HandleWatcher watcher; 423 callback_helper.Start(&watcher, test_pipe.handle0.get()); 424 RunUntilIdle(); 425 EXPECT_FALSE(callback_helper.got_callback()); 426 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(), 427 std::string())); 428 base::MessageLoop::ScopedNestableTaskAllower scoper( 429 base::MessageLoop::current()); 430 callback_helper.RunUntilGotCallback(); 431 EXPECT_TRUE(callback_helper.got_callback()); 432 } else { 433 std::unique_ptr<TestData> test_data(new TestData); 434 ASSERT_TRUE(test_data->pipe.handle0.is_valid()); 435 test_data->watcher.Start(test_data->pipe.handle0.get(), 436 MOJO_HANDLE_SIGNAL_READABLE, 437 MOJO_DEADLINE_INDEFINITE, 438 base::Bind(&NeverReached)); 439 data_vector.push_back(test_data.release()); 440 } 441 if (i % 15 == 0) 442 data_vector.clear(); 443 } 444 task_runner->PostTask(FROM_HERE, 445 base::Bind(&StressThreadDone, run_loop, 446 active_count)); 447 } 448 449 // This test is meant to stress HandleWatcher. It uses from various threads 450 // repeatedly starting and stopping watches. It spins up kThreadCount 451 // threads. Each thread creates kWatchCount watches. Every so often each thread 452 // writes to a pipe and waits for the response. 453 TEST(HandleWatcherCleanEnvironmentTest, StressTest) { 454 #if defined(NDEBUG) 455 const int kThreadCount = 15; 456 const int kWatchCount = 400; 457 #else 458 const int kThreadCount = 10; 459 const int kWatchCount = 250; 460 #endif 461 462 base::ShadowingAtExitManager at_exit; 463 base::MessageLoop message_loop; 464 base::RunLoop run_loop; 465 ScopedVector<base::Thread> threads; 466 int threads_active_counter = kThreadCount; 467 // Starts the threads first and then post the task in hopes of having more 468 // threads running at once. 469 for (int i = 0; i < kThreadCount; ++i) { 470 std::unique_ptr<base::Thread> thread(new base::Thread("test thread")); 471 if (i % 2) { 472 base::Thread::Options thread_options; 473 thread_options.message_pump_factory = 474 base::Bind(&MessagePumpMojo::Create); 475 thread->StartWithOptions(thread_options); 476 } else { 477 thread->Start(); 478 } 479 threads.push_back(thread.release()); 480 } 481 for (int i = 0; i < kThreadCount; ++i) { 482 threads[i]->task_runner()->PostTask( 483 FROM_HERE, base::Bind(&RunStressTest, kWatchCount, 484 message_loop.task_runner(), 485 &run_loop, &threads_active_counter)); 486 } 487 run_loop.Run(); 488 ASSERT_EQ(0, threads_active_counter); 489 } 490 491 } // namespace test 492 } // namespace common 493 } // namespace mojo 494