1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/message_pipe_test_utils.h" 6 7 #include "base/bind.h" 8 #include "base/threading/platform_thread.h" // For |Sleep()|. 9 #include "mojo/system/channel.h" 10 #include "mojo/system/channel_endpoint.h" 11 #include "mojo/system/message_pipe.h" 12 #include "mojo/system/waiter.h" 13 14 namespace mojo { 15 namespace system { 16 namespace test { 17 18 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, 19 MojoHandleSignals signals, 20 HandleSignalsState* signals_state) { 21 Waiter waiter; 22 waiter.Init(); 23 24 MojoResult add_result = mp->AddWaiter(0, &waiter, signals, 0, signals_state); 25 if (add_result != MOJO_RESULT_OK) { 26 return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK 27 : add_result; 28 } 29 30 MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE, nullptr); 31 mp->RemoveWaiter(0, &waiter, signals_state); 32 return wait_result; 33 } 34 35 ChannelThread::ChannelThread(embedder::PlatformSupport* platform_support) 36 : platform_support_(platform_support), 37 test_io_thread_(base::TestIOThread::kManualStart) { 38 } 39 40 ChannelThread::~ChannelThread() { 41 Stop(); 42 } 43 44 void ChannelThread::Start(embedder::ScopedPlatformHandle platform_handle, 45 scoped_refptr<ChannelEndpoint> channel_endpoint) { 46 test_io_thread_.Start(); 47 test_io_thread_.PostTaskAndWait( 48 FROM_HERE, 49 base::Bind(&ChannelThread::InitChannelOnIOThread, 50 base::Unretained(this), 51 base::Passed(&platform_handle), 52 channel_endpoint)); 53 } 54 55 void ChannelThread::Stop() { 56 if (channel_.get()) { 57 // Hack to flush write buffers before quitting. 58 // TODO(vtl): Remove this once |Channel| has a 59 // |FlushWriteBufferAndShutdown()| (or whatever). 60 while (!channel_->IsWriteBufferEmpty()) 61 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); 62 63 test_io_thread_.PostTaskAndWait( 64 FROM_HERE, 65 base::Bind(&ChannelThread::ShutdownChannelOnIOThread, 66 base::Unretained(this))); 67 } 68 test_io_thread_.Stop(); 69 } 70 71 void ChannelThread::InitChannelOnIOThread( 72 embedder::ScopedPlatformHandle platform_handle, 73 scoped_refptr<ChannelEndpoint> channel_endpoint) { 74 CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop()); 75 CHECK(platform_handle.is_valid()); 76 77 // Create and initialize |Channel|. 78 channel_ = new Channel(platform_support_); 79 CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass()))); 80 81 // Attach the message pipe endpoint. 82 // Note: On the "server" (parent process) side, we need not attach the 83 // message pipe endpoint immediately. However, on the "client" (child 84 // process) side, this *must* be done here -- otherwise, the |Channel| may 85 // receive/process messages (which it can do as soon as it's hooked up to 86 // the IO thread message loop, and that message loop runs) before the 87 // message pipe endpoint is attached. 88 CHECK_EQ(channel_->AttachEndpoint(channel_endpoint), 89 Channel::kBootstrapEndpointId); 90 CHECK(channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId, 91 Channel::kBootstrapEndpointId)); 92 } 93 94 void ChannelThread::ShutdownChannelOnIOThread() { 95 CHECK(channel_.get()); 96 channel_->Shutdown(); 97 channel_ = nullptr; 98 } 99 100 #if !defined(OS_IOS) 101 MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase() 102 : channel_thread_(&platform_support_) { 103 } 104 105 MultiprocessMessagePipeTestBase::~MultiprocessMessagePipeTestBase() { 106 } 107 108 void MultiprocessMessagePipeTestBase::Init(scoped_refptr<ChannelEndpoint> ep) { 109 channel_thread_.Start(helper_.server_platform_handle.Pass(), ep); 110 } 111 #endif 112 113 } // namespace test 114 } // namespace system 115 } // namespace mojo 116