1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/base/asyncinvoker.h" 12 #include "webrtc/base/asyncudpsocket.h" 13 #include "webrtc/base/event.h" 14 #include "webrtc/base/gunit.h" 15 #include "webrtc/base/physicalsocketserver.h" 16 #include "webrtc/base/socketaddress.h" 17 #include "webrtc/base/thread.h" 18 19 #if defined(WEBRTC_WIN) 20 #include <comdef.h> // NOLINT 21 #endif 22 23 using namespace rtc; 24 25 // Generates a sequence of numbers (collaboratively). 26 class TestGenerator { 27 public: 28 TestGenerator() : last(0), count(0) {} 29 30 int Next(int prev) { 31 int result = prev + last; 32 last = result; 33 count += 1; 34 return result; 35 } 36 37 int last; 38 int count; 39 }; 40 41 struct TestMessage : public MessageData { 42 explicit TestMessage(int v) : value(v) {} 43 virtual ~TestMessage() {} 44 45 int value; 46 }; 47 48 // Receives on a socket and sends by posting messages. 49 class SocketClient : public TestGenerator, public sigslot::has_slots<> { 50 public: 51 SocketClient(AsyncSocket* socket, const SocketAddress& addr, 52 Thread* post_thread, MessageHandler* phandler) 53 : socket_(AsyncUDPSocket::Create(socket, addr)), 54 post_thread_(post_thread), 55 post_handler_(phandler) { 56 socket_->SignalReadPacket.connect(this, &SocketClient::OnPacket); 57 } 58 59 ~SocketClient() { 60 delete socket_; 61 } 62 63 SocketAddress address() const { return socket_->GetLocalAddress(); } 64 65 void OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size, 66 const SocketAddress& remote_addr, 67 const PacketTime& packet_time) { 68 EXPECT_EQ(size, sizeof(uint32_t)); 69 uint32_t prev = reinterpret_cast<const uint32_t*>(buf)[0]; 70 uint32_t result = Next(prev); 71 72 post_thread_->PostDelayed(200, post_handler_, 0, new TestMessage(result)); 73 } 74 75 private: 76 AsyncUDPSocket* socket_; 77 Thread* post_thread_; 78 MessageHandler* post_handler_; 79 }; 80 81 // Receives messages and sends on a socket. 82 class MessageClient : public MessageHandler, public TestGenerator { 83 public: 84 MessageClient(Thread* pth, Socket* socket) 85 : socket_(socket) { 86 } 87 88 virtual ~MessageClient() { 89 delete socket_; 90 } 91 92 virtual void OnMessage(Message *pmsg) { 93 TestMessage* msg = static_cast<TestMessage*>(pmsg->pdata); 94 int result = Next(msg->value); 95 EXPECT_GE(socket_->Send(&result, sizeof(result)), 0); 96 delete msg; 97 } 98 99 private: 100 Socket* socket_; 101 }; 102 103 class CustomThread : public rtc::Thread { 104 public: 105 CustomThread() {} 106 virtual ~CustomThread() { Stop(); } 107 bool Start() { return false; } 108 109 bool WrapCurrent() { 110 return Thread::WrapCurrent(); 111 } 112 void UnwrapCurrent() { 113 Thread::UnwrapCurrent(); 114 } 115 }; 116 117 118 // A thread that does nothing when it runs and signals an event 119 // when it is destroyed. 120 class SignalWhenDestroyedThread : public Thread { 121 public: 122 SignalWhenDestroyedThread(Event* event) 123 : event_(event) { 124 } 125 126 virtual ~SignalWhenDestroyedThread() { 127 Stop(); 128 event_->Set(); 129 } 130 131 virtual void Run() { 132 // Do nothing. 133 } 134 135 private: 136 Event* event_; 137 }; 138 139 // A bool wrapped in a mutex, to avoid data races. Using a volatile 140 // bool should be sufficient for correct code ("eventual consistency" 141 // between caches is sufficient), but we can't tell the compiler about 142 // that, and then tsan complains about a data race. 143 144 // See also discussion at 145 // http://stackoverflow.com/questions/7223164/is-mutex-needed-to-synchronize-a-simple-flag-between-pthreads 146 147 // Using std::atomic<bool> or std::atomic_flag in C++11 is probably 148 // the right thing to do, but those features are not yet allowed. Or 149 // rtc::AtomicInt, if/when that is added. Since the use isn't 150 // performance critical, use a plain critical section for the time 151 // being. 152 153 class AtomicBool { 154 public: 155 explicit AtomicBool(bool value = false) : flag_(value) {} 156 AtomicBool& operator=(bool value) { 157 CritScope scoped_lock(&cs_); 158 flag_ = value; 159 return *this; 160 } 161 bool get() const { 162 CritScope scoped_lock(&cs_); 163 return flag_; 164 } 165 166 private: 167 mutable CriticalSection cs_; 168 bool flag_; 169 }; 170 171 // Function objects to test Thread::Invoke. 172 struct FunctorA { 173 int operator()() { return 42; } 174 }; 175 class FunctorB { 176 public: 177 explicit FunctorB(AtomicBool* flag) : flag_(flag) {} 178 void operator()() { if (flag_) *flag_ = true; } 179 private: 180 AtomicBool* flag_; 181 }; 182 struct FunctorC { 183 int operator()() { 184 Thread::Current()->ProcessMessages(50); 185 return 24; 186 } 187 }; 188 189 // See: https://code.google.com/p/webrtc/issues/detail?id=2409 190 TEST(ThreadTest, DISABLED_Main) { 191 const SocketAddress addr("127.0.0.1", 0); 192 193 // Create the messaging client on its own thread. 194 Thread th1; 195 Socket* socket = th1.socketserver()->CreateAsyncSocket(addr.family(), 196 SOCK_DGRAM); 197 MessageClient msg_client(&th1, socket); 198 199 // Create the socket client on its own thread. 200 Thread th2; 201 AsyncSocket* asocket = 202 th2.socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM); 203 SocketClient sock_client(asocket, addr, &th1, &msg_client); 204 205 socket->Connect(sock_client.address()); 206 207 th1.Start(); 208 th2.Start(); 209 210 // Get the messages started. 211 th1.PostDelayed(100, &msg_client, 0, new TestMessage(1)); 212 213 // Give the clients a little while to run. 214 // Messages will be processed at 100, 300, 500, 700, 900. 215 Thread* th_main = Thread::Current(); 216 th_main->ProcessMessages(1000); 217 218 // Stop the sending client. Give the receiver a bit longer to run, in case 219 // it is running on a machine that is under load (e.g. the build machine). 220 th1.Stop(); 221 th_main->ProcessMessages(200); 222 th2.Stop(); 223 224 // Make sure the results were correct 225 EXPECT_EQ(5, msg_client.count); 226 EXPECT_EQ(34, msg_client.last); 227 EXPECT_EQ(5, sock_client.count); 228 EXPECT_EQ(55, sock_client.last); 229 } 230 231 // Test that setting thread names doesn't cause a malfunction. 232 // There's no easy way to verify the name was set properly at this time. 233 TEST(ThreadTest, Names) { 234 // Default name 235 Thread *thread; 236 thread = new Thread(); 237 EXPECT_TRUE(thread->Start()); 238 thread->Stop(); 239 delete thread; 240 thread = new Thread(); 241 // Name with no object parameter 242 EXPECT_TRUE(thread->SetName("No object", NULL)); 243 EXPECT_TRUE(thread->Start()); 244 thread->Stop(); 245 delete thread; 246 // Really long name 247 thread = new Thread(); 248 EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this)); 249 EXPECT_TRUE(thread->Start()); 250 thread->Stop(); 251 delete thread; 252 } 253 254 TEST(ThreadTest, Wrap) { 255 Thread* current_thread = Thread::Current(); 256 current_thread->UnwrapCurrent(); 257 CustomThread* cthread = new CustomThread(); 258 EXPECT_TRUE(cthread->WrapCurrent()); 259 EXPECT_TRUE(cthread->RunningForTest()); 260 EXPECT_FALSE(cthread->IsOwned()); 261 cthread->UnwrapCurrent(); 262 EXPECT_FALSE(cthread->RunningForTest()); 263 delete cthread; 264 current_thread->WrapCurrent(); 265 } 266 267 TEST(ThreadTest, Invoke) { 268 // Create and start the thread. 269 Thread thread; 270 thread.Start(); 271 // Try calling functors. 272 EXPECT_EQ(42, thread.Invoke<int>(FunctorA())); 273 AtomicBool called; 274 FunctorB f2(&called); 275 thread.Invoke<void>(f2); 276 EXPECT_TRUE(called.get()); 277 // Try calling bare functions. 278 struct LocalFuncs { 279 static int Func1() { return 999; } 280 static void Func2() {} 281 }; 282 EXPECT_EQ(999, thread.Invoke<int>(&LocalFuncs::Func1)); 283 thread.Invoke<void>(&LocalFuncs::Func2); 284 } 285 286 // Verifies that two threads calling Invoke on each other at the same time does 287 // not deadlock. 288 TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { 289 AutoThread thread; 290 Thread* current_thread = Thread::Current(); 291 ASSERT_TRUE(current_thread != NULL); 292 293 Thread other_thread; 294 other_thread.Start(); 295 296 struct LocalFuncs { 297 static void Set(bool* out) { *out = true; } 298 static void InvokeSet(Thread* thread, bool* out) { 299 thread->Invoke<void>(Bind(&Set, out)); 300 } 301 }; 302 303 bool called = false; 304 other_thread.Invoke<void>( 305 Bind(&LocalFuncs::InvokeSet, current_thread, &called)); 306 307 EXPECT_TRUE(called); 308 } 309 310 // Verifies that if thread A invokes a call on thread B and thread C is trying 311 // to invoke A at the same time, thread A does not handle C's invoke while 312 // invoking B. 313 TEST(ThreadTest, ThreeThreadsInvoke) { 314 AutoThread thread; 315 Thread* thread_a = Thread::Current(); 316 Thread thread_b, thread_c; 317 thread_b.Start(); 318 thread_c.Start(); 319 320 class LockedBool { 321 public: 322 explicit LockedBool(bool value) : value_(value) {} 323 324 void Set(bool value) { 325 CritScope lock(&crit_); 326 value_ = value; 327 } 328 329 bool Get() { 330 CritScope lock(&crit_); 331 return value_; 332 } 333 334 private: 335 CriticalSection crit_; 336 bool value_ GUARDED_BY(crit_); 337 }; 338 339 struct LocalFuncs { 340 static void Set(LockedBool* out) { out->Set(true); } 341 static void InvokeSet(Thread* thread, LockedBool* out) { 342 thread->Invoke<void>(Bind(&Set, out)); 343 } 344 345 // Set |out| true and call InvokeSet on |thread|. 346 static void SetAndInvokeSet(LockedBool* out, 347 Thread* thread, 348 LockedBool* out_inner) { 349 out->Set(true); 350 InvokeSet(thread, out_inner); 351 } 352 353 // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until 354 // |thread1| starts the call. 355 static void AsyncInvokeSetAndWait( 356 Thread* thread1, Thread* thread2, LockedBool* out) { 357 CriticalSection crit; 358 LockedBool async_invoked(false); 359 360 AsyncInvoker invoker; 361 invoker.AsyncInvoke<void>( 362 thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out)); 363 364 EXPECT_TRUE_WAIT(async_invoked.Get(), 2000); 365 } 366 }; 367 368 LockedBool thread_a_called(false); 369 370 // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. 371 // Thread B returns when C receives the call and C should be blocked until A 372 // starts to process messages. 373 thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait, 374 &thread_c, thread_a, &thread_a_called)); 375 EXPECT_FALSE(thread_a_called.Get()); 376 377 EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000); 378 } 379 380 class AsyncInvokeTest : public testing::Test { 381 public: 382 void IntCallback(int value) { 383 EXPECT_EQ(expected_thread_, Thread::Current()); 384 int_value_ = value; 385 } 386 void AsyncInvokeIntCallback(AsyncInvoker* invoker, Thread* thread) { 387 expected_thread_ = thread; 388 invoker->AsyncInvoke(thread, FunctorC(), 389 &AsyncInvokeTest::IntCallback, 390 static_cast<AsyncInvokeTest*>(this)); 391 invoke_started_.Set(); 392 } 393 void SetExpectedThreadForIntCallback(Thread* thread) { 394 expected_thread_ = thread; 395 } 396 397 protected: 398 enum { kWaitTimeout = 1000 }; 399 AsyncInvokeTest() 400 : int_value_(0), 401 invoke_started_(true, false), 402 expected_thread_(NULL) {} 403 404 int int_value_; 405 Event invoke_started_; 406 Thread* expected_thread_; 407 }; 408 409 TEST_F(AsyncInvokeTest, FireAndForget) { 410 AsyncInvoker invoker; 411 // Create and start the thread. 412 Thread thread; 413 thread.Start(); 414 // Try calling functor. 415 AtomicBool called; 416 invoker.AsyncInvoke<void>(&thread, FunctorB(&called)); 417 EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); 418 } 419 420 TEST_F(AsyncInvokeTest, WithCallback) { 421 AsyncInvoker invoker; 422 // Create and start the thread. 423 Thread thread; 424 thread.Start(); 425 // Try calling functor. 426 SetExpectedThreadForIntCallback(Thread::Current()); 427 invoker.AsyncInvoke(&thread, FunctorA(), 428 &AsyncInvokeTest::IntCallback, 429 static_cast<AsyncInvokeTest*>(this)); 430 EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout); 431 } 432 433 TEST_F(AsyncInvokeTest, CancelInvoker) { 434 // Create and start the thread. 435 Thread thread; 436 thread.Start(); 437 // Try destroying invoker during call. 438 { 439 AsyncInvoker invoker; 440 invoker.AsyncInvoke(&thread, FunctorC(), 441 &AsyncInvokeTest::IntCallback, 442 static_cast<AsyncInvokeTest*>(this)); 443 } 444 // With invoker gone, callback should be cancelled. 445 Thread::Current()->ProcessMessages(kWaitTimeout); 446 EXPECT_EQ(0, int_value_); 447 } 448 449 TEST_F(AsyncInvokeTest, CancelCallingThread) { 450 AsyncInvoker invoker; 451 { // Create and start the thread. 452 Thread thread; 453 thread.Start(); 454 // Try calling functor. 455 thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback, 456 static_cast<AsyncInvokeTest*>(this), 457 &invoker, Thread::Current())); 458 // Wait for the call to begin. 459 ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout)); 460 } 461 // Calling thread is gone. Return message shouldn't happen. 462 Thread::Current()->ProcessMessages(kWaitTimeout); 463 EXPECT_EQ(0, int_value_); 464 } 465 466 TEST_F(AsyncInvokeTest, KillInvokerBeforeExecute) { 467 Thread thread; 468 thread.Start(); 469 { 470 AsyncInvoker invoker; 471 // Try calling functor. 472 thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback, 473 static_cast<AsyncInvokeTest*>(this), 474 &invoker, Thread::Current())); 475 // Wait for the call to begin. 476 ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout)); 477 } 478 // Invoker is destroyed. Function should not execute. 479 Thread::Current()->ProcessMessages(kWaitTimeout); 480 EXPECT_EQ(0, int_value_); 481 } 482 483 TEST_F(AsyncInvokeTest, Flush) { 484 AsyncInvoker invoker; 485 AtomicBool flag1; 486 AtomicBool flag2; 487 // Queue two async calls to the current thread. 488 invoker.AsyncInvoke<void>(Thread::Current(), 489 FunctorB(&flag1)); 490 invoker.AsyncInvoke<void>(Thread::Current(), 491 FunctorB(&flag2)); 492 // Because we haven't pumped messages, these should not have run yet. 493 EXPECT_FALSE(flag1.get()); 494 EXPECT_FALSE(flag2.get()); 495 // Force them to run now. 496 invoker.Flush(Thread::Current()); 497 EXPECT_TRUE(flag1.get()); 498 EXPECT_TRUE(flag2.get()); 499 } 500 501 TEST_F(AsyncInvokeTest, FlushWithIds) { 502 AsyncInvoker invoker; 503 AtomicBool flag1; 504 AtomicBool flag2; 505 // Queue two async calls to the current thread, one with a message id. 506 invoker.AsyncInvoke<void>(Thread::Current(), 507 FunctorB(&flag1), 508 5); 509 invoker.AsyncInvoke<void>(Thread::Current(), 510 FunctorB(&flag2)); 511 // Because we haven't pumped messages, these should not have run yet. 512 EXPECT_FALSE(flag1.get()); 513 EXPECT_FALSE(flag2.get()); 514 // Execute pending calls with id == 5. 515 invoker.Flush(Thread::Current(), 5); 516 EXPECT_TRUE(flag1.get()); 517 EXPECT_FALSE(flag2.get()); 518 flag1 = false; 519 // Execute all pending calls. The id == 5 call should not execute again. 520 invoker.Flush(Thread::Current()); 521 EXPECT_FALSE(flag1.get()); 522 EXPECT_TRUE(flag2.get()); 523 } 524 525 class GuardedAsyncInvokeTest : public testing::Test { 526 public: 527 void IntCallback(int value) { 528 EXPECT_EQ(expected_thread_, Thread::Current()); 529 int_value_ = value; 530 } 531 void AsyncInvokeIntCallback(GuardedAsyncInvoker* invoker, Thread* thread) { 532 expected_thread_ = thread; 533 invoker->AsyncInvoke(FunctorC(), &GuardedAsyncInvokeTest::IntCallback, 534 static_cast<GuardedAsyncInvokeTest*>(this)); 535 invoke_started_.Set(); 536 } 537 void SetExpectedThreadForIntCallback(Thread* thread) { 538 expected_thread_ = thread; 539 } 540 541 protected: 542 const static int kWaitTimeout = 1000; 543 GuardedAsyncInvokeTest() 544 : int_value_(0), 545 invoke_started_(true, false), 546 expected_thread_(nullptr) {} 547 548 int int_value_; 549 Event invoke_started_; 550 Thread* expected_thread_; 551 }; 552 553 // Functor for creating an invoker. 554 struct CreateInvoker { 555 CreateInvoker(scoped_ptr<GuardedAsyncInvoker>* invoker) : invoker_(invoker) {} 556 void operator()() { invoker_->reset(new GuardedAsyncInvoker()); } 557 scoped_ptr<GuardedAsyncInvoker>* invoker_; 558 }; 559 560 // Test that we can call AsyncInvoke<void>() after the thread died. 561 TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) { 562 // Create and start the thread. 563 scoped_ptr<Thread> thread(new Thread()); 564 thread->Start(); 565 scoped_ptr<GuardedAsyncInvoker> invoker; 566 // Create the invoker on |thread|. 567 thread->Invoke<void>(CreateInvoker(&invoker)); 568 // Kill |thread|. 569 thread = nullptr; 570 // Try calling functor. 571 AtomicBool called; 572 EXPECT_FALSE(invoker->AsyncInvoke<void>(FunctorB(&called))); 573 // With thread gone, nothing should happen. 574 WAIT(called.get(), kWaitTimeout); 575 EXPECT_FALSE(called.get()); 576 } 577 578 // Test that we can call AsyncInvoke with callback after the thread died. 579 TEST_F(GuardedAsyncInvokeTest, KillThreadWithCallback) { 580 // Create and start the thread. 581 scoped_ptr<Thread> thread(new Thread()); 582 thread->Start(); 583 scoped_ptr<GuardedAsyncInvoker> invoker; 584 // Create the invoker on |thread|. 585 thread->Invoke<void>(CreateInvoker(&invoker)); 586 // Kill |thread|. 587 thread = nullptr; 588 // Try calling functor. 589 EXPECT_FALSE( 590 invoker->AsyncInvoke(FunctorC(), &GuardedAsyncInvokeTest::IntCallback, 591 static_cast<GuardedAsyncInvokeTest*>(this))); 592 // With thread gone, callback should be cancelled. 593 Thread::Current()->ProcessMessages(kWaitTimeout); 594 EXPECT_EQ(0, int_value_); 595 } 596 597 // The remaining tests check that GuardedAsyncInvoker behaves as AsyncInvoker 598 // when Thread is still alive. 599 TEST_F(GuardedAsyncInvokeTest, FireAndForget) { 600 GuardedAsyncInvoker invoker; 601 // Try calling functor. 602 AtomicBool called; 603 EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&called))); 604 EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); 605 } 606 607 TEST_F(GuardedAsyncInvokeTest, WithCallback) { 608 GuardedAsyncInvoker invoker; 609 // Try calling functor. 610 SetExpectedThreadForIntCallback(Thread::Current()); 611 EXPECT_TRUE(invoker.AsyncInvoke(FunctorA(), 612 &GuardedAsyncInvokeTest::IntCallback, 613 static_cast<GuardedAsyncInvokeTest*>(this))); 614 EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout); 615 } 616 617 TEST_F(GuardedAsyncInvokeTest, CancelInvoker) { 618 // Try destroying invoker during call. 619 { 620 GuardedAsyncInvoker invoker; 621 EXPECT_TRUE( 622 invoker.AsyncInvoke(FunctorC(), &GuardedAsyncInvokeTest::IntCallback, 623 static_cast<GuardedAsyncInvokeTest*>(this))); 624 } 625 // With invoker gone, callback should be cancelled. 626 Thread::Current()->ProcessMessages(kWaitTimeout); 627 EXPECT_EQ(0, int_value_); 628 } 629 630 TEST_F(GuardedAsyncInvokeTest, CancelCallingThread) { 631 GuardedAsyncInvoker invoker; 632 // Try destroying calling thread during call. 633 { 634 Thread thread; 635 thread.Start(); 636 // Try calling functor. 637 thread.Invoke<void>(Bind(&GuardedAsyncInvokeTest::AsyncInvokeIntCallback, 638 static_cast<GuardedAsyncInvokeTest*>(this), 639 &invoker, Thread::Current())); 640 // Wait for the call to begin. 641 ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout)); 642 } 643 // Calling thread is gone. Return message shouldn't happen. 644 Thread::Current()->ProcessMessages(kWaitTimeout); 645 EXPECT_EQ(0, int_value_); 646 } 647 648 TEST_F(GuardedAsyncInvokeTest, KillInvokerBeforeExecute) { 649 Thread thread; 650 thread.Start(); 651 { 652 GuardedAsyncInvoker invoker; 653 // Try calling functor. 654 thread.Invoke<void>(Bind(&GuardedAsyncInvokeTest::AsyncInvokeIntCallback, 655 static_cast<GuardedAsyncInvokeTest*>(this), 656 &invoker, Thread::Current())); 657 // Wait for the call to begin. 658 ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout)); 659 } 660 // Invoker is destroyed. Function should not execute. 661 Thread::Current()->ProcessMessages(kWaitTimeout); 662 EXPECT_EQ(0, int_value_); 663 } 664 665 TEST_F(GuardedAsyncInvokeTest, Flush) { 666 GuardedAsyncInvoker invoker; 667 AtomicBool flag1; 668 AtomicBool flag2; 669 // Queue two async calls to the current thread. 670 EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag1))); 671 EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag2))); 672 // Because we haven't pumped messages, these should not have run yet. 673 EXPECT_FALSE(flag1.get()); 674 EXPECT_FALSE(flag2.get()); 675 // Force them to run now. 676 EXPECT_TRUE(invoker.Flush()); 677 EXPECT_TRUE(flag1.get()); 678 EXPECT_TRUE(flag2.get()); 679 } 680 681 TEST_F(GuardedAsyncInvokeTest, FlushWithIds) { 682 GuardedAsyncInvoker invoker; 683 AtomicBool flag1; 684 AtomicBool flag2; 685 // Queue two async calls to the current thread, one with a message id. 686 EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag1), 5)); 687 EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag2))); 688 // Because we haven't pumped messages, these should not have run yet. 689 EXPECT_FALSE(flag1.get()); 690 EXPECT_FALSE(flag2.get()); 691 // Execute pending calls with id == 5. 692 EXPECT_TRUE(invoker.Flush(5)); 693 EXPECT_TRUE(flag1.get()); 694 EXPECT_FALSE(flag2.get()); 695 flag1 = false; 696 // Execute all pending calls. The id == 5 call should not execute again. 697 EXPECT_TRUE(invoker.Flush()); 698 EXPECT_FALSE(flag1.get()); 699 EXPECT_TRUE(flag2.get()); 700 } 701 702 #if defined(WEBRTC_WIN) 703 class ComThreadTest : public testing::Test, public MessageHandler { 704 public: 705 ComThreadTest() : done_(false) {} 706 protected: 707 virtual void OnMessage(Message* message) { 708 HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); 709 // S_FALSE means the thread was already inited for a multithread apartment. 710 EXPECT_EQ(S_FALSE, hr); 711 if (SUCCEEDED(hr)) { 712 CoUninitialize(); 713 } 714 done_ = true; 715 } 716 bool done_; 717 }; 718 719 TEST_F(ComThreadTest, ComInited) { 720 Thread* thread = new ComThread(); 721 EXPECT_TRUE(thread->Start()); 722 thread->Post(this, 0); 723 EXPECT_TRUE_WAIT(done_, 1000); 724 delete thread; 725 } 726 #endif 727