1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // TODO(vtl): Enable this on non-POSIX once we have a non-POSIX implementation. 6 #include "build/build_config.h" 7 #if defined(OS_POSIX) 8 9 #include <stdint.h> 10 11 #include <string> 12 13 #include "base/basictypes.h" 14 #include "base/bind.h" 15 #include "base/callback.h" 16 #include "base/location.h" 17 #include "base/logging.h" 18 #include "base/message_loop/message_loop.h" 19 #include "base/threading/thread.h" 20 #include "mojo/common/test/multiprocess_test_base.h" 21 #include "mojo/system/channel.h" 22 #include "mojo/system/local_message_pipe_endpoint.h" 23 #include "mojo/system/message_pipe.h" 24 #include "mojo/system/platform_channel.h" 25 #include "mojo/system/proxy_message_pipe_endpoint.h" 26 #include "mojo/system/test_utils.h" 27 #include "mojo/system/waiter.h" 28 29 namespace mojo { 30 namespace system { 31 namespace { 32 33 class IOThreadWrapper { 34 public: 35 IOThreadWrapper() : io_thread_("io_thread") {} 36 ~IOThreadWrapper() { 37 CHECK(!channel_.get()); 38 CHECK(!io_thread_.IsRunning()); 39 } 40 41 void PostTask(const tracked_objects::Location& from_here, 42 const base::Closure& task) { 43 task_runner()->PostTask(from_here, task); 44 } 45 46 void PostTaskAndWait(const tracked_objects::Location& from_here, 47 const base::Closure& task) { 48 test::PostTaskAndWait(task_runner(), from_here, task); 49 } 50 51 void Init(PlatformChannel* platform_channel, scoped_refptr<MessagePipe> mp) { 52 io_thread_.StartWithOptions( 53 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); 54 PostTask(FROM_HERE, 55 base::Bind(&IOThreadWrapper::InitOnIOThread, 56 base::Unretained(this), 57 platform_channel, mp)); 58 } 59 60 void Shutdown() { 61 PostTaskAndWait(FROM_HERE, 62 base::Bind(&IOThreadWrapper::ShutdownOnIOThread, 63 base::Unretained(this))); 64 io_thread_.Stop(); 65 } 66 67 bool is_initialized() const { return !!channel_.get(); } 68 69 base::MessageLoop* message_loop() { 70 return io_thread_.message_loop(); 71 } 72 73 scoped_refptr<base::TaskRunner> task_runner() { 74 return message_loop()->message_loop_proxy(); 75 } 76 77 private: 78 void InitOnIOThread(PlatformChannel* platform_channel, 79 scoped_refptr<MessagePipe> mp) { 80 CHECK_EQ(base::MessageLoop::current(), message_loop()); 81 CHECK(platform_channel); 82 CHECK(platform_channel->is_valid()); 83 84 // Create and initialize |Channel|. 85 channel_ = new Channel(); 86 CHECK(channel_->Init(platform_channel->PassHandle())); 87 88 // Attach the message pipe endpoint. 89 // Note: On the "server" (parent process) side, we need not attach the 90 // message pipe endpoint immediately. However, on the "client" (child 91 // process) side, this *must* be done here -- otherwise, the |Channel| may 92 // receive/process messages (which it can do as soon as it's hooked up to 93 // the IO thread message loop, and that message loop runs) before the 94 // message pipe endpoint is attached. 95 CHECK_EQ(channel_->AttachMessagePipeEndpoint(mp, 1), 96 Channel::kBootstrapEndpointId); 97 channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId, 98 Channel::kBootstrapEndpointId); 99 } 100 101 void ShutdownOnIOThread() { 102 CHECK(channel_.get()); 103 channel_->Shutdown(); 104 channel_ = NULL; 105 } 106 107 base::Thread io_thread_; 108 scoped_refptr<Channel> channel_; 109 110 DISALLOW_COPY_AND_ASSIGN(IOThreadWrapper); 111 }; 112 113 class MultiprocessMessagePipeTest : public mojo::test::MultiprocessTestBase { 114 public: 115 MultiprocessMessagePipeTest() {} 116 virtual ~MultiprocessMessagePipeTest() {} 117 118 virtual void TearDown() OVERRIDE { 119 if (io_thread_wrapper_.is_initialized()) 120 io_thread_wrapper_.Shutdown(); 121 mojo::test::MultiprocessTestBase::TearDown(); 122 } 123 124 void Init(scoped_refptr<MessagePipe> mp) { 125 io_thread_wrapper_.Init(platform_server_channel.get(), mp); 126 } 127 128 private: 129 IOThreadWrapper io_thread_wrapper_; 130 131 DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest); 132 }; 133 134 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, MojoWaitFlags flags) { 135 Waiter waiter; 136 waiter.Init(); 137 138 MojoResult add_result = mp->AddWaiter(0, &waiter, flags, MOJO_RESULT_OK); 139 if (add_result != MOJO_RESULT_OK) { 140 return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK : 141 add_result; 142 } 143 144 MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE); 145 mp->RemoveWaiter(0, &waiter); 146 return wait_result; 147 } 148 149 // For each message received, sends a reply message with the same contents 150 // repeated twice, until the other end is closed or it receives "quitquitquit" 151 // (which it doesn't reply to). It'll return the number of messages received, 152 // not including any "quitquitquit" message, modulo 100. 153 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) { 154 IOThreadWrapper io_thread_wrapper; 155 PlatformClientChannel* const platform_client_channel = 156 MultiprocessMessagePipeTest::platform_client_channel.get(); 157 CHECK(platform_client_channel); 158 CHECK(platform_client_channel->is_valid()); 159 scoped_refptr<MessagePipe> mp(new MessagePipe( 160 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 161 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 162 io_thread_wrapper.Init(platform_client_channel, mp); 163 164 const std::string quitquitquit("quitquitquit"); 165 int rv = 0; 166 for (;; rv = (rv + 1) % 100) { 167 // Wait for our end of the message pipe to be readable. 168 MojoResult result = WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE); 169 if (result != MOJO_RESULT_OK) { 170 // It was closed, probably. 171 CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION); 172 break; 173 } 174 175 std::string read_buffer(1000, '\0'); 176 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size()); 177 CHECK_EQ(mp->ReadMessage(0, 178 &read_buffer[0], &read_buffer_size, 179 NULL, NULL, 180 MOJO_READ_MESSAGE_FLAG_NONE), 181 MOJO_RESULT_OK); 182 read_buffer.resize(read_buffer_size); 183 VLOG(2) << "Child got: " << read_buffer; 184 185 if (read_buffer == quitquitquit) { 186 VLOG(2) << "Child quitting."; 187 break; 188 } 189 190 std::string write_buffer = read_buffer + read_buffer; 191 CHECK_EQ(mp->WriteMessage(0, 192 write_buffer.data(), 193 static_cast<uint32_t>(write_buffer.size()), 194 NULL, 195 MOJO_WRITE_MESSAGE_FLAG_NONE), 196 MOJO_RESULT_OK); 197 } 198 199 200 mp->Close(0); 201 io_thread_wrapper.Shutdown(); 202 return rv; 203 } 204 205 // Sends "hello" to child, and expects "hellohello" back. 206 TEST_F(MultiprocessMessagePipeTest, Basic) { 207 StartChild("EchoEcho"); 208 209 scoped_refptr<MessagePipe> mp(new MessagePipe( 210 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 211 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 212 Init(mp); 213 214 std::string hello("hello"); 215 EXPECT_EQ(MOJO_RESULT_OK, 216 mp->WriteMessage(0, 217 hello.data(), static_cast<uint32_t>(hello.size()), 218 NULL, 219 MOJO_WRITE_MESSAGE_FLAG_NONE)); 220 221 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE)); 222 223 std::string read_buffer(1000, '\0'); 224 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size()); 225 CHECK_EQ(mp->ReadMessage(0, 226 &read_buffer[0], &read_buffer_size, 227 NULL, NULL, 228 MOJO_READ_MESSAGE_FLAG_NONE), 229 MOJO_RESULT_OK); 230 read_buffer.resize(read_buffer_size); 231 VLOG(2) << "Parent got: " << read_buffer; 232 EXPECT_EQ(hello + hello, read_buffer); 233 234 mp->Close(0); 235 236 // We sent one message. 237 EXPECT_EQ(1 % 100, WaitForChildShutdown()); 238 } 239 240 // Sends a bunch of messages to the child. Expects them "repeated" back. Waits 241 // for the child to close its end before quitting. 242 TEST_F(MultiprocessMessagePipeTest, QueueMessages) { 243 StartChild("EchoEcho"); 244 245 scoped_refptr<MessagePipe> mp(new MessagePipe( 246 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 247 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 248 Init(mp); 249 250 static const size_t kNumMessages = 1001; 251 for (size_t i = 0; i < kNumMessages; i++) { 252 std::string write_buffer(i, 'A' + (i % 26)); 253 EXPECT_EQ(MOJO_RESULT_OK, 254 mp->WriteMessage(0, 255 write_buffer.data(), 256 static_cast<uint32_t>(write_buffer.size()), 257 NULL, 258 MOJO_WRITE_MESSAGE_FLAG_NONE)); 259 } 260 261 const std::string quitquitquit("quitquitquit"); 262 EXPECT_EQ(MOJO_RESULT_OK, 263 mp->WriteMessage(0, 264 quitquitquit.data(), 265 static_cast<uint32_t>(quitquitquit.size()), 266 NULL, 267 MOJO_WRITE_MESSAGE_FLAG_NONE)); 268 269 for (size_t i = 0; i < kNumMessages; i++) { 270 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE)); 271 272 std::string read_buffer(kNumMessages * 2, '\0'); 273 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size()); 274 CHECK_EQ(mp->ReadMessage(0, 275 &read_buffer[0], &read_buffer_size, 276 NULL, NULL, 277 MOJO_READ_MESSAGE_FLAG_NONE), 278 MOJO_RESULT_OK); 279 read_buffer.resize(read_buffer_size); 280 281 EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer); 282 } 283 284 // Wait for it to become readable, which should fail (since we sent 285 // "quitquitquit"). 286 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 287 WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE)); 288 289 mp->Close(0); 290 291 EXPECT_EQ(static_cast<int>(kNumMessages % 100), WaitForChildShutdown()); 292 } 293 294 } // namespace 295 } // namespace system 296 } // namespace mojo 297 298 #endif // defined(OS_POSIX) 299