1 // Copyright 2017 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/system/wait_set.h" 6 7 #include <set> 8 #include <vector> 9 10 #include "base/bind.h" 11 #include "base/callback.h" 12 #include "base/memory/ptr_util.h" 13 #include "base/synchronization/waitable_event.h" 14 #include "base/threading/platform_thread.h" 15 #include "base/threading/simple_thread.h" 16 #include "mojo/public/cpp/system/message_pipe.h" 17 #include "mojo/public/cpp/system/wait.h" 18 #include "testing/gtest/include/gtest/gtest.h" 19 20 namespace mojo { 21 namespace { 22 23 using WaitSetTest = testing::Test; 24 25 void WriteMessage(const ScopedMessagePipeHandle& handle, 26 const base::StringPiece& message) { 27 MojoResult rv = WriteMessageRaw(handle.get(), message.data(), 28 static_cast<uint32_t>(message.size()), 29 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE); 30 CHECK_EQ(MOJO_RESULT_OK, rv); 31 } 32 33 std::string ReadMessage(const ScopedMessagePipeHandle& handle) { 34 std::vector<uint8_t> bytes; 35 MojoResult rv = ReadMessageRaw(handle.get(), &bytes, nullptr, 36 MOJO_READ_MESSAGE_FLAG_NONE); 37 CHECK_EQ(MOJO_RESULT_OK, rv); 38 return std::string(bytes.begin(), bytes.end()); 39 } 40 41 class ThreadedRunner : public base::SimpleThread { 42 public: 43 explicit ThreadedRunner(const base::Closure& callback) 44 : SimpleThread("ThreadedRunner"), callback_(callback) {} 45 ~ThreadedRunner() override { Join(); } 46 47 void Run() override { callback_.Run(); } 48 49 private: 50 const base::Closure callback_; 51 52 DISALLOW_COPY_AND_ASSIGN(ThreadedRunner); 53 }; 54 55 TEST_F(WaitSetTest, Satisfied) { 56 WaitSet wait_set; 57 MessagePipe p; 58 59 const char kTestMessage1[] = "hello wake up"; 60 61 // Watch only one handle and write to the other. 62 63 wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 64 WriteMessage(p.handle0, kTestMessage1); 65 66 size_t num_ready_handles = 2; 67 Handle ready_handles[2]; 68 MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN}; 69 HandleSignalsState hss[2]; 70 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss); 71 72 EXPECT_EQ(1u, num_ready_handles); 73 EXPECT_EQ(p.handle1.get(), ready_handles[0]); 74 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 75 EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed()); 76 77 wait_set.RemoveHandle(p.handle1.get()); 78 79 // Now watch only the other handle and write to the first one. 80 81 wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 82 WriteMessage(p.handle1, kTestMessage1); 83 84 num_ready_handles = 2; 85 ready_results[0] = MOJO_RESULT_UNKNOWN; 86 ready_results[1] = MOJO_RESULT_UNKNOWN; 87 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss); 88 89 EXPECT_EQ(1u, num_ready_handles); 90 EXPECT_EQ(p.handle0.get(), ready_handles[0]); 91 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 92 EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed()); 93 94 // Now wait on both of them. 95 wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 96 97 num_ready_handles = 2; 98 ready_results[0] = MOJO_RESULT_UNKNOWN; 99 ready_results[1] = MOJO_RESULT_UNKNOWN; 100 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss); 101 EXPECT_EQ(2u, num_ready_handles); 102 EXPECT_TRUE((ready_handles[0] == p.handle0.get() && 103 ready_handles[1] == p.handle1.get()) || 104 (ready_handles[0] == p.handle1.get() && 105 ready_handles[1] == p.handle0.get())); 106 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 107 EXPECT_EQ(MOJO_RESULT_OK, ready_results[1]); 108 EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed()); 109 EXPECT_TRUE(hss[1].readable() && hss[1].writable() && !hss[1].peer_closed()); 110 111 // Wait on both again, but with only enough output space for one result. 112 num_ready_handles = 1; 113 ready_results[0] = MOJO_RESULT_UNKNOWN; 114 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss); 115 EXPECT_EQ(1u, num_ready_handles); 116 EXPECT_TRUE(ready_handles[0] == p.handle0.get() || 117 ready_handles[0] == p.handle1.get()); 118 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 119 120 // Remove the ready handle from the set and wait one more time. 121 EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0])); 122 123 num_ready_handles = 1; 124 ready_results[0] = MOJO_RESULT_UNKNOWN; 125 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss); 126 EXPECT_EQ(1u, num_ready_handles); 127 EXPECT_TRUE(ready_handles[0] == p.handle0.get() || 128 ready_handles[0] == p.handle1.get()); 129 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 130 131 EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0])); 132 133 // The wait set should be empty now. Nothing to wait on. 134 num_ready_handles = 2; 135 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results); 136 EXPECT_EQ(0u, num_ready_handles); 137 } 138 139 TEST_F(WaitSetTest, Unsatisfiable) { 140 MessagePipe p, q; 141 WaitSet wait_set; 142 143 wait_set.AddHandle(q.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 144 wait_set.AddHandle(q.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 145 wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 146 147 size_t num_ready_handles = 2; 148 Handle ready_handles[2]; 149 MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN}; 150 151 p.handle1.reset(); 152 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results); 153 EXPECT_EQ(1u, num_ready_handles); 154 EXPECT_EQ(p.handle0.get(), ready_handles[0]); 155 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ready_results[0]); 156 } 157 158 TEST_F(WaitSetTest, CloseWhileWaiting) { 159 MessagePipe p; 160 WaitSet wait_set; 161 162 wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 163 164 const Handle handle0_value = p.handle0.get(); 165 ThreadedRunner close_after_delay(base::Bind( 166 [](ScopedMessagePipeHandle* handle) { 167 // Wait a little while, then close the handle. 168 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200)); 169 handle->reset(); 170 }, 171 &p.handle0)); 172 close_after_delay.Start(); 173 174 size_t num_ready_handles = 2; 175 Handle ready_handles[2]; 176 MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN}; 177 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results); 178 EXPECT_EQ(1u, num_ready_handles); 179 EXPECT_EQ(handle0_value, ready_handles[0]); 180 EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_results[0]); 181 182 EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value)); 183 } 184 185 TEST_F(WaitSetTest, CloseBeforeWaiting) { 186 MessagePipe p; 187 WaitSet wait_set; 188 189 wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 190 wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 191 192 Handle handle0_value = p.handle0.get(); 193 Handle handle1_value = p.handle1.get(); 194 195 p.handle0.reset(); 196 p.handle1.reset(); 197 198 // Ensure that the WaitSet user is always made aware of all cancellations even 199 // if they happen while not waiting, or they have to be returned over the span 200 // of multiple Wait() calls due to insufficient output storage. 201 202 size_t num_ready_handles = 1; 203 Handle ready_handle; 204 MojoResult ready_result = MOJO_RESULT_UNKNOWN; 205 wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result); 206 EXPECT_EQ(1u, num_ready_handles); 207 EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value); 208 EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result); 209 EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value)); 210 211 wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result); 212 EXPECT_EQ(1u, num_ready_handles); 213 EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value); 214 EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result); 215 EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value)); 216 217 // Nothing more to wait on. 218 wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result); 219 EXPECT_EQ(0u, num_ready_handles); 220 } 221 222 TEST_F(WaitSetTest, SatisfiedThenUnsatisfied) { 223 MessagePipe p; 224 WaitSet wait_set; 225 226 wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 227 wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 228 229 const char kTestMessage1[] = "testing testing testing"; 230 WriteMessage(p.handle0, kTestMessage1); 231 232 size_t num_ready_handles = 2; 233 Handle ready_handles[2]; 234 MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN}; 235 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results); 236 EXPECT_EQ(1u, num_ready_handles); 237 EXPECT_EQ(p.handle1.get(), ready_handles[0]); 238 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 239 240 EXPECT_EQ(kTestMessage1, ReadMessage(p.handle1)); 241 242 ThreadedRunner write_after_delay(base::Bind( 243 [](ScopedMessagePipeHandle* handle) { 244 // Wait a little while, then write a message. 245 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200)); 246 WriteMessage(*handle, "wakey wakey"); 247 }, 248 &p.handle1)); 249 write_after_delay.Start(); 250 251 num_ready_handles = 2; 252 wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results); 253 EXPECT_EQ(1u, num_ready_handles); 254 EXPECT_EQ(p.handle0.get(), ready_handles[0]); 255 EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]); 256 } 257 258 TEST_F(WaitSetTest, EventOnly) { 259 base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL, 260 base::WaitableEvent::InitialState::SIGNALED); 261 WaitSet wait_set; 262 wait_set.AddEvent(&event); 263 264 base::WaitableEvent* ready_event = nullptr; 265 size_t num_ready_handles = 1; 266 Handle ready_handle; 267 MojoResult ready_result = MOJO_RESULT_UNKNOWN; 268 wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result); 269 EXPECT_EQ(0u, num_ready_handles); 270 EXPECT_EQ(&event, ready_event); 271 } 272 273 TEST_F(WaitSetTest, EventAndHandle) { 274 const char kTestMessage[] = "hello hello"; 275 276 MessagePipe p; 277 WriteMessage(p.handle0, kTestMessage); 278 279 base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL, 280 base::WaitableEvent::InitialState::NOT_SIGNALED); 281 282 WaitSet wait_set; 283 wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 284 wait_set.AddEvent(&event); 285 286 base::WaitableEvent* ready_event = nullptr; 287 size_t num_ready_handles = 1; 288 Handle ready_handle; 289 MojoResult ready_result = MOJO_RESULT_UNKNOWN; 290 wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result); 291 EXPECT_EQ(1u, num_ready_handles); 292 EXPECT_EQ(nullptr, ready_event); 293 EXPECT_EQ(p.handle1.get(), ready_handle); 294 EXPECT_EQ(MOJO_RESULT_OK, ready_result); 295 296 EXPECT_EQ(kTestMessage, ReadMessage(p.handle1)); 297 298 ThreadedRunner signal_after_delay(base::Bind( 299 [](base::WaitableEvent* event) { 300 // Wait a little while, then close the handle. 301 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200)); 302 event->Signal(); 303 }, 304 &event)); 305 signal_after_delay.Start(); 306 307 wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result); 308 EXPECT_EQ(0u, num_ready_handles); 309 EXPECT_EQ(&event, ready_event); 310 } 311 312 TEST_F(WaitSetTest, NoStarvation) { 313 const char kTestMessage[] = "wait for it"; 314 const size_t kNumTestPipes = 50; 315 const size_t kNumTestEvents = 10; 316 317 // Create a bunch of handles and events which are always ready and add them 318 // to a shared WaitSet. 319 320 WaitSet wait_set; 321 322 MessagePipe pipes[kNumTestPipes]; 323 for (size_t i = 0; i < kNumTestPipes; ++i) { 324 WriteMessage(pipes[i].handle0, kTestMessage); 325 Wait(pipes[i].handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 326 327 WriteMessage(pipes[i].handle1, kTestMessage); 328 Wait(pipes[i].handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 329 330 wait_set.AddHandle(pipes[i].handle0.get(), MOJO_HANDLE_SIGNAL_READABLE); 331 wait_set.AddHandle(pipes[i].handle1.get(), MOJO_HANDLE_SIGNAL_READABLE); 332 } 333 334 std::vector<std::unique_ptr<base::WaitableEvent>> events(kNumTestEvents); 335 for (auto& event_ptr : events) { 336 event_ptr = std::make_unique<base::WaitableEvent>( 337 base::WaitableEvent::ResetPolicy::MANUAL, 338 base::WaitableEvent::InitialState::NOT_SIGNALED); 339 event_ptr->Signal(); 340 wait_set.AddEvent(event_ptr.get()); 341 } 342 343 // Now verify that all handle and event signals are deteceted within a finite 344 // number of consecutive Wait() calls. Do it a few times for good measure. 345 for (size_t i = 0; i < 3; ++i) { 346 std::set<base::WaitableEvent*> ready_events; 347 std::set<Handle> ready_handles; 348 while (ready_events.size() < kNumTestEvents || 349 ready_handles.size() < kNumTestPipes * 2) { 350 base::WaitableEvent* ready_event = nullptr; 351 size_t num_ready_handles = 1; 352 Handle ready_handle; 353 MojoResult ready_result = MOJO_RESULT_UNKNOWN; 354 wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, 355 &ready_result); 356 if (ready_event) 357 ready_events.insert(ready_event); 358 359 if (num_ready_handles) { 360 EXPECT_EQ(1u, num_ready_handles); 361 EXPECT_EQ(MOJO_RESULT_OK, ready_result); 362 ready_handles.insert(ready_handle); 363 } 364 } 365 } 366 } 367 368 } // namespace 369 } // namespace mojo 370