1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <stddef.h> 6 #include <stdint.h> 7 #include <algorithm> 8 #include <utility> 9 10 #include "base/bind.h" 11 #include "base/callback.h" 12 #include "base/message_loop/message_loop.h" 13 #include "base/run_loop.h" 14 #include "base/single_thread_task_runner.h" 15 #include "base/threading/thread.h" 16 #include "base/threading/thread_task_runner_handle.h" 17 #include "mojo/public/cpp/bindings/associated_binding.h" 18 #include "mojo/public/cpp/bindings/associated_group.h" 19 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" 20 #include "mojo/public/cpp/bindings/associated_interface_ptr_info.h" 21 #include "mojo/public/cpp/bindings/associated_interface_request.h" 22 #include "mojo/public/cpp/bindings/binding.h" 23 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 24 #include "mojo/public/interfaces/bindings/tests/test_associated_interfaces.mojom.h" 25 #include "testing/gtest/include/gtest/gtest.h" 26 27 namespace mojo { 28 namespace test { 29 namespace { 30 31 using mojo::internal::MultiplexRouter; 32 33 class IntegerSenderImpl : public IntegerSender { 34 public: 35 explicit IntegerSenderImpl(AssociatedInterfaceRequest<IntegerSender> request) 36 : binding_(this, std::move(request)) {} 37 38 ~IntegerSenderImpl() override {} 39 40 void set_notify_send_method_called( 41 const base::Callback<void(int32_t)>& callback) { 42 notify_send_method_called_ = callback; 43 } 44 45 void Echo(int32_t value, const EchoCallback& callback) override { 46 callback.Run(value); 47 } 48 void Send(int32_t value) override { notify_send_method_called_.Run(value); } 49 50 AssociatedBinding<IntegerSender>* binding() { return &binding_; } 51 52 void set_connection_error_handler(const base::Closure& handler) { 53 binding_.set_connection_error_handler(handler); 54 } 55 56 private: 57 AssociatedBinding<IntegerSender> binding_; 58 base::Callback<void(int32_t)> notify_send_method_called_; 59 }; 60 61 class IntegerSenderConnectionImpl : public IntegerSenderConnection { 62 public: 63 explicit IntegerSenderConnectionImpl( 64 InterfaceRequest<IntegerSenderConnection> request) 65 : binding_(this, std::move(request)) {} 66 67 ~IntegerSenderConnectionImpl() override {} 68 69 void GetSender(AssociatedInterfaceRequest<IntegerSender> sender) override { 70 IntegerSenderImpl* sender_impl = new IntegerSenderImpl(std::move(sender)); 71 sender_impl->set_connection_error_handler( 72 base::Bind(&DeleteSender, sender_impl)); 73 } 74 75 void AsyncGetSender(const AsyncGetSenderCallback& callback) override { 76 AssociatedInterfaceRequest<IntegerSender> request; 77 IntegerSenderAssociatedPtrInfo ptr_info; 78 binding_.associated_group()->CreateAssociatedInterface( 79 AssociatedGroup::WILL_PASS_PTR, &ptr_info, &request); 80 GetSender(std::move(request)); 81 callback.Run(std::move(ptr_info)); 82 } 83 84 Binding<IntegerSenderConnection>* binding() { return &binding_; } 85 86 private: 87 static void DeleteSender(IntegerSenderImpl* sender) { delete sender; } 88 89 Binding<IntegerSenderConnection> binding_; 90 }; 91 92 class AssociatedInterfaceTest : public testing::Test { 93 public: 94 AssociatedInterfaceTest() {} 95 ~AssociatedInterfaceTest() override { base::RunLoop().RunUntilIdle(); } 96 97 void PumpMessages() { base::RunLoop().RunUntilIdle(); } 98 99 template <typename T> 100 AssociatedInterfacePtrInfo<T> EmulatePassingAssociatedPtrInfo( 101 AssociatedInterfacePtrInfo<T> ptr_info, 102 scoped_refptr<MultiplexRouter> target) { 103 ScopedInterfaceEndpointHandle handle = ptr_info.PassHandle(); 104 CHECK(!handle.is_local()); 105 return AssociatedInterfacePtrInfo<T>( 106 target->CreateLocalEndpointHandle(handle.release()), 107 ptr_info.version()); 108 } 109 110 template <typename T> 111 AssociatedInterfaceRequest<T> EmulatePassingAssociatedRequest( 112 AssociatedInterfaceRequest<T> request, 113 scoped_refptr<MultiplexRouter> target) { 114 ScopedInterfaceEndpointHandle handle = request.PassHandle(); 115 CHECK(!handle.is_local()); 116 return MakeAssociatedRequest<T>( 117 target->CreateLocalEndpointHandle(handle.release())); 118 } 119 120 // Okay to call from any thread. 121 void QuitRunLoop(base::RunLoop* run_loop) { 122 if (loop_.task_runner()->BelongsToCurrentThread()) { 123 run_loop->Quit(); 124 } else { 125 loop_.task_runner()->PostTask( 126 FROM_HERE, 127 base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 128 base::Unretained(this), base::Unretained(run_loop))); 129 } 130 } 131 132 private: 133 base::MessageLoop loop_; 134 }; 135 136 void DoSetFlagAndRunClosure(bool* flag, const base::Closure& closure) { 137 *flag = true; 138 closure.Run(); 139 } 140 141 void DoExpectValueSetFlagAndRunClosure(int32_t expected_value, 142 bool* flag, 143 const base::Closure& closure, 144 int32_t value) { 145 EXPECT_EQ(expected_value, value); 146 DoSetFlagAndRunClosure(flag, closure); 147 } 148 149 base::Closure SetFlagAndRunClosure(bool* flag, const base::Closure& closure) { 150 return base::Bind(&DoSetFlagAndRunClosure, flag, closure); 151 } 152 153 base::Callback<void(int32_t)> ExpectValueSetFlagAndRunClosure( 154 int32_t expected_value, 155 bool* flag, 156 const base::Closure& closure) { 157 return base::Bind( 158 &DoExpectValueSetFlagAndRunClosure, expected_value, flag, closure); 159 } 160 161 TEST_F(AssociatedInterfaceTest, InterfacesAtBothEnds) { 162 // Bind to the same pipe two associated interfaces, whose implementation lives 163 // at different ends. Test that the two don't interfere with each other. 164 165 MessagePipe pipe; 166 scoped_refptr<MultiplexRouter> router0(new MultiplexRouter( 167 true, std::move(pipe.handle0), base::ThreadTaskRunnerHandle::Get())); 168 scoped_refptr<MultiplexRouter> router1(new MultiplexRouter( 169 false, std::move(pipe.handle1), base::ThreadTaskRunnerHandle::Get())); 170 171 AssociatedInterfaceRequest<IntegerSender> request; 172 IntegerSenderAssociatedPtrInfo ptr_info; 173 174 router0->CreateAssociatedGroup()->CreateAssociatedInterface( 175 AssociatedGroup::WILL_PASS_PTR, &ptr_info, &request); 176 ptr_info = EmulatePassingAssociatedPtrInfo(std::move(ptr_info), router1); 177 178 IntegerSenderImpl impl0(std::move(request)); 179 AssociatedInterfacePtr<IntegerSender> ptr0; 180 ptr0.Bind(std::move(ptr_info)); 181 182 router0->CreateAssociatedGroup()->CreateAssociatedInterface( 183 AssociatedGroup::WILL_PASS_REQUEST, &ptr_info, &request); 184 request = EmulatePassingAssociatedRequest(std::move(request), router1); 185 186 IntegerSenderImpl impl1(std::move(request)); 187 AssociatedInterfacePtr<IntegerSender> ptr1; 188 ptr1.Bind(std::move(ptr_info)); 189 190 base::RunLoop run_loop, run_loop2; 191 bool ptr0_callback_run = false; 192 ptr0->Echo(123, ExpectValueSetFlagAndRunClosure(123, &ptr0_callback_run, 193 run_loop.QuitClosure())); 194 195 bool ptr1_callback_run = false; 196 ptr1->Echo(456, ExpectValueSetFlagAndRunClosure(456, &ptr1_callback_run, 197 run_loop2.QuitClosure())); 198 199 run_loop.Run(); 200 run_loop2.Run(); 201 EXPECT_TRUE(ptr0_callback_run); 202 EXPECT_TRUE(ptr1_callback_run); 203 204 bool ptr0_error_callback_run = false; 205 base::RunLoop run_loop3; 206 ptr0.set_connection_error_handler( 207 SetFlagAndRunClosure(&ptr0_error_callback_run, run_loop3.QuitClosure())); 208 209 impl0.binding()->Close(); 210 run_loop3.Run(); 211 EXPECT_TRUE(ptr0_error_callback_run); 212 213 bool impl1_error_callback_run = false; 214 base::RunLoop run_loop4; 215 impl1.binding()->set_connection_error_handler( 216 SetFlagAndRunClosure(&impl1_error_callback_run, run_loop4.QuitClosure())); 217 218 ptr1.reset(); 219 run_loop4.Run(); 220 EXPECT_TRUE(impl1_error_callback_run); 221 } 222 223 class TestSender { 224 public: 225 TestSender() 226 : sender_thread_("TestSender"), 227 next_sender_(nullptr), 228 max_value_to_send_(-1) { 229 sender_thread_.Start(); 230 } 231 232 // The following three methods are called on the corresponding sender thread. 233 void SetUp(IntegerSenderAssociatedPtrInfo ptr_info, 234 TestSender* next_sender, 235 int32_t max_value_to_send) { 236 CHECK(sender_thread_.task_runner()->BelongsToCurrentThread()); 237 238 ptr_.Bind(std::move(ptr_info)); 239 next_sender_ = next_sender ? next_sender : this; 240 max_value_to_send_ = max_value_to_send; 241 } 242 243 void Send(int32_t value) { 244 CHECK(sender_thread_.task_runner()->BelongsToCurrentThread()); 245 246 if (value > max_value_to_send_) 247 return; 248 249 ptr_->Send(value); 250 251 next_sender_->sender_thread()->task_runner()->PostTask( 252 FROM_HERE, 253 base::Bind(&TestSender::Send, base::Unretained(next_sender_), ++value)); 254 } 255 256 void TearDown() { 257 CHECK(sender_thread_.task_runner()->BelongsToCurrentThread()); 258 259 ptr_.reset(); 260 } 261 262 base::Thread* sender_thread() { return &sender_thread_; } 263 264 private: 265 base::Thread sender_thread_; 266 TestSender* next_sender_; 267 int32_t max_value_to_send_; 268 269 AssociatedInterfacePtr<IntegerSender> ptr_; 270 }; 271 272 class TestReceiver { 273 public: 274 TestReceiver() : receiver_thread_("TestReceiver"), expected_calls_(0) { 275 receiver_thread_.Start(); 276 } 277 278 void SetUp(AssociatedInterfaceRequest<IntegerSender> request0, 279 AssociatedInterfaceRequest<IntegerSender> request1, 280 size_t expected_calls, 281 const base::Closure& notify_finish) { 282 CHECK(receiver_thread_.task_runner()->BelongsToCurrentThread()); 283 284 impl0_.reset(new IntegerSenderImpl(std::move(request0))); 285 impl0_->set_notify_send_method_called( 286 base::Bind(&TestReceiver::SendMethodCalled, base::Unretained(this))); 287 impl1_.reset(new IntegerSenderImpl(std::move(request1))); 288 impl1_->set_notify_send_method_called( 289 base::Bind(&TestReceiver::SendMethodCalled, base::Unretained(this))); 290 291 expected_calls_ = expected_calls; 292 notify_finish_ = notify_finish; 293 } 294 295 void TearDown() { 296 CHECK(receiver_thread_.task_runner()->BelongsToCurrentThread()); 297 298 impl0_.reset(); 299 impl1_.reset(); 300 } 301 302 base::Thread* receiver_thread() { return &receiver_thread_; } 303 const std::vector<int32_t>& values() const { return values_; } 304 305 private: 306 void SendMethodCalled(int32_t value) { 307 values_.push_back(value); 308 309 if (values_.size() >= expected_calls_) 310 notify_finish_.Run(); 311 } 312 313 base::Thread receiver_thread_; 314 size_t expected_calls_; 315 316 std::unique_ptr<IntegerSenderImpl> impl0_; 317 std::unique_ptr<IntegerSenderImpl> impl1_; 318 319 std::vector<int32_t> values_; 320 321 base::Closure notify_finish_; 322 }; 323 324 class NotificationCounter { 325 public: 326 NotificationCounter(size_t total_count, const base::Closure& notify_finish) 327 : total_count_(total_count), 328 current_count_(0), 329 notify_finish_(notify_finish) {} 330 331 ~NotificationCounter() {} 332 333 // Okay to call from any thread. 334 void OnGotNotification() { 335 bool finshed = false; 336 { 337 base::AutoLock locker(lock_); 338 CHECK_LT(current_count_, total_count_); 339 current_count_++; 340 finshed = current_count_ == total_count_; 341 } 342 343 if (finshed) 344 notify_finish_.Run(); 345 } 346 347 private: 348 base::Lock lock_; 349 const size_t total_count_; 350 size_t current_count_; 351 base::Closure notify_finish_; 352 }; 353 354 TEST_F(AssociatedInterfaceTest, MultiThreadAccess) { 355 // Set up four associated interfaces on a message pipe. Use the inteface 356 // pointers on four threads in parallel; run the interface implementations on 357 // two threads. Test that multi-threaded access works. 358 359 const int32_t kMaxValue = 1000; 360 MessagePipe pipe; 361 scoped_refptr<MultiplexRouter> router0(new MultiplexRouter( 362 true, std::move(pipe.handle0), base::ThreadTaskRunnerHandle::Get())); 363 scoped_refptr<MultiplexRouter> router1(new MultiplexRouter( 364 false, std::move(pipe.handle1), base::ThreadTaskRunnerHandle::Get())); 365 366 AssociatedInterfaceRequest<IntegerSender> requests[4]; 367 IntegerSenderAssociatedPtrInfo ptr_infos[4]; 368 369 for (size_t i = 0; i < 4; ++i) { 370 router0->CreateAssociatedGroup()->CreateAssociatedInterface( 371 AssociatedGroup::WILL_PASS_PTR, &ptr_infos[i], &requests[i]); 372 ptr_infos[i] = 373 EmulatePassingAssociatedPtrInfo(std::move(ptr_infos[i]), router1); 374 } 375 376 TestSender senders[4]; 377 for (size_t i = 0; i < 4; ++i) { 378 senders[i].sender_thread()->task_runner()->PostTask( 379 FROM_HERE, base::Bind(&TestSender::SetUp, base::Unretained(&senders[i]), 380 base::Passed(&ptr_infos[i]), nullptr, 381 kMaxValue * (i + 1) / 4)); 382 } 383 384 base::RunLoop run_loop; 385 TestReceiver receivers[2]; 386 NotificationCounter counter( 387 2, base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 388 base::Unretained(this), base::Unretained(&run_loop))); 389 for (size_t i = 0; i < 2; ++i) { 390 receivers[i].receiver_thread()->task_runner()->PostTask( 391 FROM_HERE, 392 base::Bind(&TestReceiver::SetUp, base::Unretained(&receivers[i]), 393 base::Passed(&requests[2 * i]), 394 base::Passed(&requests[2 * i + 1]), 395 static_cast<size_t>(kMaxValue / 2), 396 base::Bind(&NotificationCounter::OnGotNotification, 397 base::Unretained(&counter)))); 398 } 399 400 for (size_t i = 0; i < 4; ++i) { 401 senders[i].sender_thread()->task_runner()->PostTask( 402 FROM_HERE, base::Bind(&TestSender::Send, base::Unretained(&senders[i]), 403 kMaxValue * i / 4 + 1)); 404 } 405 406 run_loop.Run(); 407 408 for (size_t i = 0; i < 4; ++i) { 409 base::RunLoop run_loop; 410 senders[i].sender_thread()->task_runner()->PostTaskAndReply( 411 FROM_HERE, 412 base::Bind(&TestSender::TearDown, base::Unretained(&senders[i])), 413 base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 414 base::Unretained(this), base::Unretained(&run_loop))); 415 run_loop.Run(); 416 } 417 418 for (size_t i = 0; i < 2; ++i) { 419 base::RunLoop run_loop; 420 receivers[i].receiver_thread()->task_runner()->PostTaskAndReply( 421 FROM_HERE, 422 base::Bind(&TestReceiver::TearDown, base::Unretained(&receivers[i])), 423 base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 424 base::Unretained(this), base::Unretained(&run_loop))); 425 run_loop.Run(); 426 } 427 428 EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size()); 429 EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size()); 430 431 std::vector<int32_t> all_values; 432 all_values.insert(all_values.end(), receivers[0].values().begin(), 433 receivers[0].values().end()); 434 all_values.insert(all_values.end(), receivers[1].values().begin(), 435 receivers[1].values().end()); 436 437 std::sort(all_values.begin(), all_values.end()); 438 for (size_t i = 0; i < all_values.size(); ++i) 439 ASSERT_EQ(static_cast<int32_t>(i + 1), all_values[i]); 440 } 441 442 TEST_F(AssociatedInterfaceTest, FIFO) { 443 // Set up four associated interfaces on a message pipe. Use the inteface 444 // pointers on four threads; run the interface implementations on two threads. 445 // Take turns to make calls using the four pointers. Test that FIFO-ness is 446 // preserved. 447 448 const int32_t kMaxValue = 100; 449 MessagePipe pipe; 450 scoped_refptr<MultiplexRouter> router0(new MultiplexRouter( 451 true, std::move(pipe.handle0), base::ThreadTaskRunnerHandle::Get())); 452 scoped_refptr<MultiplexRouter> router1(new MultiplexRouter( 453 false, std::move(pipe.handle1), base::ThreadTaskRunnerHandle::Get())); 454 455 AssociatedInterfaceRequest<IntegerSender> requests[4]; 456 IntegerSenderAssociatedPtrInfo ptr_infos[4]; 457 458 for (size_t i = 0; i < 4; ++i) { 459 router0->CreateAssociatedGroup()->CreateAssociatedInterface( 460 AssociatedGroup::WILL_PASS_PTR, &ptr_infos[i], &requests[i]); 461 ptr_infos[i] = 462 EmulatePassingAssociatedPtrInfo(std::move(ptr_infos[i]), router1); 463 } 464 465 TestSender senders[4]; 466 for (size_t i = 0; i < 4; ++i) { 467 senders[i].sender_thread()->task_runner()->PostTask( 468 FROM_HERE, 469 base::Bind(&TestSender::SetUp, base::Unretained(&senders[i]), 470 base::Passed(&ptr_infos[i]), 471 base::Unretained(&senders[(i + 1) % 4]), kMaxValue)); 472 } 473 474 base::RunLoop run_loop; 475 TestReceiver receivers[2]; 476 NotificationCounter counter( 477 2, base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 478 base::Unretained(this), base::Unretained(&run_loop))); 479 for (size_t i = 0; i < 2; ++i) { 480 receivers[i].receiver_thread()->task_runner()->PostTask( 481 FROM_HERE, 482 base::Bind(&TestReceiver::SetUp, base::Unretained(&receivers[i]), 483 base::Passed(&requests[2 * i]), 484 base::Passed(&requests[2 * i + 1]), 485 static_cast<size_t>(kMaxValue / 2), 486 base::Bind(&NotificationCounter::OnGotNotification, 487 base::Unretained(&counter)))); 488 } 489 490 senders[0].sender_thread()->task_runner()->PostTask( 491 FROM_HERE, 492 base::Bind(&TestSender::Send, base::Unretained(&senders[0]), 1)); 493 494 run_loop.Run(); 495 496 for (size_t i = 0; i < 4; ++i) { 497 base::RunLoop run_loop; 498 senders[i].sender_thread()->task_runner()->PostTaskAndReply( 499 FROM_HERE, 500 base::Bind(&TestSender::TearDown, base::Unretained(&senders[i])), 501 base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 502 base::Unretained(this), base::Unretained(&run_loop))); 503 run_loop.Run(); 504 } 505 506 for (size_t i = 0; i < 2; ++i) { 507 base::RunLoop run_loop; 508 receivers[i].receiver_thread()->task_runner()->PostTaskAndReply( 509 FROM_HERE, 510 base::Bind(&TestReceiver::TearDown, base::Unretained(&receivers[i])), 511 base::Bind(&AssociatedInterfaceTest::QuitRunLoop, 512 base::Unretained(this), base::Unretained(&run_loop))); 513 run_loop.Run(); 514 } 515 516 EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size()); 517 EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size()); 518 519 for (size_t i = 0; i < 2; ++i) { 520 for (size_t j = 1; j < receivers[i].values().size(); ++j) 521 EXPECT_LT(receivers[i].values()[j - 1], receivers[i].values()[j]); 522 } 523 } 524 525 void CaptureInt32(int32_t* storage, 526 const base::Closure& closure, 527 int32_t value) { 528 *storage = value; 529 closure.Run(); 530 } 531 532 void CaptureSenderPtrInfo(IntegerSenderAssociatedPtr* storage, 533 const base::Closure& closure, 534 IntegerSenderAssociatedPtrInfo info) { 535 storage->Bind(std::move(info)); 536 closure.Run(); 537 } 538 539 TEST_F(AssociatedInterfaceTest, PassAssociatedInterfaces) { 540 IntegerSenderConnectionPtr connection_ptr; 541 IntegerSenderConnectionImpl connection(GetProxy(&connection_ptr)); 542 543 IntegerSenderAssociatedPtr sender0; 544 connection_ptr->GetSender( 545 GetProxy(&sender0, connection_ptr.associated_group())); 546 547 int32_t echoed_value = 0; 548 base::RunLoop run_loop; 549 sender0->Echo(123, base::Bind(&CaptureInt32, &echoed_value, 550 run_loop.QuitClosure())); 551 run_loop.Run(); 552 EXPECT_EQ(123, echoed_value); 553 554 IntegerSenderAssociatedPtr sender1; 555 base::RunLoop run_loop2; 556 connection_ptr->AsyncGetSender( 557 base::Bind(&CaptureSenderPtrInfo, &sender1, run_loop2.QuitClosure())); 558 run_loop2.Run(); 559 EXPECT_TRUE(sender1); 560 561 base::RunLoop run_loop3; 562 sender1->Echo(456, base::Bind(&CaptureInt32, &echoed_value, 563 run_loop3.QuitClosure())); 564 run_loop3.Run(); 565 EXPECT_EQ(456, echoed_value); 566 } 567 568 TEST_F(AssociatedInterfaceTest, BindingWaitAndPauseWhenNoAssociatedInterfaces) { 569 IntegerSenderConnectionPtr connection_ptr; 570 IntegerSenderConnectionImpl connection(GetProxy(&connection_ptr)); 571 572 IntegerSenderAssociatedPtr sender0; 573 connection_ptr->GetSender( 574 GetProxy(&sender0, connection_ptr.associated_group())); 575 576 EXPECT_FALSE(connection.binding()->HasAssociatedInterfaces()); 577 // There are no associated interfaces running on the pipe yet. It is okay to 578 // pause. 579 connection.binding()->PauseIncomingMethodCallProcessing(); 580 connection.binding()->ResumeIncomingMethodCallProcessing(); 581 582 // There are no associated interfaces running on the pipe yet. It is okay to 583 // wait. 584 EXPECT_TRUE(connection.binding()->WaitForIncomingMethodCall()); 585 586 // The previous wait has dispatched the GetSender request message, therefore 587 // an associated interface has been set up on the pipe. It is not allowed to 588 // wait or pause. 589 EXPECT_TRUE(connection.binding()->HasAssociatedInterfaces()); 590 } 591 592 } // namespace 593 } // namespace test 594 } // namespace mojo 595