1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <stdint.h> 6 #include <stdio.h> 7 #include <string.h> 8 9 #include <vector> 10 11 #include "base/basictypes.h" 12 #include "base/bind.h" 13 #include "base/file_util.h" 14 #include "base/files/file_path.h" 15 #include "base/files/scoped_file.h" 16 #include "base/location.h" 17 #include "base/logging.h" 18 #include "base/message_loop/message_loop.h" 19 #include "base/threading/platform_thread.h" // For |Sleep()|. 20 #include "build/build_config.h" // TODO(vtl): Remove this. 21 #include "mojo/common/test/test_utils.h" 22 #include "mojo/embedder/platform_channel_pair.h" 23 #include "mojo/embedder/scoped_platform_handle.h" 24 #include "mojo/system/channel.h" 25 #include "mojo/system/local_message_pipe_endpoint.h" 26 #include "mojo/system/message_pipe.h" 27 #include "mojo/system/message_pipe_dispatcher.h" 28 #include "mojo/system/platform_handle_dispatcher.h" 29 #include "mojo/system/proxy_message_pipe_endpoint.h" 30 #include "mojo/system/raw_channel.h" 31 #include "mojo/system/shared_buffer_dispatcher.h" 32 #include "mojo/system/test_utils.h" 33 #include "mojo/system/waiter.h" 34 #include "testing/gtest/include/gtest/gtest.h" 35 36 namespace mojo { 37 namespace system { 38 namespace { 39 40 class RemoteMessagePipeTest : public testing::Test { 41 public: 42 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {} 43 virtual ~RemoteMessagePipeTest() {} 44 45 virtual void SetUp() OVERRIDE { 46 io_thread_.PostTaskAndWait( 47 FROM_HERE, 48 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread, 49 base::Unretained(this))); 50 } 51 52 virtual void TearDown() OVERRIDE { 53 io_thread_.PostTaskAndWait( 54 FROM_HERE, 55 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread, 56 base::Unretained(this))); 57 } 58 59 protected: 60 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1, 61 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP 62 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s. 63 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0, 64 scoped_refptr<MessagePipe> mp1) { 65 io_thread_.PostTaskAndWait( 66 FROM_HERE, 67 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread, 68 base::Unretained(this), mp0, mp1)); 69 } 70 71 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|. 72 // It assumes/requires that this is the bootstrap case, i.e., that the 73 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This 74 // returns *without* waiting for it to finish connecting. 75 void BootstrapMessagePipeNoWait(unsigned channel_index, 76 scoped_refptr<MessagePipe> mp) { 77 io_thread_.PostTask( 78 FROM_HERE, 79 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread, 80 base::Unretained(this), channel_index, mp)); 81 } 82 83 void RestoreInitialState() { 84 io_thread_.PostTaskAndWait( 85 FROM_HERE, 86 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread, 87 base::Unretained(this))); 88 } 89 90 test::TestIOThread* io_thread() { return &io_thread_; } 91 92 private: 93 void SetUpOnIOThread() { 94 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 95 96 embedder::PlatformChannelPair channel_pair; 97 platform_handles_[0] = channel_pair.PassServerHandle(); 98 platform_handles_[1] = channel_pair.PassClientHandle(); 99 } 100 101 void TearDownOnIOThread() { 102 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 103 104 if (channels_[0]) { 105 channels_[0]->Shutdown(); 106 channels_[0] = NULL; 107 } 108 if (channels_[1]) { 109 channels_[1]->Shutdown(); 110 channels_[1] = NULL; 111 } 112 } 113 114 void CreateAndInitChannel(unsigned channel_index) { 115 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 116 CHECK(channel_index == 0 || channel_index == 1); 117 CHECK(!channels_[channel_index]); 118 119 channels_[channel_index] = new Channel(); 120 CHECK(channels_[channel_index]->Init( 121 RawChannel::Create(platform_handles_[channel_index].Pass()))); 122 } 123 124 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0, 125 scoped_refptr<MessagePipe> mp1) { 126 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 127 128 if (!channels_[0]) 129 CreateAndInitChannel(0); 130 if (!channels_[1]) 131 CreateAndInitChannel(1); 132 133 MessageInTransit::EndpointId local_id0 = 134 channels_[0]->AttachMessagePipeEndpoint(mp0, 1); 135 MessageInTransit::EndpointId local_id1 = 136 channels_[1]->AttachMessagePipeEndpoint(mp1, 0); 137 138 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1)); 139 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0)); 140 } 141 142 void BootstrapMessagePipeOnIOThread(unsigned channel_index, 143 scoped_refptr<MessagePipe> mp) { 144 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 145 CHECK(channel_index == 0 || channel_index == 1); 146 147 unsigned port = channel_index ^ 1u; 148 149 CreateAndInitChannel(channel_index); 150 MessageInTransit::EndpointId endpoint_id = 151 channels_[channel_index]->AttachMessagePipeEndpoint(mp, port); 152 if (endpoint_id == MessageInTransit::kInvalidEndpointId) 153 return; 154 155 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId); 156 CHECK(channels_[channel_index]->RunMessagePipeEndpoint( 157 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId)); 158 } 159 160 void RestoreInitialStateOnIOThread() { 161 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); 162 163 TearDownOnIOThread(); 164 SetUpOnIOThread(); 165 } 166 167 test::TestIOThread io_thread_; 168 embedder::ScopedPlatformHandle platform_handles_[2]; 169 scoped_refptr<Channel> channels_[2]; 170 171 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest); 172 }; 173 174 TEST_F(RemoteMessagePipeTest, Basic) { 175 static const char kHello[] = "hello"; 176 static const char kWorld[] = "world!!!1!!!1!"; 177 char buffer[100] = { 0 }; 178 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 179 Waiter waiter; 180 uint32_t context = 0; 181 182 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and 183 // connected to MP 1, port 0, which will be attached to channel 1. This leaves 184 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. 185 186 scoped_refptr<MessagePipe> mp0(new MessagePipe( 187 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 188 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 189 scoped_refptr<MessagePipe> mp1(new MessagePipe( 190 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 191 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 192 ConnectMessagePipes(mp0, mp1); 193 194 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1. 195 196 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 197 // it later, it might already be readable.) 198 waiter.Init(); 199 EXPECT_EQ(MOJO_RESULT_OK, 200 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123)); 201 202 // Write to MP 0, port 0. 203 EXPECT_EQ(MOJO_RESULT_OK, 204 mp0->WriteMessage(0, 205 kHello, sizeof(kHello), 206 NULL, 207 MOJO_WRITE_MESSAGE_FLAG_NONE)); 208 209 // Wait. 210 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 211 EXPECT_EQ(123u, context); 212 mp1->RemoveWaiter(1, &waiter); 213 214 // Read from MP 1, port 1. 215 EXPECT_EQ(MOJO_RESULT_OK, 216 mp1->ReadMessage(1, 217 buffer, &buffer_size, 218 NULL, NULL, 219 MOJO_READ_MESSAGE_FLAG_NONE)); 220 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size)); 221 EXPECT_STREQ(kHello, buffer); 222 223 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0. 224 225 waiter.Init(); 226 EXPECT_EQ(MOJO_RESULT_OK, 227 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456)); 228 229 EXPECT_EQ(MOJO_RESULT_OK, 230 mp1->WriteMessage(1, 231 kWorld, sizeof(kWorld), 232 NULL, 233 MOJO_WRITE_MESSAGE_FLAG_NONE)); 234 235 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 236 EXPECT_EQ(456u, context); 237 mp0->RemoveWaiter(0, &waiter); 238 239 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 240 EXPECT_EQ(MOJO_RESULT_OK, 241 mp0->ReadMessage(0, 242 buffer, &buffer_size, 243 NULL, NULL, 244 MOJO_READ_MESSAGE_FLAG_NONE)); 245 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size)); 246 EXPECT_STREQ(kWorld, buffer); 247 248 // Close MP 0, port 0. 249 mp0->Close(0); 250 251 // Try to wait for MP 1, port 1 to become readable. This will eventually fail 252 // when it realizes that MP 0, port 0 has been closed. (It may also fail 253 // immediately.) 254 waiter.Init(); 255 MojoResult result = 256 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789); 257 if (result == MOJO_RESULT_OK) { 258 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 259 waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 260 EXPECT_EQ(789u, context); 261 mp1->RemoveWaiter(1, &waiter); 262 } else { 263 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); 264 } 265 266 // And MP 1, port 1. 267 mp1->Close(1); 268 } 269 270 TEST_F(RemoteMessagePipeTest, Multiplex) { 271 static const char kHello[] = "hello"; 272 static const char kWorld[] = "world!!!1!!!1!"; 273 char buffer[100] = { 0 }; 274 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 275 Waiter waiter; 276 uint32_t context = 0; 277 278 // Connect message pipes as in the |Basic| test. 279 280 scoped_refptr<MessagePipe> mp0(new MessagePipe( 281 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 282 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 283 scoped_refptr<MessagePipe> mp1(new MessagePipe( 284 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 285 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 286 ConnectMessagePipes(mp0, mp1); 287 288 // Now put another message pipe on the channel. 289 290 scoped_refptr<MessagePipe> mp2(new MessagePipe( 291 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 292 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 293 scoped_refptr<MessagePipe> mp3(new MessagePipe( 294 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 295 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 296 ConnectMessagePipes(mp2, mp3); 297 298 // Write: MP 2, port 0 -> MP 3, port 1. 299 300 waiter.Init(); 301 EXPECT_EQ(MOJO_RESULT_OK, 302 mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789)); 303 304 EXPECT_EQ(MOJO_RESULT_OK, 305 mp2->WriteMessage(0, 306 kHello, sizeof(kHello), 307 NULL, 308 MOJO_WRITE_MESSAGE_FLAG_NONE)); 309 310 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 311 EXPECT_EQ(789u, context); 312 mp3->RemoveWaiter(1, &waiter); 313 314 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0. 315 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 316 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 317 mp0->ReadMessage(0, 318 buffer, &buffer_size, 319 NULL, NULL, 320 MOJO_READ_MESSAGE_FLAG_NONE)); 321 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 322 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 323 mp1->ReadMessage(1, 324 buffer, &buffer_size, 325 NULL, NULL, 326 MOJO_READ_MESSAGE_FLAG_NONE)); 327 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 328 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 329 mp2->ReadMessage(0, 330 buffer, &buffer_size, 331 NULL, NULL, 332 MOJO_READ_MESSAGE_FLAG_NONE)); 333 334 // Read from MP 3, port 1. 335 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 336 EXPECT_EQ(MOJO_RESULT_OK, 337 mp3->ReadMessage(1, 338 buffer, &buffer_size, 339 NULL, NULL, 340 MOJO_READ_MESSAGE_FLAG_NONE)); 341 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size)); 342 EXPECT_STREQ(kHello, buffer); 343 344 // Write: MP 0, port 0 -> MP 1, port 1 again. 345 346 waiter.Init(); 347 EXPECT_EQ(MOJO_RESULT_OK, 348 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123)); 349 350 EXPECT_EQ(MOJO_RESULT_OK, 351 mp0->WriteMessage(0, 352 kWorld, sizeof(kWorld), 353 NULL, 354 MOJO_WRITE_MESSAGE_FLAG_NONE)); 355 356 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 357 EXPECT_EQ(123u, context); 358 mp1->RemoveWaiter(1, &waiter); 359 360 // Make sure there's nothing on the other ports. 361 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 362 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 363 mp0->ReadMessage(0, 364 buffer, &buffer_size, 365 NULL, NULL, 366 MOJO_READ_MESSAGE_FLAG_NONE)); 367 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 368 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 369 mp2->ReadMessage(0, 370 buffer, &buffer_size, 371 NULL, NULL, 372 MOJO_READ_MESSAGE_FLAG_NONE)); 373 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 374 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 375 mp3->ReadMessage(1, 376 buffer, &buffer_size, 377 NULL, NULL, 378 MOJO_READ_MESSAGE_FLAG_NONE)); 379 380 buffer_size = static_cast<uint32_t>(sizeof(buffer)); 381 EXPECT_EQ(MOJO_RESULT_OK, 382 mp1->ReadMessage(1, 383 buffer, &buffer_size, 384 NULL, NULL, 385 MOJO_READ_MESSAGE_FLAG_NONE)); 386 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size)); 387 EXPECT_STREQ(kWorld, buffer); 388 389 mp0->Close(0); 390 mp1->Close(1); 391 mp2->Close(0); 392 mp3->Close(1); 393 } 394 395 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) { 396 static const char kHello[] = "hello"; 397 char buffer[100] = { 0 }; 398 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 399 Waiter waiter; 400 uint32_t context = 0; 401 402 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and 403 // connected to MP 1, port 0, which will be attached to channel 1. This leaves 404 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. 405 406 scoped_refptr<MessagePipe> mp0(new MessagePipe( 407 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 408 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 409 410 // Write to MP 0, port 0. 411 EXPECT_EQ(MOJO_RESULT_OK, 412 mp0->WriteMessage(0, 413 kHello, sizeof(kHello), 414 NULL, 415 MOJO_WRITE_MESSAGE_FLAG_NONE)); 416 417 BootstrapMessagePipeNoWait(0, mp0); 418 419 420 // Close MP 0, port 0 before channel 1 is even connected. 421 mp0->Close(0); 422 423 scoped_refptr<MessagePipe> mp1(new MessagePipe( 424 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 425 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 426 427 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 428 // it later, it might already be readable.) 429 waiter.Init(); 430 EXPECT_EQ(MOJO_RESULT_OK, 431 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123)); 432 433 BootstrapMessagePipeNoWait(1, mp1); 434 435 // Wait. 436 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 437 EXPECT_EQ(123u, context); 438 mp1->RemoveWaiter(1, &waiter); 439 440 // Read from MP 1, port 1. 441 EXPECT_EQ(MOJO_RESULT_OK, 442 mp1->ReadMessage(1, 443 buffer, &buffer_size, 444 NULL, NULL, 445 MOJO_READ_MESSAGE_FLAG_NONE)); 446 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size)); 447 EXPECT_STREQ(kHello, buffer); 448 449 // And MP 1, port 1. 450 mp1->Close(1); 451 } 452 453 TEST_F(RemoteMessagePipeTest, HandlePassing) { 454 static const char kHello[] = "hello"; 455 Waiter waiter; 456 uint32_t context = 0; 457 458 scoped_refptr<MessagePipe> mp0(new MessagePipe( 459 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 460 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 461 scoped_refptr<MessagePipe> mp1(new MessagePipe( 462 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 463 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 464 ConnectMessagePipes(mp0, mp1); 465 466 // We'll try to pass this dispatcher. 467 scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher( 468 MessagePipeDispatcher::kDefaultCreateOptions)); 469 scoped_refptr<MessagePipe> local_mp(new MessagePipe()); 470 dispatcher->Init(local_mp, 0); 471 472 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 473 // it later, it might already be readable.) 474 waiter.Init(); 475 EXPECT_EQ(MOJO_RESULT_OK, 476 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123)); 477 478 // Write to MP 0, port 0. 479 { 480 DispatcherTransport 481 transport(test::DispatcherTryStartTransport(dispatcher.get())); 482 EXPECT_TRUE(transport.is_valid()); 483 484 std::vector<DispatcherTransport> transports; 485 transports.push_back(transport); 486 EXPECT_EQ(MOJO_RESULT_OK, 487 mp0->WriteMessage(0, kHello, sizeof(kHello), &transports, 488 MOJO_WRITE_MESSAGE_FLAG_NONE)); 489 transport.End(); 490 491 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 492 // |dispatcher| is destroyed. 493 EXPECT_TRUE(dispatcher->HasOneRef()); 494 dispatcher = NULL; 495 } 496 497 // Wait. 498 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 499 EXPECT_EQ(123u, context); 500 mp1->RemoveWaiter(1, &waiter); 501 502 // Read from MP 1, port 1. 503 char read_buffer[100] = { 0 }; 504 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 505 DispatcherVector read_dispatchers; 506 uint32_t read_num_dispatchers = 10; // Maximum to get. 507 EXPECT_EQ(MOJO_RESULT_OK, 508 mp1->ReadMessage(1, read_buffer, &read_buffer_size, 509 &read_dispatchers, &read_num_dispatchers, 510 MOJO_READ_MESSAGE_FLAG_NONE)); 511 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 512 EXPECT_STREQ(kHello, read_buffer); 513 EXPECT_EQ(1u, read_dispatchers.size()); 514 EXPECT_EQ(1u, read_num_dispatchers); 515 ASSERT_TRUE(read_dispatchers[0]); 516 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 517 518 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType()); 519 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get()); 520 521 // Write to "local_mp", port 1. 522 EXPECT_EQ(MOJO_RESULT_OK, 523 local_mp->WriteMessage(1, kHello, sizeof(kHello), NULL, 524 MOJO_WRITE_MESSAGE_FLAG_NONE)); 525 526 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately 527 // here. (We don't crash if I sleep and then close.) 528 529 // Wait for the dispatcher to become readable. 530 waiter.Init(); 531 EXPECT_EQ(MOJO_RESULT_OK, 532 dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456)); 533 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 534 EXPECT_EQ(456u, context); 535 dispatcher->RemoveWaiter(&waiter); 536 537 // Read from the dispatcher. 538 memset(read_buffer, 0, sizeof(read_buffer)); 539 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 540 EXPECT_EQ(MOJO_RESULT_OK, 541 dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL, 542 MOJO_READ_MESSAGE_FLAG_NONE)); 543 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 544 EXPECT_STREQ(kHello, read_buffer); 545 546 // Prepare to wait on "local_mp", port 1. 547 waiter.Init(); 548 EXPECT_EQ(MOJO_RESULT_OK, 549 local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789)); 550 551 // Write to the dispatcher. 552 EXPECT_EQ(MOJO_RESULT_OK, 553 dispatcher->WriteMessage(kHello, sizeof(kHello), NULL, 554 MOJO_WRITE_MESSAGE_FLAG_NONE)); 555 556 // Wait. 557 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 558 EXPECT_EQ(789u, context); 559 local_mp->RemoveWaiter(1, &waiter); 560 561 // Read from "local_mp", port 1. 562 memset(read_buffer, 0, sizeof(read_buffer)); 563 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 564 EXPECT_EQ(MOJO_RESULT_OK, 565 local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL, 566 MOJO_READ_MESSAGE_FLAG_NONE)); 567 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 568 EXPECT_STREQ(kHello, read_buffer); 569 570 // TODO(vtl): Also test that messages queued up before the handle was sent are 571 // delivered properly. 572 573 // Close everything that belongs to us. 574 mp0->Close(0); 575 mp1->Close(1); 576 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 577 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed. 578 local_mp->Close(1); 579 } 580 581 #if defined(OS_POSIX) 582 #define MAYBE_SharedBufferPassing SharedBufferPassing 583 #else 584 // Not yet implemented (on Windows). 585 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing 586 #endif 587 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) { 588 static const char kHello[] = "hello"; 589 Waiter waiter; 590 uint32_t context = 0; 591 592 scoped_refptr<MessagePipe> mp0(new MessagePipe( 593 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 594 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 595 scoped_refptr<MessagePipe> mp1(new MessagePipe( 596 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 597 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 598 ConnectMessagePipes(mp0, mp1); 599 600 // We'll try to pass this dispatcher. 601 scoped_refptr<SharedBufferDispatcher> dispatcher; 602 EXPECT_EQ(MOJO_RESULT_OK, 603 SharedBufferDispatcher::Create( 604 SharedBufferDispatcher::kDefaultCreateOptions, 100, 605 &dispatcher)); 606 ASSERT_TRUE(dispatcher); 607 608 // Make a mapping. 609 scoped_ptr<RawSharedBufferMapping> mapping0; 610 EXPECT_EQ(MOJO_RESULT_OK, 611 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, 612 &mapping0)); 613 ASSERT_TRUE(mapping0); 614 ASSERT_TRUE(mapping0->base()); 615 ASSERT_EQ(100u, mapping0->length()); 616 static_cast<char*>(mapping0->base())[0] = 'A'; 617 static_cast<char*>(mapping0->base())[50] = 'B'; 618 static_cast<char*>(mapping0->base())[99] = 'C'; 619 620 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 621 // it later, it might already be readable.) 622 waiter.Init(); 623 EXPECT_EQ(MOJO_RESULT_OK, 624 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123)); 625 626 // Write to MP 0, port 0. 627 { 628 DispatcherTransport 629 transport(test::DispatcherTryStartTransport(dispatcher.get())); 630 EXPECT_TRUE(transport.is_valid()); 631 632 std::vector<DispatcherTransport> transports; 633 transports.push_back(transport); 634 EXPECT_EQ(MOJO_RESULT_OK, 635 mp0->WriteMessage(0, kHello, sizeof(kHello), &transports, 636 MOJO_WRITE_MESSAGE_FLAG_NONE)); 637 transport.End(); 638 639 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 640 // |dispatcher| is destroyed. 641 EXPECT_TRUE(dispatcher->HasOneRef()); 642 dispatcher = NULL; 643 } 644 645 // Wait. 646 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 647 EXPECT_EQ(123u, context); 648 mp1->RemoveWaiter(1, &waiter); 649 650 // Read from MP 1, port 1. 651 char read_buffer[100] = { 0 }; 652 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 653 DispatcherVector read_dispatchers; 654 uint32_t read_num_dispatchers = 10; // Maximum to get. 655 EXPECT_EQ(MOJO_RESULT_OK, 656 mp1->ReadMessage(1, read_buffer, &read_buffer_size, 657 &read_dispatchers, &read_num_dispatchers, 658 MOJO_READ_MESSAGE_FLAG_NONE)); 659 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); 660 EXPECT_STREQ(kHello, read_buffer); 661 EXPECT_EQ(1u, read_dispatchers.size()); 662 EXPECT_EQ(1u, read_num_dispatchers); 663 ASSERT_TRUE(read_dispatchers[0]); 664 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 665 666 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType()); 667 dispatcher = 668 static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get()); 669 670 // Make another mapping. 671 scoped_ptr<RawSharedBufferMapping> mapping1; 672 EXPECT_EQ(MOJO_RESULT_OK, 673 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, 674 &mapping1)); 675 ASSERT_TRUE(mapping1); 676 ASSERT_TRUE(mapping1->base()); 677 ASSERT_EQ(100u, mapping1->length()); 678 EXPECT_NE(mapping1->base(), mapping0->base()); 679 EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]); 680 EXPECT_EQ('B', static_cast<char*>(mapping1->base())[50]); 681 EXPECT_EQ('C', static_cast<char*>(mapping1->base())[99]); 682 683 // Write stuff either way. 684 static_cast<char*>(mapping1->base())[1] = 'x'; 685 EXPECT_EQ('x', static_cast<char*>(mapping0->base())[1]); 686 static_cast<char*>(mapping0->base())[2] = 'y'; 687 EXPECT_EQ('y', static_cast<char*>(mapping1->base())[2]); 688 689 // Kill the first mapping; the second should still be valid. 690 mapping0.reset(); 691 EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]); 692 693 // Close everything that belongs to us. 694 mp0->Close(0); 695 mp1->Close(1); 696 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 697 698 // The second mapping should still be good. 699 EXPECT_EQ('x', static_cast<char*>(mapping1->base())[1]); 700 } 701 702 #if defined(OS_POSIX) 703 #define MAYBE_PlatformHandlePassing PlatformHandlePassing 704 #else 705 // Not yet implemented (on Windows). 706 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing 707 #endif 708 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) { 709 static const char kHello[] = "hello"; 710 static const char kWorld[] = "world"; 711 Waiter waiter; 712 uint32_t context = 0; 713 714 scoped_refptr<MessagePipe> mp0(new MessagePipe( 715 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 716 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 717 scoped_refptr<MessagePipe> mp1(new MessagePipe( 718 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 719 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 720 ConnectMessagePipes(mp0, mp1); 721 722 base::FilePath unused; 723 base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused)); 724 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get())); 725 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to 726 // be passed. 727 scoped_refptr<PlatformHandleDispatcher> dispatcher( 728 new PlatformHandleDispatcher( 729 mojo::test::PlatformHandleFromFILE(fp.Pass()))); 730 731 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do 732 // it later, it might already be readable.) 733 waiter.Init(); 734 EXPECT_EQ(MOJO_RESULT_OK, 735 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123)); 736 737 // Write to MP 0, port 0. 738 { 739 DispatcherTransport 740 transport(test::DispatcherTryStartTransport(dispatcher.get())); 741 EXPECT_TRUE(transport.is_valid()); 742 743 std::vector<DispatcherTransport> transports; 744 transports.push_back(transport); 745 EXPECT_EQ(MOJO_RESULT_OK, 746 mp0->WriteMessage(0, kWorld, sizeof(kWorld), &transports, 747 MOJO_WRITE_MESSAGE_FLAG_NONE)); 748 transport.End(); 749 750 // |dispatcher| should have been closed. This is |DCHECK()|ed when the 751 // |dispatcher| is destroyed. 752 EXPECT_TRUE(dispatcher->HasOneRef()); 753 dispatcher = NULL; 754 } 755 756 // Wait. 757 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 758 EXPECT_EQ(123u, context); 759 mp1->RemoveWaiter(1, &waiter); 760 761 // Read from MP 1, port 1. 762 char read_buffer[100] = { 0 }; 763 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); 764 DispatcherVector read_dispatchers; 765 uint32_t read_num_dispatchers = 10; // Maximum to get. 766 EXPECT_EQ(MOJO_RESULT_OK, 767 mp1->ReadMessage(1, read_buffer, &read_buffer_size, 768 &read_dispatchers, &read_num_dispatchers, 769 MOJO_READ_MESSAGE_FLAG_NONE)); 770 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size)); 771 EXPECT_STREQ(kWorld, read_buffer); 772 EXPECT_EQ(1u, read_dispatchers.size()); 773 EXPECT_EQ(1u, read_num_dispatchers); 774 ASSERT_TRUE(read_dispatchers[0]); 775 EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); 776 777 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType()); 778 dispatcher = 779 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get()); 780 781 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass(); 782 EXPECT_TRUE(h.is_valid()); 783 784 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass(); 785 EXPECT_FALSE(h.is_valid()); 786 EXPECT_TRUE(fp); 787 788 rewind(fp.get()); 789 memset(read_buffer, 0, sizeof(read_buffer)); 790 EXPECT_EQ(sizeof(kHello), 791 fread(read_buffer, 1, sizeof(read_buffer), fp.get())); 792 EXPECT_STREQ(kHello, read_buffer); 793 794 // Close everything that belongs to us. 795 mp0->Close(0); 796 mp1->Close(1); 797 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); 798 } 799 800 // Test racing closes (on each end). 801 // Note: A flaky failure would almost certainly indicate a problem in the code 802 // itself (not in the test). Also, any logged warnings/errors would also 803 // probably be indicative of bugs. 804 TEST_F(RemoteMessagePipeTest, RacingClosesStress) { 805 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5); 806 807 for (unsigned i = 0; i < 256; i++) { 808 DVLOG(2) << "---------------------------------------- " << i; 809 scoped_refptr<MessagePipe> mp0(new MessagePipe( 810 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 811 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 812 BootstrapMessagePipeNoWait(0, mp0); 813 814 scoped_refptr<MessagePipe> mp1(new MessagePipe( 815 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), 816 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); 817 BootstrapMessagePipeNoWait(1, mp1); 818 819 if (i & 1u) { 820 io_thread()->task_runner()->PostTask( 821 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay)); 822 } 823 if (i & 2u) 824 base::PlatformThread::Sleep(delay); 825 826 mp0->Close(0); 827 828 if (i & 4u) { 829 io_thread()->task_runner()->PostTask( 830 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay)); 831 } 832 if (i & 8u) 833 base::PlatformThread::Sleep(delay); 834 835 mp1->Close(1); 836 837 RestoreInitialState(); 838 } 839 } 840 841 } // namespace 842 } // namespace system 843 } // namespace mojo 844