Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/base/asyncinvoker.h"
     12 #include "webrtc/base/asyncudpsocket.h"
     13 #include "webrtc/base/event.h"
     14 #include "webrtc/base/gunit.h"
     15 #include "webrtc/base/physicalsocketserver.h"
     16 #include "webrtc/base/socketaddress.h"
     17 #include "webrtc/base/thread.h"
     18 #include "webrtc/test/testsupport/gtest_disable.h"
     19 
     20 #if defined(WEBRTC_WIN)
     21 #include <comdef.h>  // NOLINT
     22 #endif
     23 
     24 using namespace rtc;
     25 
     26 // Generates a sequence of numbers (collaboratively).
     27 class TestGenerator {
     28  public:
     29   TestGenerator() : last(0), count(0) {}
     30 
     31   int Next(int prev) {
     32     int result = prev + last;
     33     last = result;
     34     count += 1;
     35     return result;
     36   }
     37 
     38   int last;
     39   int count;
     40 };
     41 
     42 struct TestMessage : public MessageData {
     43   explicit TestMessage(int v) : value(v) {}
     44   virtual ~TestMessage() {}
     45 
     46   int value;
     47 };
     48 
     49 // Receives on a socket and sends by posting messages.
     50 class SocketClient : public TestGenerator, public sigslot::has_slots<> {
     51  public:
     52   SocketClient(AsyncSocket* socket, const SocketAddress& addr,
     53                Thread* post_thread, MessageHandler* phandler)
     54       : socket_(AsyncUDPSocket::Create(socket, addr)),
     55         post_thread_(post_thread),
     56         post_handler_(phandler) {
     57     socket_->SignalReadPacket.connect(this, &SocketClient::OnPacket);
     58   }
     59 
     60   ~SocketClient() {
     61     delete socket_;
     62   }
     63 
     64   SocketAddress address() const { return socket_->GetLocalAddress(); }
     65 
     66   void OnPacket(AsyncPacketSocket* socket, const char* buf, size_t size,
     67                 const SocketAddress& remote_addr,
     68                 const PacketTime& packet_time) {
     69     EXPECT_EQ(size, sizeof(uint32));
     70     uint32 prev = reinterpret_cast<const uint32*>(buf)[0];
     71     uint32 result = Next(prev);
     72 
     73     post_thread_->PostDelayed(200, post_handler_, 0, new TestMessage(result));
     74   }
     75 
     76  private:
     77   AsyncUDPSocket* socket_;
     78   Thread* post_thread_;
     79   MessageHandler* post_handler_;
     80 };
     81 
     82 // Receives messages and sends on a socket.
     83 class MessageClient : public MessageHandler, public TestGenerator {
     84  public:
     85   MessageClient(Thread* pth, Socket* socket)
     86       : socket_(socket) {
     87   }
     88 
     89   virtual ~MessageClient() {
     90     delete socket_;
     91   }
     92 
     93   virtual void OnMessage(Message *pmsg) {
     94     TestMessage* msg = static_cast<TestMessage*>(pmsg->pdata);
     95     int result = Next(msg->value);
     96     EXPECT_GE(socket_->Send(&result, sizeof(result)), 0);
     97     delete msg;
     98   }
     99 
    100  private:
    101   Socket* socket_;
    102 };
    103 
    104 class CustomThread : public rtc::Thread {
    105  public:
    106   CustomThread() {}
    107   virtual ~CustomThread() { Stop(); }
    108   bool Start() { return false; }
    109 
    110   bool WrapCurrent() {
    111     return Thread::WrapCurrent();
    112   }
    113   void UnwrapCurrent() {
    114     Thread::UnwrapCurrent();
    115   }
    116 };
    117 
    118 
    119 // A thread that does nothing when it runs and signals an event
    120 // when it is destroyed.
    121 class SignalWhenDestroyedThread : public Thread {
    122  public:
    123   SignalWhenDestroyedThread(Event* event)
    124       : event_(event) {
    125   }
    126 
    127   virtual ~SignalWhenDestroyedThread() {
    128     Stop();
    129     event_->Set();
    130   }
    131 
    132   virtual void Run() {
    133     // Do nothing.
    134   }
    135 
    136  private:
    137   Event* event_;
    138 };
    139 
    140 // Function objects to test Thread::Invoke.
    141 struct FunctorA {
    142   int operator()() { return 42; }
    143 };
    144 class FunctorB {
    145  public:
    146   explicit FunctorB(bool* flag) : flag_(flag) {}
    147   void operator()() { if (flag_) *flag_ = true; }
    148  private:
    149   bool* flag_;
    150 };
    151 struct FunctorC {
    152   int operator()() {
    153     Thread::Current()->ProcessMessages(50);
    154     return 24;
    155   }
    156 };
    157 
    158 // See: https://code.google.com/p/webrtc/issues/detail?id=2409
    159 TEST(ThreadTest, DISABLED_Main) {
    160   const SocketAddress addr("127.0.0.1", 0);
    161 
    162   // Create the messaging client on its own thread.
    163   Thread th1;
    164   Socket* socket = th1.socketserver()->CreateAsyncSocket(addr.family(),
    165                                                          SOCK_DGRAM);
    166   MessageClient msg_client(&th1, socket);
    167 
    168   // Create the socket client on its own thread.
    169   Thread th2;
    170   AsyncSocket* asocket =
    171       th2.socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
    172   SocketClient sock_client(asocket, addr, &th1, &msg_client);
    173 
    174   socket->Connect(sock_client.address());
    175 
    176   th1.Start();
    177   th2.Start();
    178 
    179   // Get the messages started.
    180   th1.PostDelayed(100, &msg_client, 0, new TestMessage(1));
    181 
    182   // Give the clients a little while to run.
    183   // Messages will be processed at 100, 300, 500, 700, 900.
    184   Thread* th_main = Thread::Current();
    185   th_main->ProcessMessages(1000);
    186 
    187   // Stop the sending client. Give the receiver a bit longer to run, in case
    188   // it is running on a machine that is under load (e.g. the build machine).
    189   th1.Stop();
    190   th_main->ProcessMessages(200);
    191   th2.Stop();
    192 
    193   // Make sure the results were correct
    194   EXPECT_EQ(5, msg_client.count);
    195   EXPECT_EQ(34, msg_client.last);
    196   EXPECT_EQ(5, sock_client.count);
    197   EXPECT_EQ(55, sock_client.last);
    198 }
    199 
    200 // Test that setting thread names doesn't cause a malfunction.
    201 // There's no easy way to verify the name was set properly at this time.
    202 TEST(ThreadTest, DISABLED_ON_MAC(Names)) {
    203   // Default name
    204   Thread *thread;
    205   thread = new Thread();
    206   EXPECT_TRUE(thread->Start());
    207   thread->Stop();
    208   delete thread;
    209   thread = new Thread();
    210   // Name with no object parameter
    211   EXPECT_TRUE(thread->SetName("No object", NULL));
    212   EXPECT_TRUE(thread->Start());
    213   thread->Stop();
    214   delete thread;
    215   // Really long name
    216   thread = new Thread();
    217   EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this));
    218   EXPECT_TRUE(thread->Start());
    219   thread->Stop();
    220   delete thread;
    221 }
    222 
    223 // Test that setting thread priorities doesn't cause a malfunction.
    224 // There's no easy way to verify the priority was set properly at this time.
    225 TEST(ThreadTest, DISABLED_ON_MAC(Priorities)) {
    226   Thread *thread;
    227   thread = new Thread();
    228   EXPECT_TRUE(thread->SetPriority(PRIORITY_HIGH));
    229   EXPECT_TRUE(thread->Start());
    230   thread->Stop();
    231   delete thread;
    232   thread = new Thread();
    233   EXPECT_TRUE(thread->SetPriority(PRIORITY_ABOVE_NORMAL));
    234   EXPECT_TRUE(thread->Start());
    235   thread->Stop();
    236   delete thread;
    237 
    238   thread = new Thread();
    239   EXPECT_TRUE(thread->Start());
    240 #if defined(WEBRTC_WIN)
    241   EXPECT_TRUE(thread->SetPriority(PRIORITY_ABOVE_NORMAL));
    242 #else
    243   EXPECT_FALSE(thread->SetPriority(PRIORITY_ABOVE_NORMAL));
    244 #endif
    245   thread->Stop();
    246   delete thread;
    247 
    248 }
    249 
    250 TEST(ThreadTest, DISABLED_ON_MAC(Wrap)) {
    251   CustomThread* cthread = new CustomThread();
    252   EXPECT_TRUE(cthread->WrapCurrent());
    253   EXPECT_TRUE(cthread->RunningForTest());
    254   EXPECT_FALSE(cthread->IsOwned());
    255   cthread->UnwrapCurrent();
    256   EXPECT_FALSE(cthread->RunningForTest());
    257   delete cthread;
    258 }
    259 
    260 TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) {
    261   // Create and start the thread.
    262   Thread thread;
    263   thread.Start();
    264   // Try calling functors.
    265   EXPECT_EQ(42, thread.Invoke<int>(FunctorA()));
    266   bool called = false;
    267   FunctorB f2(&called);
    268   thread.Invoke<void>(f2);
    269   EXPECT_TRUE(called);
    270   // Try calling bare functions.
    271   struct LocalFuncs {
    272     static int Func1() { return 999; }
    273     static void Func2() {}
    274   };
    275   EXPECT_EQ(999, thread.Invoke<int>(&LocalFuncs::Func1));
    276   thread.Invoke<void>(&LocalFuncs::Func2);
    277 }
    278 
    279 // Verifies that two threads calling Invoke on each other at the same time does
    280 // not deadlock.
    281 TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
    282   AutoThread thread;
    283   Thread* current_thread = Thread::Current();
    284   ASSERT_TRUE(current_thread != NULL);
    285 
    286   Thread other_thread;
    287   other_thread.Start();
    288 
    289   struct LocalFuncs {
    290     static void Set(bool* out) { *out = true; }
    291     static void InvokeSet(Thread* thread, bool* out) {
    292       thread->Invoke<void>(Bind(&Set, out));
    293     }
    294   };
    295 
    296   bool called = false;
    297   other_thread.Invoke<void>(
    298       Bind(&LocalFuncs::InvokeSet, current_thread, &called));
    299 
    300   EXPECT_TRUE(called);
    301 }
    302 
    303 // Verifies that if thread A invokes a call on thread B and thread C is trying
    304 // to invoke A at the same time, thread A does not handle C's invoke while
    305 // invoking B.
    306 TEST(ThreadTest, ThreeThreadsInvoke) {
    307   AutoThread thread;
    308   Thread* thread_a = Thread::Current();
    309   Thread thread_b, thread_c;
    310   thread_b.Start();
    311   thread_c.Start();
    312 
    313   struct LocalFuncs {
    314     static void Set(bool* out) { *out = true; }
    315     static void InvokeSet(Thread* thread, bool* out) {
    316       thread->Invoke<void>(Bind(&Set, out));
    317     }
    318 
    319     // Set |out| true and call InvokeSet on |thread|.
    320     static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) {
    321       *out = true;
    322       InvokeSet(thread, out_inner);
    323     }
    324 
    325     // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
    326     // |thread1| starts the call.
    327     static void AsyncInvokeSetAndWait(
    328         Thread* thread1, Thread* thread2, bool* out) {
    329       bool async_invoked = false;
    330 
    331       AsyncInvoker invoker;
    332       invoker.AsyncInvoke<void>(
    333           thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
    334 
    335       EXPECT_TRUE_WAIT(async_invoked, 2000);
    336     }
    337   };
    338 
    339   bool thread_a_called = false;
    340 
    341   // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
    342   // Thread B returns when C receives the call and C should be blocked until A
    343   // starts to process messages.
    344   thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
    345                              &thread_c, thread_a, &thread_a_called));
    346   EXPECT_FALSE(thread_a_called);
    347 
    348   EXPECT_TRUE_WAIT(thread_a_called, 2000);
    349 }
    350 
    351 class AsyncInvokeTest : public testing::Test {
    352  public:
    353   void IntCallback(int value) {
    354     EXPECT_EQ(expected_thread_, Thread::Current());
    355     int_value_ = value;
    356   }
    357   void AsyncInvokeIntCallback(AsyncInvoker* invoker, Thread* thread) {
    358     expected_thread_ = thread;
    359     invoker->AsyncInvoke(thread, FunctorC(),
    360                          &AsyncInvokeTest::IntCallback,
    361                          static_cast<AsyncInvokeTest*>(this));
    362     invoke_started_.Set();
    363   }
    364   void SetExpectedThreadForIntCallback(Thread* thread) {
    365     expected_thread_ = thread;
    366   }
    367 
    368  protected:
    369   enum { kWaitTimeout = 1000 };
    370   AsyncInvokeTest()
    371       : int_value_(0),
    372         invoke_started_(true, false),
    373         expected_thread_(NULL) {}
    374 
    375   int int_value_;
    376   Event invoke_started_;
    377   Thread* expected_thread_;
    378 };
    379 
    380 TEST_F(AsyncInvokeTest, DISABLED_FireAndForget) {
    381   AsyncInvoker invoker;
    382   // Create and start the thread.
    383   Thread thread;
    384   thread.Start();
    385   // Try calling functor.
    386   bool called = false;
    387   invoker.AsyncInvoke<void>(&thread, FunctorB(&called));
    388   EXPECT_TRUE_WAIT(called, kWaitTimeout);
    389 }
    390 
    391 TEST_F(AsyncInvokeTest, DISABLED_WithCallback) {
    392   AsyncInvoker invoker;
    393   // Create and start the thread.
    394   Thread thread;
    395   thread.Start();
    396   // Try calling functor.
    397   SetExpectedThreadForIntCallback(Thread::Current());
    398   invoker.AsyncInvoke(&thread, FunctorA(),
    399                       &AsyncInvokeTest::IntCallback,
    400                       static_cast<AsyncInvokeTest*>(this));
    401   EXPECT_EQ_WAIT(42, int_value_, kWaitTimeout);
    402 }
    403 
    404 TEST_F(AsyncInvokeTest, DISABLED_CancelInvoker) {
    405   // Create and start the thread.
    406   Thread thread;
    407   thread.Start();
    408   // Try destroying invoker during call.
    409   {
    410     AsyncInvoker invoker;
    411     invoker.AsyncInvoke(&thread, FunctorC(),
    412                         &AsyncInvokeTest::IntCallback,
    413                         static_cast<AsyncInvokeTest*>(this));
    414   }
    415   // With invoker gone, callback should be cancelled.
    416   Thread::Current()->ProcessMessages(kWaitTimeout);
    417   EXPECT_EQ(0, int_value_);
    418 }
    419 
    420 TEST_F(AsyncInvokeTest, DISABLED_CancelCallingThread) {
    421   AsyncInvoker invoker;
    422   { // Create and start the thread.
    423     Thread thread;
    424     thread.Start();
    425     // Try calling functor.
    426     thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback,
    427                              static_cast<AsyncInvokeTest*>(this),
    428                              &invoker, Thread::Current()));
    429     // Wait for the call to begin.
    430     ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout));
    431   }
    432   // Calling thread is gone. Return message shouldn't happen.
    433   Thread::Current()->ProcessMessages(kWaitTimeout);
    434   EXPECT_EQ(0, int_value_);
    435 }
    436 
    437 TEST_F(AsyncInvokeTest, DISABLED_KillInvokerBeforeExecute) {
    438   Thread thread;
    439   thread.Start();
    440   {
    441     AsyncInvoker invoker;
    442     // Try calling functor.
    443     thread.Invoke<void>(Bind(&AsyncInvokeTest::AsyncInvokeIntCallback,
    444                              static_cast<AsyncInvokeTest*>(this),
    445                              &invoker, Thread::Current()));
    446     // Wait for the call to begin.
    447     ASSERT_TRUE(invoke_started_.Wait(kWaitTimeout));
    448   }
    449   // Invoker is destroyed. Function should not execute.
    450   Thread::Current()->ProcessMessages(kWaitTimeout);
    451   EXPECT_EQ(0, int_value_);
    452 }
    453 
    454 TEST_F(AsyncInvokeTest, DISABLED_Flush) {
    455   AsyncInvoker invoker;
    456   bool flag1 = false;
    457   bool flag2 = false;
    458   // Queue two async calls to the current thread.
    459   invoker.AsyncInvoke<void>(Thread::Current(),
    460                             FunctorB(&flag1));
    461   invoker.AsyncInvoke<void>(Thread::Current(),
    462                             FunctorB(&flag2));
    463   // Because we haven't pumped messages, these should not have run yet.
    464   EXPECT_FALSE(flag1);
    465   EXPECT_FALSE(flag2);
    466   // Force them to run now.
    467   invoker.Flush(Thread::Current());
    468   EXPECT_TRUE(flag1);
    469   EXPECT_TRUE(flag2);
    470 }
    471 
    472 TEST_F(AsyncInvokeTest, DISABLED_FlushWithIds) {
    473   AsyncInvoker invoker;
    474   bool flag1 = false;
    475   bool flag2 = false;
    476   // Queue two async calls to the current thread, one with a message id.
    477   invoker.AsyncInvoke<void>(Thread::Current(),
    478                             FunctorB(&flag1),
    479                             5);
    480   invoker.AsyncInvoke<void>(Thread::Current(),
    481                             FunctorB(&flag2));
    482   // Because we haven't pumped messages, these should not have run yet.
    483   EXPECT_FALSE(flag1);
    484   EXPECT_FALSE(flag2);
    485   // Execute pending calls with id == 5.
    486   invoker.Flush(Thread::Current(), 5);
    487   EXPECT_TRUE(flag1);
    488   EXPECT_FALSE(flag2);
    489   flag1 = false;
    490   // Execute all pending calls. The id == 5 call should not execute again.
    491   invoker.Flush(Thread::Current());
    492   EXPECT_FALSE(flag1);
    493   EXPECT_TRUE(flag2);
    494 }
    495 
    496 
    497 #if defined(WEBRTC_WIN)
    498 class ComThreadTest : public testing::Test, public MessageHandler {
    499  public:
    500   ComThreadTest() : done_(false) {}
    501  protected:
    502   virtual void OnMessage(Message* message) {
    503     HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
    504     // S_FALSE means the thread was already inited for a multithread apartment.
    505     EXPECT_EQ(S_FALSE, hr);
    506     if (SUCCEEDED(hr)) {
    507       CoUninitialize();
    508     }
    509     done_ = true;
    510   }
    511   bool done_;
    512 };
    513 
    514 TEST_F(ComThreadTest, ComInited) {
    515   Thread* thread = new ComThread();
    516   EXPECT_TRUE(thread->Start());
    517   thread->Post(this, 0);
    518   EXPECT_TRUE_WAIT(done_, 1000);
    519   delete thread;
    520 }
    521 #endif
    522