1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/base/asyncinvoker.h" 12 #include "webrtc/base/asyncudpsocket.h" 13 #include "webrtc/base/event.h" 14 #include "webrtc/base/gunit.h" 15 #include "webrtc/base/physicalsocketserver.h" 16 #include "webrtc/base/socketaddress.h" 17 #include "webrtc/base/thread.h" 18 #include "webrtc/test/testsupport/gtest_disable.h" 19 20 #if defined(WEBRTC_WIN) 21 #include <comdef.h> // NOLINT 22 #endif 23 24 using namespace rtc; 25 26 // Generates a sequence of numbers (collaboratively). 27 class TestGenerator { 28 public: 29 TestGenerator() : last(0), count(0) {} 30 31 int Next(int prev) { 32 int result = prev + last; 33 last = result; 34 count += 1; 35 return result; 36 } 37 38 int last; 39 int count; 40 }; 41 42 struct TestMessage : public MessageData { 43 explicit TestMessage(int v) : value(v) {} 44 virtual ~TestMessage() {} 45 46 int value; 47 }; 48 49 // Receives on a socket and sends by posting messages. 50 class SocketClient : public TestGenerator, public sigslot::has_slots<> { 51 public: 52 SocketClient(AsyncSocket* socket, const SocketAddress& addr, 53 Thread* post_thread, MessageHandler* phandler) 54 : socket_(AsyncUDPSocket::Create(socket, addr)), 55 post_thread_(post_thread), 56 post_handler_(phandler) { 57 socket_->SignalReadPacket.connect(this, &SocketClient::OnPacket); 58 } 59 60 ~SocketClient() { 61 delete socket_; 62 } 63 64 SocketAddress address() const { return socket_->GetLocalAddress(); } 65 66 void OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size, 67 const SocketAddress& remote_addr, 68 const PacketTime& packet_time) { 69 EXPECT_EQ(size, sizeof(uint32)); 70 uint32 prev = reinterpret_cast<const uint32*>(buf)[0]; 71 uint32 result = Next(prev); 72 73 post_thread_->PostDelayed(200, post_handler_, 0, new TestMessage(result)); 74 } 75 76 private: 77 AsyncUDPSocket* socket_; 78 Thread* post_thread_; 79 MessageHandler* post_handler_; 80 }; 81 82 // Receives messages and sends on a socket. 83 class MessageClient : public MessageHandler, public TestGenerator { 84 public: 85 MessageClient(Thread* pth, Socket* socket) 86 : socket_(socket) { 87 } 88 89 virtual ~MessageClient() { 90 delete socket_; 91 } 92 93 virtual void OnMessage(Message *pmsg) { 94 TestMessage* msg = static_cast<TestMessage*>(pmsg->pdata); 95 int result = Next(msg->value); 96 EXPECT_GE(socket_->Send(&result, sizeof(result)), 0); 97 delete msg; 98 } 99 100 private: 101 Socket* socket_; 102 }; 103 104 class CustomThread : public rtc::Thread { 105 public: 106 CustomThread() {} 107 virtual ~CustomThread() { Stop(); } 108 bool Start() { return false; } 109 110 bool WrapCurrent() { 111 return Thread::WrapCurrent(); 112 } 113 void UnwrapCurrent() { 114 Thread::UnwrapCurrent(); 115 } 116 }; 117 118 119 // A thread that does nothing when it runs and signals an event 120 // when it is destroyed. 121 class SignalWhenDestroyedThread : public Thread { 122 public: 123 SignalWhenDestroyedThread(Event* event) 124 : event_(event) { 125 } 126 127 virtual ~SignalWhenDestroyedThread() { 128 Stop(); 129 event_->Set(); 130 } 131 132 virtual void Run() { 133 // Do nothing. 134 } 135 136 private: 137 Event* event_; 138 }; 139 140 // Function objects to test Thread::Invoke. 141 struct FunctorA { 142 int operator()() { return 42; } 143 }; 144 class FunctorB { 145 public: 146 explicit FunctorB(bool* flag) : flag_(flag) {} 147 void operator()() { if (flag_) *flag_ = true; } 148 private: 149 bool* flag_; 150 }; 151 struct FunctorC { 152 int operator()() { 153 Thread::Current()->ProcessMessages(50); 154 return 24; 155 } 156 }; 157 158 // See: https://code.google.com/p/webrtc/issues/detail?id=2409 159 TEST(ThreadTest, DISABLED_Main) { 160 const SocketAddress addr("127.0.0.1", 0); 161 162 // Create the messaging client on its own thread. 163 Thread th1; 164 Socket* socket = th1.socketserver()->CreateAsyncSocket(addr.family(), 165 SOCK_DGRAM); 166 MessageClient msg_client(&th1, socket); 167 168 // Create the socket client on its own thread. 169 Thread th2; 170 AsyncSocket* asocket = 171 th2.socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM); 172 SocketClient sock_client(asocket, addr, &th1, &msg_client); 173 174 socket->Connect(sock_client.address()); 175 176 th1.Start(); 177 th2.Start(); 178 179 // Get the messages started. 180 th1.PostDelayed(100, &msg_client, 0, new TestMessage(1)); 181 182 // Give the clients a little while to run. 183 // Messages will be processed at 100, 300, 500, 700, 900. 184 Thread* th_main = Thread::Current(); 185 th_main->ProcessMessages(1000); 186 187 // Stop the sending client. Give the receiver a bit longer to run, in case 188 // it is running on a machine that is under load (e.g. the build machine). 189 th1.Stop(); 190 th_main->ProcessMessages(200); 191 th2.Stop(); 192 193 // Make sure the results were correct 194 EXPECT_EQ(5, msg_client.count); 195 EXPECT_EQ(34, msg_client.last); 196 EXPECT_EQ(5, sock_client.count); 197 EXPECT_EQ(55, sock_client.last); 198 } 199 200 // Test that setting thread names doesn't cause a malfunction. 201 // There's no easy way to verify the name was set properly at this time. 202 TEST(ThreadTest, DISABLED_ON_MAC(Names)) { 203 // Default name 204 Thread *thread; 205 thread = new Thread(); 206 EXPECT_TRUE(thread->Start()); 207 thread->Stop(); 208 delete thread; 209 thread = new Thread(); 210 // Name with no object parameter 211 EXPECT_TRUE(thread->SetName("No object", NULL)); 212 EXPECT_TRUE(thread->Start()); 213 thread->Stop(); 214 delete thread; 215 // Really long name 216 thread = new Thread(); 217 EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this)); 218 EXPECT_TRUE(thread->Start()); 219 thread->Stop(); 220 delete thread; 221 } 222 223 // Test that setting thread priorities doesn't cause a malfunction. 224 // There's no easy way to verify the priority was set properly at this time. 225 TEST(ThreadTest, DISABLED_ON_MAC(Priorities)) { 226 Thread *thread; 227 thread = new Thread(); 228 EXPECT_TRUE(thread->SetPriority(PRIORITY_HIGH)); 229 EXPECT_TRUE(thread->Start()); 230 thread->Stop(); 231 delete thread; 232 thread = new Thread(); 233 EXPECT_TRUE(thread->SetPriority(PRIORITY_ABOVE_NORMAL)); 234 EXPECT_TRUE(thread->Start()); 235 thread->Stop(); 236 delete thread; 237 238 thread = new Thread(); 239 EXPECT_TRUE(thread->Start()); 240 #if defined(WEBRTC_WIN) 241 EXPECT_TRUE(thread->SetPriority(PRIORITY_ABOVE_NORMAL)); 242 #else 243 EXPECT_FALSE(thread->SetPriority(PRIORITY_ABOVE_NORMAL)); 244 #endif 245 thread->Stop(); 246 delete thread; 247 248 } 249 250 TEST(ThreadTest, DISABLED_ON_MAC(Wrap)) { 251 CustomThread* cthread = new CustomThread(); 252 EXPECT_TRUE(cthread->WrapCurrent()); 253 EXPECT_TRUE(cthread->RunningForTest()); 254 EXPECT_FALSE(cthread->IsOwned()); 255 cthread->UnwrapCurrent(); 256 EXPECT_FALSE(cthread->RunningForTest()); 257 delete cthread; 258 } 259 260 TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) { 261 // Create and start the thread. 262 Thread thread; 263 thread.Start(); 264 // Try calling functors. 265 EXPECT_EQ(42, thread.Invoke<int>(FunctorA())); 266 bool called = false; 267 FunctorB f2(&called); 268 thread.Invoke<void>(f2); 269 EXPECT_TRUE(called); 270 // Try calling bare functions. 271 struct LocalFuncs { 272 static int Func1() { return 999; } 273 static void Func2() {} 274 }; 275 EXPECT_EQ(999, thread.Invoke<int>(&LocalFuncs::Func1)); 276 thread.Invoke<void>(&LocalFuncs::Func2); 277 } 278 279 // Verifies that two threads calling Invoke on each other at the same time does 280 // not deadlock. 281 TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { 282 AutoThread thread; 283 Thread* current_thread = Thread::Current(); 284 ASSERT_TRUE(current_thread != NULL); 285 286 Thread other_thread; 287 other_thread.Start(); 288 289 struct LocalFuncs { 290 static void Set(bool* out) { *out = true; } 291 static void InvokeSet(Thread* thread, bool* out) { 292 thread->Invoke<void>(Bind(&Set, out)); 293 } 294 }; 295 296 bool called = false; 297 other_thread.Invoke<void>( 298 Bind(&LocalFuncs::InvokeSet, current_thread, &called)); 299 300 EXPECT_TRUE(called); 301 } 302 303 // Verifies that if thread A invokes a call on thread B and thread C is trying 304 // to invoke A at the same time, thread A does not handle C's invoke while 305 // invoking B. 306 TEST(ThreadTest, ThreeThreadsInvoke) { 307 AutoThread thread; 308 Thread* thread_a = Thread::Current(); 309 Thread thread_b, thread_c; 310 thread_b.Start(); 311 thread_c.Start(); 312 313 struct LocalFuncs { 314 static void Set(bool* out) { *out = true; } 315 static void InvokeSet(Thread* thread, bool* out) { 316 thread->Invoke<void>(Bind(&Set, out)); 317 } 318 319 // Set |out| true and call InvokeSet on |thread|. 320 static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) { 321 *out = true; 322 InvokeSet(thread, out_inner); 323 } 324 325 // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until 326 // |thread1| starts the call. 327 static void AsyncInvokeSetAndWait( 328 Thread* thread1, Thread* thread2, bool* out) { 329 bool async_invoked = false; 330 331 AsyncInvoker invoker; 332 invoker.AsyncInvoke<void>( 333 thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out)); 334 335 EXPECT_TRUE_WAIT(async_invoked, 2000); 336 } 337 }; 338 339 bool thread_a_called = false; 340 341 // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. 342 // Thread B returns when C receives the call and C should be blocked until A 343 // starts to process messages. 344 thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait, 345 &thread_c, thread_a, &thread_a_called)); 346 EXPECT_FALSE(thread_a_called); 347 348 EXPECT_TRUE_WAIT(thread_a_called, 2000); 349 } 350 351 class AsyncInvokeTest : public testing::Test { 352 public: 353 void IntCallback(int value) { 354 EXPECT_EQ(expected_thread_, Thread::Current()); 355 int_value_ = value; 356 } 357 void AsyncInvokeIntCallback(AsyncInvoker* invoker, Thread* thread) { 358 expected_thread_ = thread; 359 invoker->AsyncInvoke(thread, FunctorC(), 360 &AsyncInvokeTest::IntCallback, 361 static_cast<AsyncInvokeTest*>(this)); 362 invoke_started_.Set(); 363 } 364 void SetExpectedThreadForIntCallback(Thread* thread) { 365 expected_thread_ = thread; 366 } 367 368 protected: 369 enum { kWaitTimeout = 1000 }; 370 AsyncInvokeTest() 371 : int_value_(0), 372 invoke_started_(true, false), 373 expected_thread_(NULL) {} 374 375 int int_value_; 376 Event invoke_started_; 377 Thread* expected_thread_; 378 }; 379 380 TEST_F(AsyncInvokeTest, DISABLED_FireAndForget) { 381 AsyncInvoker invoker; 382 // Create and start the thread. 383 Thread thread; 384 thread.Start(); 385 // Try calling functor. 386 bool called = false; 387 invoker.AsyncInvoke<void>(&thread, FunctorB(&called)); 388 EXPECT_TRUE_WAIT(called, kWaitTimeout); 389 } 390 391 TEST_F(AsyncInvokeTest, DISABLED_WithCallback) { 392 AsyncInvoker invoker; 393 // Create and start the thread. 394 Thread thread; 395 thread.Start(); 396 // Try calling functor. 397 SetExpectedThreadForIntCallback(Thread::Current()); 398 invoker.AsyncInvoke(&thread, FunctorA(), 399 &AsyncInvokeTest::IntCallback, 400 static_cast<AsyncInvokeTest*>(this)); 401 EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout); 402 } 403 404 TEST_F(AsyncInvokeTest, DISABLED_CancelInvoker) { 405 // Create and start the thread. 406 Thread thread; 407 thread.Start(); 408 // Try destroying invoker during call. 409 { 410 AsyncInvoker invoker; 411 invoker.AsyncInvoke(&thread, FunctorC(), 412 &AsyncInvokeTest::IntCallback, 413 static_cast<AsyncInvokeTest*>(this)); 414 } 415 // With invoker gone, callback should be cancelled. 416 Thread::Current()->ProcessMessages(kWaitTimeout); 417 EXPECT_EQ(0, int_value_); 418 } 419 420 TEST_F(AsyncInvokeTest, DISABLED_CancelCallingThread) { 421 AsyncInvoker invoker; 422 { // Create and start the thread. 423 Thread thread; 424 thread.Start(); 425 // Try calling functor. 426 thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback, 427 static_cast<AsyncInvokeTest*>(this), 428 &invoker, Thread::Current())); 429 // Wait for the call to begin. 430 ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout)); 431 } 432 // Calling thread is gone. Return message shouldn't happen. 433 Thread::Current()->ProcessMessages(kWaitTimeout); 434 EXPECT_EQ(0, int_value_); 435 } 436 437 TEST_F(AsyncInvokeTest, DISABLED_KillInvokerBeforeExecute) { 438 Thread thread; 439 thread.Start(); 440 { 441 AsyncInvoker invoker; 442 // Try calling functor. 443 thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback, 444 static_cast<AsyncInvokeTest*>(this), 445 &invoker, Thread::Current())); 446 // Wait for the call to begin. 447 ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout)); 448 } 449 // Invoker is destroyed. Function should not execute. 450 Thread::Current()->ProcessMessages(kWaitTimeout); 451 EXPECT_EQ(0, int_value_); 452 } 453 454 TEST_F(AsyncInvokeTest, DISABLED_Flush) { 455 AsyncInvoker invoker; 456 bool flag1 = false; 457 bool flag2 = false; 458 // Queue two async calls to the current thread. 459 invoker.AsyncInvoke<void>(Thread::Current(), 460 FunctorB(&flag1)); 461 invoker.AsyncInvoke<void>(Thread::Current(), 462 FunctorB(&flag2)); 463 // Because we haven't pumped messages, these should not have run yet. 464 EXPECT_FALSE(flag1); 465 EXPECT_FALSE(flag2); 466 // Force them to run now. 467 invoker.Flush(Thread::Current()); 468 EXPECT_TRUE(flag1); 469 EXPECT_TRUE(flag2); 470 } 471 472 TEST_F(AsyncInvokeTest, DISABLED_FlushWithIds) { 473 AsyncInvoker invoker; 474 bool flag1 = false; 475 bool flag2 = false; 476 // Queue two async calls to the current thread, one with a message id. 477 invoker.AsyncInvoke<void>(Thread::Current(), 478 FunctorB(&flag1), 479 5); 480 invoker.AsyncInvoke<void>(Thread::Current(), 481 FunctorB(&flag2)); 482 // Because we haven't pumped messages, these should not have run yet. 483 EXPECT_FALSE(flag1); 484 EXPECT_FALSE(flag2); 485 // Execute pending calls with id == 5. 486 invoker.Flush(Thread::Current(), 5); 487 EXPECT_TRUE(flag1); 488 EXPECT_FALSE(flag2); 489 flag1 = false; 490 // Execute all pending calls. The id == 5 call should not execute again. 491 invoker.Flush(Thread::Current()); 492 EXPECT_FALSE(flag1); 493 EXPECT_TRUE(flag2); 494 } 495 496 497 #if defined(WEBRTC_WIN) 498 class ComThreadTest : public testing::Test, public MessageHandler { 499 public: 500 ComThreadTest() : done_(false) {} 501 protected: 502 virtual void OnMessage(Message* message) { 503 HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); 504 // S_FALSE means the thread was already inited for a multithread apartment. 505 EXPECT_EQ(S_FALSE, hr); 506 if (SUCCEEDED(hr)) { 507 CoUninitialize(); 508 } 509 done_ = true; 510 } 511 bool done_; 512 }; 513 514 TEST_F(ComThreadTest, ComInited) { 515 Thread* thread = new ComThread(); 516 EXPECT_TRUE(thread->Start()); 517 thread->Post(this, 0); 518 EXPECT_TRUE_WAIT(done_, 1000); 519 delete thread; 520 } 521 #endif 522