Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/base/thread.h"
     12 
     13 #ifndef __has_feature
     14 #define __has_feature(x) 0  // Compatibility with non-clang or LLVM compilers.
     15 #endif  // __has_feature
     16 
     17 #if defined(WEBRTC_WIN)
     18 #include <comdef.h>
     19 #elif defined(WEBRTC_POSIX)
     20 #include <time.h>
     21 #endif
     22 
     23 #include "webrtc/base/common.h"
     24 #include "webrtc/base/logging.h"
     25 #include "webrtc/base/platform_thread.h"
     26 #include "webrtc/base/stringutils.h"
     27 #include "webrtc/base/timeutils.h"
     28 
     29 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
     30 #include "webrtc/base/maccocoathreadhelper.h"
     31 #include "webrtc/base/scoped_autorelease_pool.h"
     32 #endif
     33 
     34 #include "webrtc/base/trace_event.h"
     35 
     36 namespace rtc {
     37 
     38 ThreadManager* ThreadManager::Instance() {
     39   RTC_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ());
     40   return &thread_manager;
     41 }
     42 
     43 // static
     44 Thread* Thread::Current() {
     45   return ThreadManager::Instance()->CurrentThread();
     46 }
     47 
     48 #if defined(WEBRTC_POSIX)
     49 ThreadManager::ThreadManager() {
     50   pthread_key_create(&key_, NULL);
     51 #ifndef NO_MAIN_THREAD_WRAPPING
     52   WrapCurrentThread();
     53 #endif
     54 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
     55   // Under Automatic Reference Counting (ARC), you cannot use autorelease pools
     56   // directly. Instead, you use @autoreleasepool blocks instead.  Also, we are
     57   // maintaining thread safety using immutability within context of GCD dispatch
     58   // queues in this case.
     59   InitCocoaMultiThreading();
     60 #endif
     61 }
     62 
     63 ThreadManager::~ThreadManager() {
     64 #if __has_feature(objc_arc)
     65   @autoreleasepool
     66 #elif defined(WEBRTC_MAC)
     67   // This is called during exit, at which point apparently no NSAutoreleasePools
     68   // are available; but we might still need them to do cleanup (or we get the
     69   // "no autoreleasepool in place, just leaking" warning when exiting).
     70   ScopedAutoreleasePool pool;
     71 #endif
     72   {
     73     UnwrapCurrentThread();
     74     pthread_key_delete(key_);
     75   }
     76 }
     77 
     78 Thread *ThreadManager::CurrentThread() {
     79   return static_cast<Thread *>(pthread_getspecific(key_));
     80 }
     81 
     82 void ThreadManager::SetCurrentThread(Thread *thread) {
     83   pthread_setspecific(key_, thread);
     84 }
     85 #endif
     86 
     87 #if defined(WEBRTC_WIN)
     88 ThreadManager::ThreadManager() {
     89   key_ = TlsAlloc();
     90 #ifndef NO_MAIN_THREAD_WRAPPING
     91   WrapCurrentThread();
     92 #endif
     93 }
     94 
     95 ThreadManager::~ThreadManager() {
     96   UnwrapCurrentThread();
     97   TlsFree(key_);
     98 }
     99 
    100 Thread *ThreadManager::CurrentThread() {
    101   return static_cast<Thread *>(TlsGetValue(key_));
    102 }
    103 
    104 void ThreadManager::SetCurrentThread(Thread *thread) {
    105   TlsSetValue(key_, thread);
    106 }
    107 #endif
    108 
    109 Thread *ThreadManager::WrapCurrentThread() {
    110   Thread* result = CurrentThread();
    111   if (NULL == result) {
    112     result = new Thread();
    113     result->WrapCurrentWithThreadManager(this, true);
    114   }
    115   return result;
    116 }
    117 
    118 void ThreadManager::UnwrapCurrentThread() {
    119   Thread* t = CurrentThread();
    120   if (t && !(t->IsOwned())) {
    121     t->UnwrapCurrent();
    122     delete t;
    123   }
    124 }
    125 
    126 struct ThreadInit {
    127   Thread* thread;
    128   Runnable* runnable;
    129 };
    130 
    131 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
    132   : thread_(Thread::Current()),
    133     previous_state_(thread_->SetAllowBlockingCalls(false)) {
    134 }
    135 
    136 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
    137   ASSERT(thread_->IsCurrent());
    138   thread_->SetAllowBlockingCalls(previous_state_);
    139 }
    140 
    141 Thread::Thread(SocketServer* ss)
    142     : MessageQueue(ss),
    143       running_(true, false),
    144 #if defined(WEBRTC_WIN)
    145       thread_(NULL),
    146       thread_id_(0),
    147 #endif
    148       owned_(true),
    149       blocking_calls_allowed_(true) {
    150   SetName("Thread", this);  // default name
    151 }
    152 
    153 Thread::~Thread() {
    154   Stop();
    155   Clear(NULL);
    156 }
    157 
    158 bool Thread::SleepMs(int milliseconds) {
    159   AssertBlockingIsAllowedOnCurrentThread();
    160 
    161 #if defined(WEBRTC_WIN)
    162   ::Sleep(milliseconds);
    163   return true;
    164 #else
    165   // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
    166   // so we use nanosleep() even though it has greater precision than necessary.
    167   struct timespec ts;
    168   ts.tv_sec = milliseconds / 1000;
    169   ts.tv_nsec = (milliseconds % 1000) * 1000000;
    170   int ret = nanosleep(&ts, NULL);
    171   if (ret != 0) {
    172     LOG_ERR(LS_WARNING) << "nanosleep() returning early";
    173     return false;
    174   }
    175   return true;
    176 #endif
    177 }
    178 
    179 bool Thread::SetName(const std::string& name, const void* obj) {
    180   if (running()) return false;
    181   name_ = name;
    182   if (obj) {
    183     char buf[16];
    184     sprintfn(buf, sizeof(buf), " 0x%p", obj);
    185     name_ += buf;
    186   }
    187   return true;
    188 }
    189 
    190 bool Thread::Start(Runnable* runnable) {
    191   ASSERT(owned_);
    192   if (!owned_) return false;
    193   ASSERT(!running());
    194   if (running()) return false;
    195 
    196   Restart();  // reset fStop_ if the thread is being restarted
    197 
    198   // Make sure that ThreadManager is created on the main thread before
    199   // we start a new thread.
    200   ThreadManager::Instance();
    201 
    202   ThreadInit* init = new ThreadInit;
    203   init->thread = this;
    204   init->runnable = runnable;
    205 #if defined(WEBRTC_WIN)
    206   thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, 0,
    207                          &thread_id_);
    208   if (thread_) {
    209     running_.Set();
    210   } else {
    211     return false;
    212   }
    213 #elif defined(WEBRTC_POSIX)
    214   pthread_attr_t attr;
    215   pthread_attr_init(&attr);
    216 
    217   int error_code = pthread_create(&thread_, &attr, PreRun, init);
    218   if (0 != error_code) {
    219     LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
    220     return false;
    221   }
    222   running_.Set();
    223 #endif
    224   return true;
    225 }
    226 
    227 bool Thread::WrapCurrent() {
    228   return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
    229 }
    230 
    231 void Thread::UnwrapCurrent() {
    232   // Clears the platform-specific thread-specific storage.
    233   ThreadManager::Instance()->SetCurrentThread(NULL);
    234 #if defined(WEBRTC_WIN)
    235   if (thread_ != NULL) {
    236     if (!CloseHandle(thread_)) {
    237       LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
    238     }
    239     thread_ = NULL;
    240   }
    241 #endif
    242   running_.Reset();
    243 }
    244 
    245 void Thread::SafeWrapCurrent() {
    246   WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
    247 }
    248 
    249 void Thread::Join() {
    250   if (running()) {
    251     ASSERT(!IsCurrent());
    252     if (Current() && !Current()->blocking_calls_allowed_) {
    253       LOG(LS_WARNING) << "Waiting for the thread to join, "
    254                       << "but blocking calls have been disallowed";
    255     }
    256 
    257 #if defined(WEBRTC_WIN)
    258     ASSERT(thread_ != NULL);
    259     WaitForSingleObject(thread_, INFINITE);
    260     CloseHandle(thread_);
    261     thread_ = NULL;
    262     thread_id_ = 0;
    263 #elif defined(WEBRTC_POSIX)
    264     void *pv;
    265     pthread_join(thread_, &pv);
    266 #endif
    267     running_.Reset();
    268   }
    269 }
    270 
    271 bool Thread::SetAllowBlockingCalls(bool allow) {
    272   ASSERT(IsCurrent());
    273   bool previous = blocking_calls_allowed_;
    274   blocking_calls_allowed_ = allow;
    275   return previous;
    276 }
    277 
    278 // static
    279 void Thread::AssertBlockingIsAllowedOnCurrentThread() {
    280 #if !defined(NDEBUG)
    281   Thread* current = Thread::Current();
    282   ASSERT(!current || current->blocking_calls_allowed_);
    283 #endif
    284 }
    285 
    286 void* Thread::PreRun(void* pv) {
    287   ThreadInit* init = static_cast<ThreadInit*>(pv);
    288   ThreadManager::Instance()->SetCurrentThread(init->thread);
    289   rtc::SetCurrentThreadName(init->thread->name_.c_str());
    290 #if __has_feature(objc_arc)
    291   @autoreleasepool
    292 #elif defined(WEBRTC_MAC)
    293   // Make sure the new thread has an autoreleasepool
    294   ScopedAutoreleasePool pool;
    295 #endif
    296   {
    297     if (init->runnable) {
    298       init->runnable->Run(init->thread);
    299     } else {
    300       init->thread->Run();
    301     }
    302     delete init;
    303     return NULL;
    304   }
    305 }
    306 
    307 void Thread::Run() {
    308   ProcessMessages(kForever);
    309 }
    310 
    311 bool Thread::IsOwned() {
    312   return owned_;
    313 }
    314 
    315 void Thread::Stop() {
    316   MessageQueue::Quit();
    317   Join();
    318 }
    319 
    320 void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) {
    321   if (fStop_)
    322     return;
    323 
    324   // Sent messages are sent to the MessageHandler directly, in the context
    325   // of "thread", like Win32 SendMessage. If in the right context,
    326   // call the handler directly.
    327   Message msg;
    328   msg.phandler = phandler;
    329   msg.message_id = id;
    330   msg.pdata = pdata;
    331   if (IsCurrent()) {
    332     phandler->OnMessage(&msg);
    333     return;
    334   }
    335 
    336   AssertBlockingIsAllowedOnCurrentThread();
    337 
    338   AutoThread thread;
    339   Thread *current_thread = Thread::Current();
    340   ASSERT(current_thread != NULL);  // AutoThread ensures this
    341 
    342   bool ready = false;
    343   {
    344     CritScope cs(&crit_);
    345     _SendMessage smsg;
    346     smsg.thread = current_thread;
    347     smsg.msg = msg;
    348     smsg.ready = &ready;
    349     sendlist_.push_back(smsg);
    350   }
    351 
    352   // Wait for a reply
    353 
    354   ss_->WakeUp();
    355 
    356   bool waited = false;
    357   crit_.Enter();
    358   while (!ready) {
    359     crit_.Leave();
    360     // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
    361     // thread invoking calls on the current thread.
    362     current_thread->ReceiveSendsFromThread(this);
    363     current_thread->socketserver()->Wait(kForever, false);
    364     waited = true;
    365     crit_.Enter();
    366   }
    367   crit_.Leave();
    368 
    369   // Our Wait loop above may have consumed some WakeUp events for this
    370   // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
    371   // cause problems for some SocketServers.
    372   //
    373   // Concrete example:
    374   // Win32SocketServer on thread A calls Send on thread B.  While processing the
    375   // message, thread B Posts a message to A.  We consume the wakeup for that
    376   // Post while waiting for the Send to complete, which means that when we exit
    377   // this loop, we need to issue another WakeUp, or else the Posted message
    378   // won't be processed in a timely manner.
    379 
    380   if (waited) {
    381     current_thread->socketserver()->WakeUp();
    382   }
    383 }
    384 
    385 void Thread::ReceiveSends() {
    386   ReceiveSendsFromThread(NULL);
    387 }
    388 
    389 void Thread::ReceiveSendsFromThread(const Thread* source) {
    390   // Receive a sent message. Cleanup scenarios:
    391   // - thread sending exits: We don't allow this, since thread can exit
    392   //   only via Join, so Send must complete.
    393   // - thread receiving exits: Wakeup/set ready in Thread::Clear()
    394   // - object target cleared: Wakeup/set ready in Thread::Clear()
    395   _SendMessage smsg;
    396 
    397   crit_.Enter();
    398   while (PopSendMessageFromThread(source, &smsg)) {
    399     crit_.Leave();
    400 
    401     smsg.msg.phandler->OnMessage(&smsg.msg);
    402 
    403     crit_.Enter();
    404     *smsg.ready = true;
    405     smsg.thread->socketserver()->WakeUp();
    406   }
    407   crit_.Leave();
    408 }
    409 
    410 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
    411   for (std::list<_SendMessage>::iterator it = sendlist_.begin();
    412        it != sendlist_.end(); ++it) {
    413     if (it->thread == source || source == NULL) {
    414       *msg = *it;
    415       sendlist_.erase(it);
    416       return true;
    417     }
    418   }
    419   return false;
    420 }
    421 
    422 void Thread::InvokeBegin() {
    423   TRACE_EVENT_BEGIN0("webrtc", "Thread::Invoke");
    424 }
    425 
    426 void Thread::InvokeEnd() {
    427   TRACE_EVENT_END0("webrtc", "Thread::Invoke");
    428 }
    429 
    430 void Thread::Clear(MessageHandler* phandler,
    431                    uint32_t id,
    432                    MessageList* removed) {
    433   CritScope cs(&crit_);
    434 
    435   // Remove messages on sendlist_ with phandler
    436   // Object target cleared: remove from send list, wakeup/set ready
    437   // if sender not NULL.
    438 
    439   std::list<_SendMessage>::iterator iter = sendlist_.begin();
    440   while (iter != sendlist_.end()) {
    441     _SendMessage smsg = *iter;
    442     if (smsg.msg.Match(phandler, id)) {
    443       if (removed) {
    444         removed->push_back(smsg.msg);
    445       } else {
    446         delete smsg.msg.pdata;
    447       }
    448       iter = sendlist_.erase(iter);
    449       *smsg.ready = true;
    450       smsg.thread->socketserver()->WakeUp();
    451       continue;
    452     }
    453     ++iter;
    454   }
    455 
    456   MessageQueue::Clear(phandler, id, removed);
    457 }
    458 
    459 bool Thread::ProcessMessages(int cmsLoop) {
    460   uint32_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
    461   int cmsNext = cmsLoop;
    462 
    463   while (true) {
    464 #if __has_feature(objc_arc)
    465     @autoreleasepool
    466 #elif defined(WEBRTC_MAC)
    467     // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html
    468     // Each thread is supposed to have an autorelease pool. Also for event loops
    469     // like this, autorelease pool needs to be created and drained/released
    470     // for each cycle.
    471     ScopedAutoreleasePool pool;
    472 #endif
    473     {
    474       Message msg;
    475       if (!Get(&msg, cmsNext))
    476         return !IsQuitting();
    477       Dispatch(&msg);
    478 
    479       if (cmsLoop != kForever) {
    480         cmsNext = TimeUntil(msEnd);
    481         if (cmsNext < 0)
    482           return true;
    483       }
    484     }
    485   }
    486 }
    487 
    488 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
    489                                           bool need_synchronize_access) {
    490   if (running())
    491     return false;
    492 
    493 #if defined(WEBRTC_WIN)
    494   if (need_synchronize_access) {
    495     // We explicitly ask for no rights other than synchronization.
    496     // This gives us the best chance of succeeding.
    497     thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
    498     if (!thread_) {
    499       LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
    500       return false;
    501     }
    502     thread_id_ = GetCurrentThreadId();
    503   }
    504 #elif defined(WEBRTC_POSIX)
    505   thread_ = pthread_self();
    506 #endif
    507 
    508   owned_ = false;
    509   running_.Set();
    510   thread_manager->SetCurrentThread(this);
    511   return true;
    512 }
    513 
    514 AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
    515   if (!ThreadManager::Instance()->CurrentThread()) {
    516     ThreadManager::Instance()->SetCurrentThread(this);
    517   }
    518 }
    519 
    520 AutoThread::~AutoThread() {
    521   Stop();
    522   if (ThreadManager::Instance()->CurrentThread() == this) {
    523     ThreadManager::Instance()->SetCurrentThread(NULL);
    524   }
    525 }
    526 
    527 #if defined(WEBRTC_WIN)
    528 void ComThread::Run() {
    529   HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
    530   ASSERT(SUCCEEDED(hr));
    531   if (SUCCEEDED(hr)) {
    532     Thread::Run();
    533     CoUninitialize();
    534   } else {
    535     LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr;
    536   }
    537 }
    538 #endif
    539 
    540 }  // namespace rtc
    541