Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
#include <string.h>

#include "webrtc/base/asyncinvoker.h"
#include "webrtc/base/asyncudpsocket.h"
#include "webrtc/base/event.h"
#include "webrtc/base/gunit.h"
#include "webrtc/base/physicalsocketserver.h"
#include "webrtc/base/socketaddress.h"
#include "webrtc/base/thread.h"
     18 
     19 #if defined(WEBRTC_WIN)
     20 #include <comdef.h>  // NOLINT
     21 #endif
     22 
     23 using namespace rtc;
     24 
     25 // Generates a sequence of numbers (collaboratively).
     26 class TestGenerator {
     27  public:
     28   TestGenerator() : last(0), count(0) {}
     29 
     30   int Next(int prev) {
     31     int result = prev + last;
     32     last = result;
     33     count += 1;
     34     return result;
     35   }
     36 
     37   int last;
     38   int count;
     39 };
     40 
     41 struct TestMessage : public MessageData {
     42   explicit TestMessage(int v) : value(v) {}
     43   virtual ~TestMessage() {}
     44 
     45   int value;
     46 };
     47 
     48 // Receives on a socket and sends by posting messages.
     49 class SocketClient : public TestGenerator, public sigslot::has_slots<> {
     50  public:
     51   SocketClient(AsyncSocket* socket, const SocketAddress& addr,
     52                Thread* post_thread, MessageHandler* phandler)
     53       : socket_(AsyncUDPSocket::Create(socket, addr)),
     54         post_thread_(post_thread),
     55         post_handler_(phandler) {
     56     socket_->SignalReadPacket.connect(this, &SocketClient::OnPacket);
     57   }
     58 
     59   ~SocketClient() {
     60     delete socket_;
     61   }
     62 
     63   SocketAddress address() const { return socket_->GetLocalAddress(); }
     64 
     65   void OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size,
     66                 const SocketAddress& remote_addr,
     67                 const PacketTime& packet_time) {
     68     EXPECT_EQ(size, sizeof(uint32_t));
     69     uint32_t prev = reinterpret_cast<const uint32_t*>(buf)[0];
     70     uint32_t result = Next(prev);
     71 
     72     post_thread_->PostDelayed(200, post_handler_, 0, new TestMessage(result));
     73   }
     74 
     75  private:
     76   AsyncUDPSocket* socket_;
     77   Thread* post_thread_;
     78   MessageHandler* post_handler_;
     79 };
     80 
     81 // Receives messages and sends on a socket.
     82 class MessageClient : public MessageHandler, public TestGenerator {
     83  public:
     84   MessageClient(Thread* pth, Socket* socket)
     85       : socket_(socket) {
     86   }
     87 
     88   virtual ~MessageClient() {
     89     delete socket_;
     90   }
     91 
     92   virtual void OnMessage(Message *pmsg) {
     93     TestMessage* msg = static_cast<TestMessage*>(pmsg->pdata);
     94     int result = Next(msg->value);
     95     EXPECT_GE(socket_->Send(&result, sizeof(result)), 0);
     96     delete msg;
     97   }
     98 
     99  private:
    100   Socket* socket_;
    101 };
    102 
    103 class CustomThread : public rtc::Thread {
    104  public:
    105   CustomThread() {}
    106   virtual ~CustomThread() { Stop(); }
    107   bool Start() { return false; }
    108 
    109   bool WrapCurrent() {
    110     return Thread::WrapCurrent();
    111   }
    112   void UnwrapCurrent() {
    113     Thread::UnwrapCurrent();
    114   }
    115 };
    116 
    117 
    118 // A thread that does nothing when it runs and signals an event
    119 // when it is destroyed.
    120 class SignalWhenDestroyedThread : public Thread {
    121  public:
    122   SignalWhenDestroyedThread(Event* event)
    123       : event_(event) {
    124   }
    125 
    126   virtual ~SignalWhenDestroyedThread() {
    127     Stop();
    128     event_->Set();
    129   }
    130 
    131   virtual void Run() {
    132     // Do nothing.
    133   }
    134 
    135  private:
    136   Event* event_;
    137 };
    138 
    139 // A bool wrapped in a mutex, to avoid data races. Using a volatile
    140 // bool should be sufficient for correct code ("eventual consistency"
    141 // between caches is sufficient), but we can't tell the compiler about
    142 // that, and then tsan complains about a data race.
    143 
    144 // See also discussion at
    145 // http://stackoverflow.com/questions/7223164/is-mutex-needed-to-synchronize-a-simple-flag-between-pthreads
    146 
    147 // Using std::atomic<bool> or std::atomic_flag in C++11 is probably
    148 // the right thing to do, but those features are not yet allowed. Or
    149 // rtc::AtomicInt, if/when that is added. Since the use isn't
    150 // performance critical, use a plain critical section for the time
    151 // being.
    152 
    153 class AtomicBool {
    154  public:
    155   explicit AtomicBool(bool value = false) : flag_(value) {}
    156   AtomicBool& operator=(bool value) {
    157     CritScope scoped_lock(&cs_);
    158     flag_ = value;
    159     return *this;
    160   }
    161   bool get() const {
    162     CritScope scoped_lock(&cs_);
    163     return flag_;
    164   }
    165 
    166  private:
    167   mutable CriticalSection cs_;
    168   bool flag_;
    169 };
    170 
    171 // Function objects to test Thread::Invoke.
    172 struct FunctorA {
    173   int operator()() { return 42; }
    174 };
    175 class FunctorB {
    176  public:
    177   explicit FunctorB(AtomicBool* flag) : flag_(flag) {}
    178   void operator()() { if (flag_) *flag_ = true; }
    179  private:
    180   AtomicBool* flag_;
    181 };
    182 struct FunctorC {
    183   int operator()() {
    184     Thread::Current()->ProcessMessages(50);
    185     return 24;
    186   }
    187 };
    188 
    189 // See: https://code.google.com/p/webrtc/issues/detail?id=2409
    190 TEST(ThreadTest, DISABLED_Main) {
    191   const SocketAddress addr("127.0.0.1", 0);
    192 
    193   // Create the messaging client on its own thread.
    194   Thread th1;
    195   Socket* socket = th1.socketserver()->CreateAsyncSocket(addr.family(),
    196                                                          SOCK_DGRAM);
    197   MessageClient msg_client(&th1, socket);
    198 
    199   // Create the socket client on its own thread.
    200   Thread th2;
    201   AsyncSocket* asocket =
    202       th2.socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
    203   SocketClient sock_client(asocket, addr, &th1, &msg_client);
    204 
    205   socket->Connect(sock_client.address());
    206 
    207   th1.Start();
    208   th2.Start();
    209 
    210   // Get the messages started.
    211   th1.PostDelayed(100, &msg_client, 0, new TestMessage(1));
    212 
    213   // Give the clients a little while to run.
    214   // Messages will be processed at 100, 300, 500, 700, 900.
    215   Thread* th_main = Thread::Current();
    216   th_main->ProcessMessages(1000);
    217 
    218   // Stop the sending client. Give the receiver a bit longer to run, in case
    219   // it is running on a machine that is under load (e.g. the build machine).
    220   th1.Stop();
    221   th_main->ProcessMessages(200);
    222   th2.Stop();
    223 
    224   // Make sure the results were correct
    225   EXPECT_EQ(5, msg_client.count);
    226   EXPECT_EQ(34, msg_client.last);
    227   EXPECT_EQ(5, sock_client.count);
    228   EXPECT_EQ(55, sock_client.last);
    229 }
    230 
    231 // Test that setting thread names doesn't cause a malfunction.
    232 // There's no easy way to verify the name was set properly at this time.
    233 TEST(ThreadTest, Names) {
    234   // Default name
    235   Thread *thread;
    236   thread = new Thread();
    237   EXPECT_TRUE(thread->Start());
    238   thread->Stop();
    239   delete thread;
    240   thread = new Thread();
    241   // Name with no object parameter
    242   EXPECT_TRUE(thread->SetName("No object", NULL));
    243   EXPECT_TRUE(thread->Start());
    244   thread->Stop();
    245   delete thread;
    246   // Really long name
    247   thread = new Thread();
    248   EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this));
    249   EXPECT_TRUE(thread->Start());
    250   thread->Stop();
    251   delete thread;
    252 }
    253 
    254 TEST(ThreadTest, Wrap) {
    255   Thread* current_thread = Thread::Current();
    256   current_thread->UnwrapCurrent();
    257   CustomThread* cthread = new CustomThread();
    258   EXPECT_TRUE(cthread->WrapCurrent());
    259   EXPECT_TRUE(cthread->RunningForTest());
    260   EXPECT_FALSE(cthread->IsOwned());
    261   cthread->UnwrapCurrent();
    262   EXPECT_FALSE(cthread->RunningForTest());
    263   delete cthread;
    264   current_thread->WrapCurrent();
    265 }
    266 
    267 TEST(ThreadTest, Invoke) {
    268   // Create and start the thread.
    269   Thread thread;
    270   thread.Start();
    271   // Try calling functors.
    272   EXPECT_EQ(42, thread.Invoke<int>(FunctorA()));
    273   AtomicBool called;
    274   FunctorB f2(&called);
    275   thread.Invoke<void>(f2);
    276   EXPECT_TRUE(called.get());
    277   // Try calling bare functions.
    278   struct LocalFuncs {
    279     static int Func1() { return 999; }
    280     static void Func2() {}
    281   };
    282   EXPECT_EQ(999, thread.Invoke<int>(&LocalFuncs::Func1));
    283   thread.Invoke<void>(&LocalFuncs::Func2);
    284 }
    285 
    286 // Verifies that two threads calling Invoke on each other at the same time does
    287 // not deadlock.
    288 TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
    289   AutoThread thread;
    290   Thread* current_thread = Thread::Current();
    291   ASSERT_TRUE(current_thread != NULL);
    292 
    293   Thread other_thread;
    294   other_thread.Start();
    295 
    296   struct LocalFuncs {
    297     static void Set(bool* out) { *out = true; }
    298     static void InvokeSet(Thread* thread, bool* out) {
    299       thread->Invoke<void>(Bind(&Set, out));
    300     }
    301   };
    302 
    303   bool called = false;
    304   other_thread.Invoke<void>(
    305       Bind(&LocalFuncs::InvokeSet, current_thread, &called));
    306 
    307   EXPECT_TRUE(called);
    308 }
    309 
    310 // Verifies that if thread A invokes a call on thread B and thread C is trying
    311 // to invoke A at the same time, thread A does not handle C's invoke while
    312 // invoking B.
    313 TEST(ThreadTest, ThreeThreadsInvoke) {
    314   AutoThread thread;
    315   Thread* thread_a = Thread::Current();
    316   Thread thread_b, thread_c;
    317   thread_b.Start();
    318   thread_c.Start();
    319 
    320   class LockedBool {
    321    public:
    322     explicit LockedBool(bool value) : value_(value) {}
    323 
    324     void Set(bool value) {
    325       CritScope lock(&crit_);
    326       value_ = value;
    327     }
    328 
    329     bool Get() {
    330       CritScope lock(&crit_);
    331       return value_;
    332     }
    333 
    334    private:
    335     CriticalSection crit_;
    336     bool value_ GUARDED_BY(crit_);
    337   };
    338 
    339   struct LocalFuncs {
    340     static void Set(LockedBool* out) { out->Set(true); }
    341     static void InvokeSet(Thread* thread, LockedBool* out) {
    342       thread->Invoke<void>(Bind(&Set, out));
    343     }
    344 
    345     // Set |out| true and call InvokeSet on |thread|.
    346     static void SetAndInvokeSet(LockedBool* out,
    347                                 Thread* thread,
    348                                 LockedBool* out_inner) {
    349       out->Set(true);
    350       InvokeSet(thread, out_inner);
    351     }
    352 
    353     // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
    354     // |thread1| starts the call.
    355     static void AsyncInvokeSetAndWait(
    356         Thread* thread1, Thread* thread2, LockedBool* out) {
    357       CriticalSection crit;
    358       LockedBool async_invoked(false);
    359 
    360       AsyncInvoker invoker;
    361       invoker.AsyncInvoke<void>(
    362           thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
    363 
    364       EXPECT_TRUE_WAIT(async_invoked.Get(), 2000);
    365     }
    366   };
    367 
    368   LockedBool thread_a_called(false);
    369 
    370   // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
    371   // Thread B returns when C receives the call and C should be blocked until A
    372   // starts to process messages.
    373   thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
    374                              &thread_c, thread_a, &thread_a_called));
    375   EXPECT_FALSE(thread_a_called.Get());
    376 
    377   EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000);
    378 }
    379 
    380 class AsyncInvokeTest : public testing::Test {
    381  public:
    382   void IntCallback(int value) {
    383     EXPECT_EQ(expected_thread_, Thread::Current());
    384     int_value_ = value;
    385   }
    386   void AsyncInvokeIntCallback(AsyncInvoker* invoker, Thread* thread) {
    387     expected_thread_ = thread;
    388     invoker->AsyncInvoke(thread, FunctorC(),
    389                          &AsyncInvokeTest::IntCallback,
    390                          static_cast<AsyncInvokeTest*>(this));
    391     invoke_started_.Set();
    392   }
    393   void SetExpectedThreadForIntCallback(Thread* thread) {
    394     expected_thread_ = thread;
    395   }
    396 
    397  protected:
    398   enum { kWaitTimeout = 1000 };
    399   AsyncInvokeTest()
    400       : int_value_(0),
    401         invoke_started_(true, false),
    402         expected_thread_(NULL) {}
    403 
    404   int int_value_;
    405   Event invoke_started_;
    406   Thread* expected_thread_;
    407 };
    408 
    409 TEST_F(AsyncInvokeTest, FireAndForget) {
    410   AsyncInvoker invoker;
    411   // Create and start the thread.
    412   Thread thread;
    413   thread.Start();
    414   // Try calling functor.
    415   AtomicBool called;
    416   invoker.AsyncInvoke<void>(&thread, FunctorB(&called));
    417   EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
    418 }
    419 
    420 TEST_F(AsyncInvokeTest, WithCallback) {
    421   AsyncInvoker invoker;
    422   // Create and start the thread.
    423   Thread thread;
    424   thread.Start();
    425   // Try calling functor.
    426   SetExpectedThreadForIntCallback(Thread::Current());
    427   invoker.AsyncInvoke(&thread, FunctorA(),
    428                       &AsyncInvokeTest::IntCallback,
    429                       static_cast<AsyncInvokeTest*>(this));
    430   EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout);
    431 }
    432 
    433 TEST_F(AsyncInvokeTest, CancelInvoker) {
    434   // Create and start the thread.
    435   Thread thread;
    436   thread.Start();
    437   // Try destroying invoker during call.
    438   {
    439     AsyncInvoker invoker;
    440     invoker.AsyncInvoke(&thread, FunctorC(),
    441                         &AsyncInvokeTest::IntCallback,
    442                         static_cast<AsyncInvokeTest*>(this));
    443   }
    444   // With invoker gone, callback should be cancelled.
    445   Thread::Current()->ProcessMessages(kWaitTimeout);
    446   EXPECT_EQ(0, int_value_);
    447 }
    448 
    449 TEST_F(AsyncInvokeTest, CancelCallingThread) {
    450   AsyncInvoker invoker;
    451   { // Create and start the thread.
    452     Thread thread;
    453     thread.Start();
    454     // Try calling functor.
    455     thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback,
    456                              static_cast<AsyncInvokeTest*>(this),
    457                              &invoker, Thread::Current()));
    458     // Wait for the call to begin.
    459     ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout));
    460   }
    461   // Calling thread is gone. Return message shouldn't happen.
    462   Thread::Current()->ProcessMessages(kWaitTimeout);
    463   EXPECT_EQ(0, int_value_);
    464 }
    465 
    466 TEST_F(AsyncInvokeTest, KillInvokerBeforeExecute) {
    467   Thread thread;
    468   thread.Start();
    469   {
    470     AsyncInvoker invoker;
    471     // Try calling functor.
    472     thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback,
    473                              static_cast<AsyncInvokeTest*>(this),
    474                              &invoker, Thread::Current()));
    475     // Wait for the call to begin.
    476     ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout));
    477   }
    478   // Invoker is destroyed. Function should not execute.
    479   Thread::Current()->ProcessMessages(kWaitTimeout);
    480   EXPECT_EQ(0, int_value_);
    481 }
    482 
    483 TEST_F(AsyncInvokeTest, Flush) {
    484   AsyncInvoker invoker;
    485   AtomicBool flag1;
    486   AtomicBool flag2;
    487   // Queue two async calls to the current thread.
    488   invoker.AsyncInvoke<void>(Thread::Current(),
    489                             FunctorB(&flag1));
    490   invoker.AsyncInvoke<void>(Thread::Current(),
    491                             FunctorB(&flag2));
    492   // Because we haven't pumped messages, these should not have run yet.
    493   EXPECT_FALSE(flag1.get());
    494   EXPECT_FALSE(flag2.get());
    495   // Force them to run now.
    496   invoker.Flush(Thread::Current());
    497   EXPECT_TRUE(flag1.get());
    498   EXPECT_TRUE(flag2.get());
    499 }
    500 
    501 TEST_F(AsyncInvokeTest, FlushWithIds) {
    502   AsyncInvoker invoker;
    503   AtomicBool flag1;
    504   AtomicBool flag2;
    505   // Queue two async calls to the current thread, one with a message id.
    506   invoker.AsyncInvoke<void>(Thread::Current(),
    507                             FunctorB(&flag1),
    508                             5);
    509   invoker.AsyncInvoke<void>(Thread::Current(),
    510                             FunctorB(&flag2));
    511   // Because we haven't pumped messages, these should not have run yet.
    512   EXPECT_FALSE(flag1.get());
    513   EXPECT_FALSE(flag2.get());
    514   // Execute pending calls with id == 5.
    515   invoker.Flush(Thread::Current(), 5);
    516   EXPECT_TRUE(flag1.get());
    517   EXPECT_FALSE(flag2.get());
    518   flag1 = false;
    519   // Execute all pending calls. The id == 5 call should not execute again.
    520   invoker.Flush(Thread::Current());
    521   EXPECT_FALSE(flag1.get());
    522   EXPECT_TRUE(flag2.get());
    523 }
    524 
    525 class GuardedAsyncInvokeTest : public testing::Test {
    526  public:
    527   void IntCallback(int value) {
    528     EXPECT_EQ(expected_thread_, Thread::Current());
    529     int_value_ = value;
    530   }
    531   void AsyncInvokeIntCallback(GuardedAsyncInvoker* invoker, Thread* thread) {
    532     expected_thread_ = thread;
    533     invoker->AsyncInvoke(FunctorC(), &GuardedAsyncInvokeTest::IntCallback,
    534                          static_cast<GuardedAsyncInvokeTest*>(this));
    535     invoke_started_.Set();
    536   }
    537   void SetExpectedThreadForIntCallback(Thread* thread) {
    538     expected_thread_ = thread;
    539   }
    540 
    541  protected:
    542   const static int kWaitTimeout = 1000;
    543   GuardedAsyncInvokeTest()
    544       : int_value_(0),
    545         invoke_started_(true, false),
    546         expected_thread_(nullptr) {}
    547 
    548   int int_value_;
    549   Event invoke_started_;
    550   Thread* expected_thread_;
    551 };
    552 
    553 // Functor for creating an invoker.
    554 struct CreateInvoker {
    555   CreateInvoker(scoped_ptr<GuardedAsyncInvoker>* invoker) : invoker_(invoker) {}
    556   void operator()() { invoker_->reset(new GuardedAsyncInvoker()); }
    557   scoped_ptr<GuardedAsyncInvoker>* invoker_;
    558 };
    559 
    560 // Test that we can call AsyncInvoke<void>() after the thread died.
    561 TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) {
    562   // Create and start the thread.
    563   scoped_ptr<Thread> thread(new Thread());
    564   thread->Start();
    565   scoped_ptr<GuardedAsyncInvoker> invoker;
    566   // Create the invoker on |thread|.
    567   thread->Invoke<void>(CreateInvoker(&invoker));
    568   // Kill |thread|.
    569   thread = nullptr;
    570   // Try calling functor.
    571   AtomicBool called;
    572   EXPECT_FALSE(invoker->AsyncInvoke<void>(FunctorB(&called)));
    573   // With thread gone, nothing should happen.
    574   WAIT(called.get(), kWaitTimeout);
    575   EXPECT_FALSE(called.get());
    576 }
    577 
    578 // Test that we can call AsyncInvoke with callback after the thread died.
    579 TEST_F(GuardedAsyncInvokeTest, KillThreadWithCallback) {
    580   // Create and start the thread.
    581   scoped_ptr<Thread> thread(new Thread());
    582   thread->Start();
    583   scoped_ptr<GuardedAsyncInvoker> invoker;
    584   // Create the invoker on |thread|.
    585   thread->Invoke<void>(CreateInvoker(&invoker));
    586   // Kill |thread|.
    587   thread = nullptr;
    588   // Try calling functor.
    589   EXPECT_FALSE(
    590       invoker->AsyncInvoke(FunctorC(), &GuardedAsyncInvokeTest::IntCallback,
    591                            static_cast<GuardedAsyncInvokeTest*>(this)));
    592   // With thread gone, callback should be cancelled.
    593   Thread::Current()->ProcessMessages(kWaitTimeout);
    594   EXPECT_EQ(0, int_value_);
    595 }
    596 
    597 // The remaining tests check that GuardedAsyncInvoker behaves as AsyncInvoker
    598 // when Thread is still alive.
    599 TEST_F(GuardedAsyncInvokeTest, FireAndForget) {
    600   GuardedAsyncInvoker invoker;
    601   // Try calling functor.
    602   AtomicBool called;
    603   EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&called)));
    604   EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
    605 }
    606 
    607 TEST_F(GuardedAsyncInvokeTest, WithCallback) {
    608   GuardedAsyncInvoker invoker;
    609   // Try calling functor.
    610   SetExpectedThreadForIntCallback(Thread::Current());
    611   EXPECT_TRUE(invoker.AsyncInvoke(FunctorA(),
    612                                   &GuardedAsyncInvokeTest::IntCallback,
    613                                   static_cast<GuardedAsyncInvokeTest*>(this)));
    614   EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout);
    615 }
    616 
    617 TEST_F(GuardedAsyncInvokeTest, CancelInvoker) {
    618   // Try destroying invoker during call.
    619   {
    620     GuardedAsyncInvoker invoker;
    621     EXPECT_TRUE(
    622         invoker.AsyncInvoke(FunctorC(), &GuardedAsyncInvokeTest::IntCallback,
    623                             static_cast<GuardedAsyncInvokeTest*>(this)));
    624   }
    625   // With invoker gone, callback should be cancelled.
    626   Thread::Current()->ProcessMessages(kWaitTimeout);
    627   EXPECT_EQ(0, int_value_);
    628 }
    629 
    630 TEST_F(GuardedAsyncInvokeTest, CancelCallingThread) {
    631   GuardedAsyncInvoker invoker;
    632   // Try destroying calling thread during call.
    633   {
    634     Thread thread;
    635     thread.Start();
    636     // Try calling functor.
    637     thread.Invoke<void>(Bind(&GuardedAsyncInvokeTest::AsyncInvokeIntCallback,
    638                              static_cast<GuardedAsyncInvokeTest*>(this),
    639                              &invoker, Thread::Current()));
    640     // Wait for the call to begin.
    641     ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout));
    642   }
    643   // Calling thread is gone. Return message shouldn't happen.
    644   Thread::Current()->ProcessMessages(kWaitTimeout);
    645   EXPECT_EQ(0, int_value_);
    646 }
    647 
    648 TEST_F(GuardedAsyncInvokeTest, KillInvokerBeforeExecute) {
    649   Thread thread;
    650   thread.Start();
    651   {
    652     GuardedAsyncInvoker invoker;
    653     // Try calling functor.
    654     thread.Invoke<void>(Bind(&GuardedAsyncInvokeTest::AsyncInvokeIntCallback,
    655                              static_cast<GuardedAsyncInvokeTest*>(this),
    656                              &invoker, Thread::Current()));
    657     // Wait for the call to begin.
    658     ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout));
    659   }
    660   // Invoker is destroyed. Function should not execute.
    661   Thread::Current()->ProcessMessages(kWaitTimeout);
    662   EXPECT_EQ(0, int_value_);
    663 }
    664 
    665 TEST_F(GuardedAsyncInvokeTest, Flush) {
    666   GuardedAsyncInvoker invoker;
    667   AtomicBool flag1;
    668   AtomicBool flag2;
    669   // Queue two async calls to the current thread.
    670   EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag1)));
    671   EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag2)));
    672   // Because we haven't pumped messages, these should not have run yet.
    673   EXPECT_FALSE(flag1.get());
    674   EXPECT_FALSE(flag2.get());
    675   // Force them to run now.
    676   EXPECT_TRUE(invoker.Flush());
    677   EXPECT_TRUE(flag1.get());
    678   EXPECT_TRUE(flag2.get());
    679 }
    680 
    681 TEST_F(GuardedAsyncInvokeTest, FlushWithIds) {
    682   GuardedAsyncInvoker invoker;
    683   AtomicBool flag1;
    684   AtomicBool flag2;
    685   // Queue two async calls to the current thread, one with a message id.
    686   EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag1), 5));
    687   EXPECT_TRUE(invoker.AsyncInvoke<void>(FunctorB(&flag2)));
    688   // Because we haven't pumped messages, these should not have run yet.
    689   EXPECT_FALSE(flag1.get());
    690   EXPECT_FALSE(flag2.get());
    691   // Execute pending calls with id == 5.
    692   EXPECT_TRUE(invoker.Flush(5));
    693   EXPECT_TRUE(flag1.get());
    694   EXPECT_FALSE(flag2.get());
    695   flag1 = false;
    696   // Execute all pending calls. The id == 5 call should not execute again.
    697   EXPECT_TRUE(invoker.Flush());
    698   EXPECT_FALSE(flag1.get());
    699   EXPECT_TRUE(flag2.get());
    700 }
    701 
    702 #if defined(WEBRTC_WIN)
    703 class ComThreadTest : public testing::Test, public MessageHandler {
    704  public:
    705   ComThreadTest() : done_(false) {}
    706  protected:
    707   virtual void OnMessage(Message* message) {
    708     HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
    709     // S_FALSE means the thread was already inited for a multithread apartment.
    710     EXPECT_EQ(S_FALSE, hr);
    711     if (SUCCEEDED(hr)) {
    712       CoUninitialize();
    713     }
    714     done_ = true;
    715   }
    716   bool done_;
    717 };
    718 
    719 TEST_F(ComThreadTest, ComInited) {
    720   Thread* thread = new ComThread();
    721   EXPECT_TRUE(thread->Start());
    722   thread->Post(this, 0);
    723   EXPECT_TRUE_WAIT(done_, 1000);
    724   delete thread;
    725 }
    726 #endif
    727