1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <stdint.h> 6 #include <stdio.h> 7 #include <string.h> 8 9 #include <vector> 10 11 #include "base/bind.h" 12 #include "base/files/file_path.h" 13 #include "base/files/file_util.h" 14 #include "base/files/scoped_file.h" 15 #include "base/files/scoped_temp_dir.h" 16 #include "base/location.h" 17 #include "base/logging.h" 18 #include "base/macros.h" 19 #include "base/message_loop/message_loop.h" 20 #include "base/test/test_io_thread.h" 21 #include "base/threading/platform_thread.h" // For |Sleep()|. 22 #include "build/build_config.h" // TODO(vtl): Remove this. 23 #include "mojo/common/test/test_utils.h" 24 #include "mojo/embedder/platform_channel_pair.h" 25 #include "mojo/embedder/platform_shared_buffer.h" 26 #include "mojo/embedder/scoped_platform_handle.h" 27 #include "mojo/embedder/simple_platform_support.h" 28 #include "mojo/system/channel.h" 29 #include "mojo/system/channel_endpoint.h" 30 #include "mojo/system/message_pipe.h" 31 #include "mojo/system/message_pipe_dispatcher.h" 32 #include "mojo/system/platform_handle_dispatcher.h" 33 #include "mojo/system/raw_channel.h" 34 #include "mojo/system/shared_buffer_dispatcher.h" 35 #include "mojo/system/test_utils.h" 36 #include "mojo/system/waiter.h" 37 #include "testing/gtest/include/gtest/gtest.h" 38 39 namespace mojo { 40 namespace system { 41 namespace { 42 43 class RemoteMessagePipeTest : public testing::Test { 44 public: 45 RemoteMessagePipeTest() : io_thread_(base::TestIOThread::kAutoStart) {} 46 virtual ~RemoteMessagePipeTest() {} 47 48 virtual void SetUp() OVERRIDE { 49 io_thread_.PostTaskAndWait( 50 FROM_HERE, 51 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread, 52 base::Unretained(this))); 53 } 54 55 virtual void TearDown() OVERRIDE { 56 io_thread_.PostTaskAndWait( 57 FROM_HERE, 58 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread, 59 base::Unretained(this))); 60 } 61 62 protected: 63 // This connects the two given |ChannelEndpoint|s. 64 void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0, 65 scoped_refptr<ChannelEndpoint> ep1) { 66 io_thread_.PostTaskAndWait( 67 FROM_HERE, 68 base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread, 69 base::Unretained(this), 70 ep0, 71 ep1)); 72 } 73 74 // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires 75 // that this is the bootstrap case, i.e., that the endpoint IDs are both/will 76 // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for 77 // it to finish connecting. 78 void BootstrapChannelEndpointNoWait(unsigned channel_index, 79 scoped_refptr<ChannelEndpoint> ep) { 80 io_thread_.PostTask( 81 FROM_HERE, 82 base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread, 83 base::Unretained(this), 84 channel_index, 85 ep)); 86 } 87 88 void RestoreInitialState() { 89 io_thread_.PostTaskAndWait( 90 FROM_HERE, 91 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread, 92 base::Unretained(this))); 93 } 94 95 embedder::PlatformSupport* platform_support() { return &platform_support_; } 96 base::TestIOThread* io_thread() { return &io_thread_; } 97 98 private: 99 void SetUpOnIOThread() { 100 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 101 102 embedder::PlatformChannelPair channel_pair; 103 platform_handles_[0] = channel_pair.PassServerHandle(); 104 platform_handles_[1] = channel_pair.PassClientHandle(); 105 } 106 107 void TearDownOnIOThread() { 108 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 109 110 if (channels_[0].get()) { 111 channels_[0]->Shutdown(); 112 channels_[0] = nullptr; 113 } 114 if (channels_[1].get()) { 115 channels_[1]->Shutdown(); 116 channels_[1] = nullptr; 117 } 118 } 119 120 void CreateAndInitChannel(unsigned channel_index) { 121 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 122 CHECK(channel_index == 0 || channel_index == 1); 123 CHECK(!channels_[channel_index].get()); 124 125 channels_[channel_index] = new Channel(&platform_support_); 126 CHECK(channels_[channel_index]->Init( 127 RawChannel::Create(platform_handles_[channel_index].Pass()))); 128 } 129 130 void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0, 131 scoped_refptr<ChannelEndpoint> ep1) { 132 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 133 134 if (!channels_[0].get()) 135 CreateAndInitChannel(0); 136 if (!channels_[1].get()) 137 CreateAndInitChannel(1); 138 139 MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0); 140 MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1); 141 142 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1)); 143 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0)); 144 } 145 146 void BootstrapChannelEndpointOnIOThread(unsigned channel_index, 147 scoped_refptr<ChannelEndpoint> ep) { 148 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 149 CHECK(channel_index == 0 || channel_index == 1); 150 151 CreateAndInitChannel(channel_index); 152 MessageInTransit::EndpointId endpoint_id = 153 channels_[channel_index]->AttachEndpoint(ep); 154 if (endpoint_id == MessageInTransit::kInvalidEndpointId) 155 return; 156 157 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId); 158 CHECK(channels_[channel_index]->RunMessagePipeEndpoint( 159 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId)); 160 } 161 162 void RestoreInitialStateOnIOThread() { 163 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 164 165 TearDownOnIOThread(); 166 SetUpOnIOThread(); 167 } 168 169 embedder::SimplePlatformSupport platform_support_; 170 base::TestIOThread io_thread_; 171 embedder::ScopedPlatformHandle platform_handles_[2]; 172 scoped_refptr<Channel> channels_[2]; 173 174 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest); 175 }; 176 177 TEST_F(RemoteMessagePipeTest, Basic) { 178 static const char kHello[] = "hello"; 179 static const char kWorld[] = "world!!!1!!!1!"; 180 char buffer[100] = {0}; 181 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 182 Waiter waiter; 183 HandleSignalsState hss; 184 uint32_t context = 0; 185 186 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and 187 // connected to MP 1, port 0, which will be attached to channel 1. This leaves 188 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. 189 190 scoped_refptr<ChannelEndpoint> ep0; 191 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 192 scoped_refptr<ChannelEndpoint> ep1; 193 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 194 ConnectChannelEndpoints(ep0, ep1); 195 196 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1. 197 198 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 199 // it later, it might already be readable.) 200 waiter.Init(); 201 ASSERT_EQ( 202 MOJO_RESULT_OK, 203 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 204 205 // Write to MP 0, port 0. 206 EXPECT_EQ(MOJO_RESULT_OK, 207 mp0->WriteMessage(0, 208 UserPointer<const void>(kHello), 209 sizeof(kHello), 210 nullptr, 211 MOJO_WRITE_MESSAGE_FLAG_NONE)); 212 213 // Wait. 214 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 215 EXPECT_EQ(123u, context); 216 hss = HandleSignalsState(); 217 mp1->RemoveWaiter(1, &waiter, &hss); 218 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 219 hss.satisfied_signals); 220 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 221 hss.satisfiable_signals); 222 223 // Read from MP 1, port 1. 224 EXPECT_EQ(MOJO_RESULT_OK, 225 mp1->ReadMessage(1, 226 UserPointer<void>(buffer), 227 MakeUserPointer(&buffer_size), 228 nullptr, 229 nullptr, 230 MOJO_READ_MESSAGE_FLAG_NONE)); 231 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size)); 232 EXPECT_STREQ(kHello, buffer); 233 234 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0. 235 236 waiter.Init(); 237 ASSERT_EQ( 238 MOJO_RESULT_OK, 239 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr)); 240 241 EXPECT_EQ(MOJO_RESULT_OK, 242 mp1->WriteMessage(1, 243 UserPointer<const void>(kWorld), 244 sizeof(kWorld), 245 nullptr, 246 MOJO_WRITE_MESSAGE_FLAG_NONE)); 247 248 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 249 EXPECT_EQ(456u, context); 250 hss = HandleSignalsState(); 251 mp0->RemoveWaiter(0, &waiter, &hss); 252 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 253 hss.satisfied_signals); 254 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 255 hss.satisfiable_signals); 256 257 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 258 EXPECT_EQ(MOJO_RESULT_OK, 259 mp0->ReadMessage(0, 260 UserPointer<void>(buffer), 261 MakeUserPointer(&buffer_size), 262 nullptr, 263 nullptr, 264 MOJO_READ_MESSAGE_FLAG_NONE)); 265 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size)); 266 EXPECT_STREQ(kWorld, buffer); 267 268 // Close MP 0, port 0. 269 mp0->Close(0); 270 271 // Try to wait for MP 1, port 1 to become readable. This will eventually fail 272 // when it realizes that MP 0, port 0 has been closed. (It may also fail 273 // immediately.) 274 waiter.Init(); 275 hss = HandleSignalsState(); 276 MojoResult result = 277 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss); 278 if (result == MOJO_RESULT_OK) { 279 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 280 waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 281 EXPECT_EQ(789u, context); 282 hss = HandleSignalsState(); 283 mp1->RemoveWaiter(1, &waiter, &hss); 284 } 285 EXPECT_EQ(0u, hss.satisfied_signals); 286 EXPECT_EQ(0u, hss.satisfiable_signals); 287 288 // And MP 1, port 1. 289 mp1->Close(1); 290 } 291 292 TEST_F(RemoteMessagePipeTest, Multiplex) { 293 static const char kHello[] = "hello"; 294 static const char kWorld[] = "world!!!1!!!1!"; 295 char buffer[100] = {0}; 296 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 297 Waiter waiter; 298 HandleSignalsState hss; 299 uint32_t context = 0; 300 301 // Connect message pipes as in the |Basic| test. 302 303 scoped_refptr<ChannelEndpoint> ep0; 304 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 305 scoped_refptr<ChannelEndpoint> ep1; 306 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 307 ConnectChannelEndpoints(ep0, ep1); 308 309 // Now put another message pipe on the channel. 310 311 scoped_refptr<ChannelEndpoint> ep2; 312 scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2)); 313 scoped_refptr<ChannelEndpoint> ep3; 314 scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3)); 315 ConnectChannelEndpoints(ep2, ep3); 316 317 // Write: MP 2, port 0 -> MP 3, port 1. 318 319 waiter.Init(); 320 ASSERT_EQ( 321 MOJO_RESULT_OK, 322 mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr)); 323 324 EXPECT_EQ(MOJO_RESULT_OK, 325 mp2->WriteMessage(0, 326 UserPointer<const void>(kHello), 327 sizeof(kHello), 328 nullptr, 329 MOJO_WRITE_MESSAGE_FLAG_NONE)); 330 331 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 332 EXPECT_EQ(789u, context); 333 hss = HandleSignalsState(); 334 mp3->RemoveWaiter(1, &waiter, &hss); 335 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 336 hss.satisfied_signals); 337 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 338 hss.satisfiable_signals); 339 340 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0. 341 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 342 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 343 mp0->ReadMessage(0, 344 UserPointer<void>(buffer), 345 MakeUserPointer(&buffer_size), 346 nullptr, 347 nullptr, 348 MOJO_READ_MESSAGE_FLAG_NONE)); 349 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 350 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 351 mp1->ReadMessage(1, 352 UserPointer<void>(buffer), 353 MakeUserPointer(&buffer_size), 354 nullptr, 355 nullptr, 356 MOJO_READ_MESSAGE_FLAG_NONE)); 357 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 358 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 359 mp2->ReadMessage(0, 360 UserPointer<void>(buffer), 361 MakeUserPointer(&buffer_size), 362 nullptr, 363 nullptr, 364 MOJO_READ_MESSAGE_FLAG_NONE)); 365 366 // Read from MP 3, port 1. 367 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 368 EXPECT_EQ(MOJO_RESULT_OK, 369 mp3->ReadMessage(1, 370 UserPointer<void>(buffer), 371 MakeUserPointer(&buffer_size), 372 nullptr, 373 nullptr, 374 MOJO_READ_MESSAGE_FLAG_NONE)); 375 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size)); 376 EXPECT_STREQ(kHello, buffer); 377 378 // Write: MP 0, port 0 -> MP 1, port 1 again. 379 380 waiter.Init(); 381 ASSERT_EQ( 382 MOJO_RESULT_OK, 383 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 384 385 EXPECT_EQ(MOJO_RESULT_OK, 386 mp0->WriteMessage(0, 387 UserPointer<const void>(kWorld), 388 sizeof(kWorld), 389 nullptr, 390 MOJO_WRITE_MESSAGE_FLAG_NONE)); 391 392 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 393 EXPECT_EQ(123u, context); 394 hss = HandleSignalsState(); 395 mp1->RemoveWaiter(1, &waiter, &hss); 396 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 397 hss.satisfied_signals); 398 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 399 hss.satisfiable_signals); 400 401 // Make sure there's nothing on the other ports. 402 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 403 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 404 mp0->ReadMessage(0, 405 UserPointer<void>(buffer), 406 MakeUserPointer(&buffer_size), 407 nullptr, 408 nullptr, 409 MOJO_READ_MESSAGE_FLAG_NONE)); 410 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 411 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 412 mp2->ReadMessage(0, 413 UserPointer<void>(buffer), 414 MakeUserPointer(&buffer_size), 415 nullptr, 416 nullptr, 417 MOJO_READ_MESSAGE_FLAG_NONE)); 418 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 419 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 420 mp3->ReadMessage(1, 421 UserPointer<void>(buffer), 422 MakeUserPointer(&buffer_size), 423 nullptr, 424 nullptr, 425 MOJO_READ_MESSAGE_FLAG_NONE)); 426 427 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 428 EXPECT_EQ(MOJO_RESULT_OK, 429 mp1->ReadMessage(1, 430 UserPointer<void>(buffer), 431 MakeUserPointer(&buffer_size), 432 nullptr, 433 nullptr, 434 MOJO_READ_MESSAGE_FLAG_NONE)); 435 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size)); 436 EXPECT_STREQ(kWorld, buffer); 437 438 mp0->Close(0); 439 mp1->Close(1); 440 mp2->Close(0); 441 mp3->Close(1); 442 } 443 444 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) { 445 static const char kHello[] = "hello"; 446 char buffer[100] = {0}; 447 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 448 Waiter waiter; 449 HandleSignalsState hss; 450 uint32_t context = 0; 451 452 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and 453 // connected to MP 1, port 0, which will be attached to channel 1. This leaves 454 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. 455 456 scoped_refptr<ChannelEndpoint> ep0; 457 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 458 459 // Write to MP 0, port 0. 460 EXPECT_EQ(MOJO_RESULT_OK, 461 mp0->WriteMessage(0, 462 UserPointer<const void>(kHello), 463 sizeof(kHello), 464 nullptr, 465 MOJO_WRITE_MESSAGE_FLAG_NONE)); 466 467 BootstrapChannelEndpointNoWait(0, ep0); 468 469 // Close MP 0, port 0 before channel 1 is even connected. 470 mp0->Close(0); 471 472 scoped_refptr<ChannelEndpoint> ep1; 473 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 474 475 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 476 // it later, it might already be readable.) 477 waiter.Init(); 478 ASSERT_EQ( 479 MOJO_RESULT_OK, 480 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 481 482 BootstrapChannelEndpointNoWait(1, ep1); 483 484 // Wait. 485 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 486 EXPECT_EQ(123u, context); 487 hss = HandleSignalsState(); 488 // Note: MP 1, port 1 should definitely should be readable, but it may or may 489 // not appear as writable (there's a race, and it may not have noticed that 490 // the other side was closed yet -- e.g., inserting a sleep here would make it 491 // much more likely to notice that it's no longer writable). 492 mp1->RemoveWaiter(1, &waiter, &hss); 493 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); 494 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); 495 496 // Read from MP 1, port 1. 497 EXPECT_EQ(MOJO_RESULT_OK, 498 mp1->ReadMessage(1, 499 UserPointer<void>(buffer), 500 MakeUserPointer(&buffer_size), 501 nullptr, 502 nullptr, 503 MOJO_READ_MESSAGE_FLAG_NONE)); 504 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size)); 505 EXPECT_STREQ(kHello, buffer); 506 507 // And MP 1, port 1. 508 mp1->Close(1); 509 } 510 511 TEST_F(RemoteMessagePipeTest, HandlePassing) { 512 static const char kHello[] = "hello"; 513 Waiter waiter; 514 HandleSignalsState hss; 515 uint32_t context = 0; 516 517 scoped_refptr<ChannelEndpoint> ep0; 518 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 519 scoped_refptr<ChannelEndpoint> ep1; 520 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 521 ConnectChannelEndpoints(ep0, ep1); 522 523 // We'll try to pass this dispatcher. 524 scoped_refptr<MessagePipeDispatcher> dispatcher( 525 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 526 scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal()); 527 dispatcher->Init(local_mp, 0); 528 529 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 530 // it later, it might already be readable.) 531 waiter.Init(); 532 ASSERT_EQ( 533 MOJO_RESULT_OK, 534 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 535 536 // Write to MP 0, port 0. 537 { 538 DispatcherTransport transport( 539 test::DispatcherTryStartTransport(dispatcher.get())); 540 EXPECT_TRUE(transport.is_valid()); 541 542 std::vector<DispatcherTransport> transports; 543 transports.push_back(transport); 544 EXPECT_EQ(MOJO_RESULT_OK, 545 mp0->WriteMessage(0, 546 UserPointer<const void>(kHello), 547 sizeof(kHello), 548 &transports, 549 MOJO_WRITE_MESSAGE_FLAG_NONE)); 550 transport.End(); 551 552 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 553 // |dispatcher| is destroyed. 554 EXPECT_TRUE(dispatcher->HasOneRef()); 555 dispatcher = nullptr; 556 } 557 558 // Wait. 559 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 560 EXPECT_EQ(123u, context); 561 hss = HandleSignalsState(); 562 mp1->RemoveWaiter(1, &waiter, &hss); 563 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 564 hss.satisfied_signals); 565 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 566 hss.satisfiable_signals); 567 568 // Read from MP 1, port 1. 569 char read_buffer[100] = {0}; 570 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 571 DispatcherVector read_dispatchers; 572 uint32_t read_num_dispatchers = 10; // Maximum to get. 573 EXPECT_EQ(MOJO_RESULT_OK, 574 mp1->ReadMessage(1, 575 UserPointer<void>(read_buffer), 576 MakeUserPointer(&read_buffer_size), 577 &read_dispatchers, 578 &read_num_dispatchers, 579 MOJO_READ_MESSAGE_FLAG_NONE)); 580 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 581 EXPECT_STREQ(kHello, read_buffer); 582 EXPECT_EQ(1u, read_dispatchers.size()); 583 EXPECT_EQ(1u, read_num_dispatchers); 584 ASSERT_TRUE(read_dispatchers[0].get()); 585 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 586 587 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType()); 588 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get()); 589 590 // Add the waiter now, before it becomes readable to avoid a race. 591 waiter.Init(); 592 ASSERT_EQ(MOJO_RESULT_OK, 593 dispatcher->AddWaiter( 594 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr)); 595 596 // Write to "local_mp", port 1. 597 EXPECT_EQ(MOJO_RESULT_OK, 598 local_mp->WriteMessage(1, 599 UserPointer<const void>(kHello), 600 sizeof(kHello), 601 nullptr, 602 MOJO_WRITE_MESSAGE_FLAG_NONE)); 603 604 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately 605 // here. (We don't crash if I sleep and then close.) 606 607 // Wait for the dispatcher to become readable. 608 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 609 EXPECT_EQ(456u, context); 610 hss = HandleSignalsState(); 611 dispatcher->RemoveWaiter(&waiter, &hss); 612 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 613 hss.satisfied_signals); 614 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 615 hss.satisfiable_signals); 616 617 // Read from the dispatcher. 618 memset(read_buffer, 0, sizeof(read_buffer)); 619 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 620 EXPECT_EQ(MOJO_RESULT_OK, 621 dispatcher->ReadMessage(UserPointer<void>(read_buffer), 622 MakeUserPointer(&read_buffer_size), 623 0, 624 nullptr, 625 MOJO_READ_MESSAGE_FLAG_NONE)); 626 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 627 EXPECT_STREQ(kHello, read_buffer); 628 629 // Prepare to wait on "local_mp", port 1. 630 waiter.Init(); 631 ASSERT_EQ(MOJO_RESULT_OK, 632 local_mp->AddWaiter( 633 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr)); 634 635 // Write to the dispatcher. 636 EXPECT_EQ(MOJO_RESULT_OK, 637 dispatcher->WriteMessage(UserPointer<const void>(kHello), 638 sizeof(kHello), 639 nullptr, 640 MOJO_WRITE_MESSAGE_FLAG_NONE)); 641 642 // Wait. 643 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 644 EXPECT_EQ(789u, context); 645 hss = HandleSignalsState(); 646 local_mp->RemoveWaiter(1, &waiter, &hss); 647 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 648 hss.satisfied_signals); 649 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 650 hss.satisfiable_signals); 651 652 // Read from "local_mp", port 1. 653 memset(read_buffer, 0, sizeof(read_buffer)); 654 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 655 EXPECT_EQ(MOJO_RESULT_OK, 656 local_mp->ReadMessage(1, 657 UserPointer<void>(read_buffer), 658 MakeUserPointer(&read_buffer_size), 659 nullptr, 660 nullptr, 661 MOJO_READ_MESSAGE_FLAG_NONE)); 662 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 663 EXPECT_STREQ(kHello, read_buffer); 664 665 // TODO(vtl): Also test that messages queued up before the handle was sent are 666 // delivered properly. 667 668 // Close everything that belongs to us. 669 mp0->Close(0); 670 mp1->Close(1); 671 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 672 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed. 673 local_mp->Close(1); 674 } 675 676 #if defined(OS_POSIX) 677 #define MAYBE_SharedBufferPassing SharedBufferPassing 678 #else 679 // Not yet implemented (on Windows). 680 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing 681 #endif 682 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) { 683 static const char kHello[] = "hello"; 684 Waiter waiter; 685 HandleSignalsState hss; 686 uint32_t context = 0; 687 688 scoped_refptr<ChannelEndpoint> ep0; 689 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 690 scoped_refptr<ChannelEndpoint> ep1; 691 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 692 ConnectChannelEndpoints(ep0, ep1); 693 694 // We'll try to pass this dispatcher. 695 scoped_refptr<SharedBufferDispatcher> dispatcher; 696 EXPECT_EQ(MOJO_RESULT_OK, 697 SharedBufferDispatcher::Create( 698 platform_support(), 699 SharedBufferDispatcher::kDefaultCreateOptions, 700 100, 701 &dispatcher)); 702 ASSERT_TRUE(dispatcher.get()); 703 704 // Make a mapping. 705 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0; 706 EXPECT_EQ( 707 MOJO_RESULT_OK, 708 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0)); 709 ASSERT_TRUE(mapping0); 710 ASSERT_TRUE(mapping0->GetBase()); 711 ASSERT_EQ(100u, mapping0->GetLength()); 712 static_cast<char*>(mapping0->GetBase())[0] = 'A'; 713 static_cast<char*>(mapping0->GetBase())[50] = 'B'; 714 static_cast<char*>(mapping0->GetBase())[99] = 'C'; 715 716 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 717 // it later, it might already be readable.) 718 waiter.Init(); 719 ASSERT_EQ( 720 MOJO_RESULT_OK, 721 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 722 723 // Write to MP 0, port 0. 724 { 725 DispatcherTransport transport( 726 test::DispatcherTryStartTransport(dispatcher.get())); 727 EXPECT_TRUE(transport.is_valid()); 728 729 std::vector<DispatcherTransport> transports; 730 transports.push_back(transport); 731 EXPECT_EQ(MOJO_RESULT_OK, 732 mp0->WriteMessage(0, 733 UserPointer<const void>(kHello), 734 sizeof(kHello), 735 &transports, 736 MOJO_WRITE_MESSAGE_FLAG_NONE)); 737 transport.End(); 738 739 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 740 // |dispatcher| is destroyed. 741 EXPECT_TRUE(dispatcher->HasOneRef()); 742 dispatcher = nullptr; 743 } 744 745 // Wait. 746 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 747 EXPECT_EQ(123u, context); 748 hss = HandleSignalsState(); 749 mp1->RemoveWaiter(1, &waiter, &hss); 750 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 751 hss.satisfied_signals); 752 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 753 hss.satisfiable_signals); 754 755 // Read from MP 1, port 1. 756 char read_buffer[100] = {0}; 757 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 758 DispatcherVector read_dispatchers; 759 uint32_t read_num_dispatchers = 10; // Maximum to get. 760 EXPECT_EQ(MOJO_RESULT_OK, 761 mp1->ReadMessage(1, 762 UserPointer<void>(read_buffer), 763 MakeUserPointer(&read_buffer_size), 764 &read_dispatchers, 765 &read_num_dispatchers, 766 MOJO_READ_MESSAGE_FLAG_NONE)); 767 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 768 EXPECT_STREQ(kHello, read_buffer); 769 EXPECT_EQ(1u, read_dispatchers.size()); 770 EXPECT_EQ(1u, read_num_dispatchers); 771 ASSERT_TRUE(read_dispatchers[0].get()); 772 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 773 774 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType()); 775 dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get()); 776 777 // Make another mapping. 778 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1; 779 EXPECT_EQ( 780 MOJO_RESULT_OK, 781 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1)); 782 ASSERT_TRUE(mapping1); 783 ASSERT_TRUE(mapping1->GetBase()); 784 ASSERT_EQ(100u, mapping1->GetLength()); 785 EXPECT_NE(mapping1->GetBase(), mapping0->GetBase()); 786 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]); 787 EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]); 788 EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]); 789 790 // Write stuff either way. 791 static_cast<char*>(mapping1->GetBase())[1] = 'x'; 792 EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]); 793 static_cast<char*>(mapping0->GetBase())[2] = 'y'; 794 EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]); 795 796 // Kill the first mapping; the second should still be valid. 797 mapping0.reset(); 798 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]); 799 800 // Close everything that belongs to us. 801 mp0->Close(0); 802 mp1->Close(1); 803 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 804 805 // The second mapping should still be good. 806 EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]); 807 } 808 809 #if defined(OS_POSIX) 810 #define MAYBE_PlatformHandlePassing PlatformHandlePassing 811 #else 812 // Not yet implemented (on Windows). 813 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing 814 #endif 815 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) { 816 base::ScopedTempDir temp_dir; 817 ASSERT_TRUE(temp_dir.CreateUniqueTempDir()); 818 819 static const char kHello[] = "hello"; 820 static const char kWorld[] = "world"; 821 Waiter waiter; 822 uint32_t context = 0; 823 HandleSignalsState hss; 824 825 scoped_refptr<ChannelEndpoint> ep0; 826 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 827 scoped_refptr<ChannelEndpoint> ep1; 828 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 829 ConnectChannelEndpoints(ep0, ep1); 830 831 base::FilePath unused; 832 base::ScopedFILE fp( 833 CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); 834 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get())); 835 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to 836 // be passed. 837 scoped_refptr<PlatformHandleDispatcher> dispatcher( 838 new PlatformHandleDispatcher( 839 mojo::test::PlatformHandleFromFILE(fp.Pass()))); 840 841 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 842 // it later, it might already be readable.) 843 waiter.Init(); 844 ASSERT_EQ( 845 MOJO_RESULT_OK, 846 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 847 848 // Write to MP 0, port 0. 849 { 850 DispatcherTransport transport( 851 test::DispatcherTryStartTransport(dispatcher.get())); 852 EXPECT_TRUE(transport.is_valid()); 853 854 std::vector<DispatcherTransport> transports; 855 transports.push_back(transport); 856 EXPECT_EQ(MOJO_RESULT_OK, 857 mp0->WriteMessage(0, 858 UserPointer<const void>(kWorld), 859 sizeof(kWorld), 860 &transports, 861 MOJO_WRITE_MESSAGE_FLAG_NONE)); 862 transport.End(); 863 864 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 865 // |dispatcher| is destroyed. 866 EXPECT_TRUE(dispatcher->HasOneRef()); 867 dispatcher = nullptr; 868 } 869 870 // Wait. 871 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 872 EXPECT_EQ(123u, context); 873 hss = HandleSignalsState(); 874 mp1->RemoveWaiter(1, &waiter, &hss); 875 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 876 hss.satisfied_signals); 877 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 878 hss.satisfiable_signals); 879 880 // Read from MP 1, port 1. 881 char read_buffer[100] = {0}; 882 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 883 DispatcherVector read_dispatchers; 884 uint32_t read_num_dispatchers = 10; // Maximum to get. 885 EXPECT_EQ(MOJO_RESULT_OK, 886 mp1->ReadMessage(1, 887 UserPointer<void>(read_buffer), 888 MakeUserPointer(&read_buffer_size), 889 &read_dispatchers, 890 &read_num_dispatchers, 891 MOJO_READ_MESSAGE_FLAG_NONE)); 892 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size)); 893 EXPECT_STREQ(kWorld, read_buffer); 894 EXPECT_EQ(1u, read_dispatchers.size()); 895 EXPECT_EQ(1u, read_num_dispatchers); 896 ASSERT_TRUE(read_dispatchers[0].get()); 897 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 898 899 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType()); 900 dispatcher = 901 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get()); 902 903 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass(); 904 EXPECT_TRUE(h.is_valid()); 905 906 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass(); 907 EXPECT_FALSE(h.is_valid()); 908 EXPECT_TRUE(fp); 909 910 rewind(fp.get()); 911 memset(read_buffer, 0, sizeof(read_buffer)); 912 EXPECT_EQ(sizeof(kHello), 913 fread(read_buffer, 1, sizeof(read_buffer), fp.get())); 914 EXPECT_STREQ(kHello, read_buffer); 915 916 // Close everything that belongs to us. 917 mp0->Close(0); 918 mp1->Close(1); 919 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 920 } 921 922 // Test racing closes (on each end). 923 // Note: A flaky failure would almost certainly indicate a problem in the code 924 // itself (not in the test). Also, any logged warnings/errors would also 925 // probably be indicative of bugs. 926 TEST_F(RemoteMessagePipeTest, RacingClosesStress) { 927 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5); 928 929 for (unsigned i = 0; i < 256; i++) { 930 DVLOG(2) << "---------------------------------------- " << i; 931 scoped_refptr<ChannelEndpoint> ep0; 932 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 933 BootstrapChannelEndpointNoWait(0, ep0); 934 935 scoped_refptr<ChannelEndpoint> ep1; 936 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 937 BootstrapChannelEndpointNoWait(1, ep1); 938 939 if (i & 1u) { 940 io_thread()->task_runner()->PostTask( 941 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay)); 942 } 943 if (i & 2u) 944 base::PlatformThread::Sleep(delay); 945 946 mp0->Close(0); 947 948 if (i & 4u) { 949 io_thread()->task_runner()->PostTask( 950 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay)); 951 } 952 if (i & 8u) 953 base::PlatformThread::Sleep(delay); 954 955 mp1->Close(1); 956 957 RestoreInitialState(); 958 } 959 } 960 961 // Tests passing an end of a message pipe over a remote message pipe, and then 962 // passing that end back. 963 // TODO(vtl): Also test passing a message pipe across two remote message pipes. 964 TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) { 965 static const char kHello[] = "hello"; 966 static const char kWorld[] = "world"; 967 Waiter waiter; 968 HandleSignalsState hss; 969 uint32_t context = 0; 970 971 scoped_refptr<ChannelEndpoint> ep0; 972 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); 973 scoped_refptr<ChannelEndpoint> ep1; 974 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); 975 ConnectChannelEndpoints(ep0, ep1); 976 977 // We'll try to pass this dispatcher. 978 scoped_refptr<MessagePipeDispatcher> dispatcher( 979 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 980 scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal()); 981 dispatcher->Init(local_mp, 0); 982 983 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 984 // it later, it might already be readable.) 985 waiter.Init(); 986 ASSERT_EQ( 987 MOJO_RESULT_OK, 988 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); 989 990 // Write to MP 0, port 0. 991 { 992 DispatcherTransport transport( 993 test::DispatcherTryStartTransport(dispatcher.get())); 994 EXPECT_TRUE(transport.is_valid()); 995 996 std::vector<DispatcherTransport> transports; 997 transports.push_back(transport); 998 EXPECT_EQ(MOJO_RESULT_OK, 999 mp0->WriteMessage(0, 1000 UserPointer<const void>(kHello), 1001 sizeof(kHello), 1002 &transports, 1003 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1004 transport.End(); 1005 1006 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 1007 // |dispatcher| is destroyed. 1008 EXPECT_TRUE(dispatcher->HasOneRef()); 1009 dispatcher = nullptr; 1010 } 1011 1012 // Wait. 1013 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 1014 EXPECT_EQ(123u, context); 1015 hss = HandleSignalsState(); 1016 mp1->RemoveWaiter(1, &waiter, &hss); 1017 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1018 hss.satisfied_signals); 1019 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1020 hss.satisfiable_signals); 1021 1022 // Read from MP 1, port 1. 1023 char read_buffer[100] = {0}; 1024 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 1025 DispatcherVector read_dispatchers; 1026 uint32_t read_num_dispatchers = 10; // Maximum to get. 1027 EXPECT_EQ(MOJO_RESULT_OK, 1028 mp1->ReadMessage(1, 1029 UserPointer<void>(read_buffer), 1030 MakeUserPointer(&read_buffer_size), 1031 &read_dispatchers, 1032 &read_num_dispatchers, 1033 MOJO_READ_MESSAGE_FLAG_NONE)); 1034 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 1035 EXPECT_STREQ(kHello, read_buffer); 1036 EXPECT_EQ(1u, read_dispatchers.size()); 1037 EXPECT_EQ(1u, read_num_dispatchers); 1038 ASSERT_TRUE(read_dispatchers[0].get()); 1039 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 1040 1041 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType()); 1042 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get()); 1043 read_dispatchers.clear(); 1044 1045 // Now pass it back. 1046 1047 // Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do 1048 // it later, it might already be readable.) 1049 waiter.Init(); 1050 ASSERT_EQ( 1051 MOJO_RESULT_OK, 1052 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr)); 1053 1054 // Write to MP 1, port 1. 1055 { 1056 DispatcherTransport transport( 1057 test::DispatcherTryStartTransport(dispatcher.get())); 1058 EXPECT_TRUE(transport.is_valid()); 1059 1060 std::vector<DispatcherTransport> transports; 1061 transports.push_back(transport); 1062 EXPECT_EQ(MOJO_RESULT_OK, 1063 mp1->WriteMessage(1, 1064 UserPointer<const void>(kWorld), 1065 sizeof(kWorld), 1066 &transports, 1067 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1068 transport.End(); 1069 1070 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 1071 // |dispatcher| is destroyed. 1072 EXPECT_TRUE(dispatcher->HasOneRef()); 1073 dispatcher = nullptr; 1074 } 1075 1076 // Wait. 1077 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 1078 EXPECT_EQ(456u, context); 1079 hss = HandleSignalsState(); 1080 mp0->RemoveWaiter(0, &waiter, &hss); 1081 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1082 hss.satisfied_signals); 1083 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1084 hss.satisfiable_signals); 1085 1086 // Read from MP 0, port 0. 1087 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 1088 read_num_dispatchers = 10; // Maximum to get. 1089 EXPECT_EQ(MOJO_RESULT_OK, 1090 mp0->ReadMessage(0, 1091 UserPointer<void>(read_buffer), 1092 MakeUserPointer(&read_buffer_size), 1093 &read_dispatchers, 1094 &read_num_dispatchers, 1095 MOJO_READ_MESSAGE_FLAG_NONE)); 1096 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size)); 1097 EXPECT_STREQ(kWorld, read_buffer); 1098 EXPECT_EQ(1u, read_dispatchers.size()); 1099 EXPECT_EQ(1u, read_num_dispatchers); 1100 ASSERT_TRUE(read_dispatchers[0].get()); 1101 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 1102 1103 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType()); 1104 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get()); 1105 read_dispatchers.clear(); 1106 1107 // Add the waiter now, before it becomes readable to avoid a race. 1108 waiter.Init(); 1109 ASSERT_EQ(MOJO_RESULT_OK, 1110 dispatcher->AddWaiter( 1111 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr)); 1112 1113 // Write to "local_mp", port 1. 1114 EXPECT_EQ(MOJO_RESULT_OK, 1115 local_mp->WriteMessage(1, 1116 UserPointer<const void>(kHello), 1117 sizeof(kHello), 1118 nullptr, 1119 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1120 1121 // Wait for the dispatcher to become readable. 1122 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 1123 EXPECT_EQ(789u, context); 1124 hss = HandleSignalsState(); 1125 dispatcher->RemoveWaiter(&waiter, &hss); 1126 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1127 hss.satisfied_signals); 1128 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1129 hss.satisfiable_signals); 1130 1131 // Read from the dispatcher. 1132 memset(read_buffer, 0, sizeof(read_buffer)); 1133 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 1134 EXPECT_EQ(MOJO_RESULT_OK, 1135 dispatcher->ReadMessage(UserPointer<void>(read_buffer), 1136 MakeUserPointer(&read_buffer_size), 1137 0, 1138 nullptr, 1139 MOJO_READ_MESSAGE_FLAG_NONE)); 1140 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 1141 EXPECT_STREQ(kHello, read_buffer); 1142 1143 // Prepare to wait on "local_mp", port 1. 1144 waiter.Init(); 1145 ASSERT_EQ(MOJO_RESULT_OK, 1146 local_mp->AddWaiter( 1147 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr)); 1148 1149 // Write to the dispatcher. 1150 EXPECT_EQ(MOJO_RESULT_OK, 1151 dispatcher->WriteMessage(UserPointer<const void>(kHello), 1152 sizeof(kHello), 1153 nullptr, 1154 MOJO_WRITE_MESSAGE_FLAG_NONE)); 1155 1156 // Wait. 1157 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 1158 EXPECT_EQ(789u, context); 1159 hss = HandleSignalsState(); 1160 local_mp->RemoveWaiter(1, &waiter, &hss); 1161 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1162 hss.satisfied_signals); 1163 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 1164 hss.satisfiable_signals); 1165 1166 // Read from "local_mp", port 1. 1167 memset(read_buffer, 0, sizeof(read_buffer)); 1168 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 1169 EXPECT_EQ(MOJO_RESULT_OK, 1170 local_mp->ReadMessage(1, 1171 UserPointer<void>(read_buffer), 1172 MakeUserPointer(&read_buffer_size), 1173 nullptr, 1174 nullptr, 1175 MOJO_READ_MESSAGE_FLAG_NONE)); 1176 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 1177 EXPECT_STREQ(kHello, read_buffer); 1178 1179 // TODO(vtl): Also test the cases where messages are written and read (at 1180 // various points) on the message pipe being passed around. 1181 1182 // Close everything that belongs to us. 1183 mp0->Close(0); 1184 mp1->Close(1); 1185 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 1186 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed. 1187 local_mp->Close(1); 1188 } 1189 1190 } // namespace 1191 } // namespace system 1192 } // namespace mojo 1193