Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/base/thread.h"
     12 
     13 #ifndef __has_feature
     14 #define __has_feature(x) 0  // Compatibility with non-clang or LLVM compilers.
     15 #endif  // __has_feature
     16 
     17 #if defined(WEBRTC_WIN)
     18 #include <comdef.h>
     19 #elif defined(WEBRTC_POSIX)
     20 #include <time.h>
     21 #endif
     22 
     23 #include "webrtc/base/common.h"
     24 #include "webrtc/base/logging.h"
     25 #include "webrtc/base/stringutils.h"
     26 #include "webrtc/base/timeutils.h"
     27 
     28 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
     29 #include "webrtc/base/maccocoathreadhelper.h"
     30 #include "webrtc/base/scoped_autorelease_pool.h"
     31 #endif
     32 
     33 namespace rtc {
     34 
     35 ThreadManager* ThreadManager::Instance() {
     36   LIBJINGLE_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ());
     37   return &thread_manager;
     38 }
     39 
     40 // static
     41 Thread* Thread::Current() {
     42   return ThreadManager::Instance()->CurrentThread();
     43 }
     44 
     45 #if defined(WEBRTC_POSIX)
     46 ThreadManager::ThreadManager() {
     47   pthread_key_create(&key_, NULL);
     48 #ifndef NO_MAIN_THREAD_WRAPPING
     49   WrapCurrentThread();
     50 #endif
     51 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
     52   // Under Automatic Reference Counting (ARC), you cannot use autorelease pools
     53   // directly. Instead, you use @autoreleasepool blocks instead.  Also, we are
     54   // maintaining thread safety using immutability within context of GCD dispatch
     55   // queues in this case.
     56   InitCocoaMultiThreading();
     57 #endif
     58 }
     59 
     60 ThreadManager::~ThreadManager() {
     61 #if __has_feature(objc_arc)
     62   @autoreleasepool
     63 #elif defined(WEBRTC_MAC)
     64   // This is called during exit, at which point apparently no NSAutoreleasePools
     65   // are available; but we might still need them to do cleanup (or we get the
     66   // "no autoreleasepool in place, just leaking" warning when exiting).
     67   ScopedAutoreleasePool pool;
     68 #endif
     69   {
     70     UnwrapCurrentThread();
     71     pthread_key_delete(key_);
     72   }
     73 }
     74 
     75 Thread *ThreadManager::CurrentThread() {
     76   return static_cast<Thread *>(pthread_getspecific(key_));
     77 }
     78 
     79 void ThreadManager::SetCurrentThread(Thread *thread) {
     80   pthread_setspecific(key_, thread);
     81 }
     82 #endif
     83 
     84 #if defined(WEBRTC_WIN)
     85 ThreadManager::ThreadManager() {
     86   key_ = TlsAlloc();
     87 #ifndef NO_MAIN_THREAD_WRAPPING
     88   WrapCurrentThread();
     89 #endif
     90 }
     91 
     92 ThreadManager::~ThreadManager() {
     93   UnwrapCurrentThread();
     94   TlsFree(key_);
     95 }
     96 
     97 Thread *ThreadManager::CurrentThread() {
     98   return static_cast<Thread *>(TlsGetValue(key_));
     99 }
    100 
    101 void ThreadManager::SetCurrentThread(Thread *thread) {
    102   TlsSetValue(key_, thread);
    103 }
    104 #endif
    105 
    106 Thread *ThreadManager::WrapCurrentThread() {
    107   Thread* result = CurrentThread();
    108   if (NULL == result) {
    109     result = new Thread();
    110     result->WrapCurrentWithThreadManager(this, true);
    111   }
    112   return result;
    113 }
    114 
    115 void ThreadManager::UnwrapCurrentThread() {
    116   Thread* t = CurrentThread();
    117   if (t && !(t->IsOwned())) {
    118     t->UnwrapCurrent();
    119     delete t;
    120   }
    121 }
    122 
    123 struct ThreadInit {
    124   Thread* thread;
    125   Runnable* runnable;
    126 };
    127 
    128 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
    129   : thread_(Thread::Current()),
    130     previous_state_(thread_->SetAllowBlockingCalls(false)) {
    131 }
    132 
    133 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
    134   ASSERT(thread_->IsCurrent());
    135   thread_->SetAllowBlockingCalls(previous_state_);
    136 }
    137 
    138 Thread::Thread(SocketServer* ss)
    139     : MessageQueue(ss),
    140       priority_(PRIORITY_NORMAL),
    141       running_(true, false),
    142 #if defined(WEBRTC_WIN)
    143       thread_(NULL),
    144       thread_id_(0),
    145 #endif
    146       owned_(true),
    147       blocking_calls_allowed_(true) {
    148   SetName("Thread", this);  // default name
    149 }
    150 
    151 Thread::~Thread() {
    152   Stop();
    153   Clear(NULL);
    154 }
    155 
    156 bool Thread::SleepMs(int milliseconds) {
    157   AssertBlockingIsAllowedOnCurrentThread();
    158 
    159 #if defined(WEBRTC_WIN)
    160   ::Sleep(milliseconds);
    161   return true;
    162 #else
    163   // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
    164   // so we use nanosleep() even though it has greater precision than necessary.
    165   struct timespec ts;
    166   ts.tv_sec = milliseconds / 1000;
    167   ts.tv_nsec = (milliseconds % 1000) * 1000000;
    168   int ret = nanosleep(&ts, NULL);
    169   if (ret != 0) {
    170     LOG_ERR(LS_WARNING) << "nanosleep() returning early";
    171     return false;
    172   }
    173   return true;
    174 #endif
    175 }
    176 
    177 bool Thread::SetName(const std::string& name, const void* obj) {
    178   if (running()) return false;
    179   name_ = name;
    180   if (obj) {
    181     char buf[16];
    182     sprintfn(buf, sizeof(buf), " 0x%p", obj);
    183     name_ += buf;
    184   }
    185   return true;
    186 }
    187 
    188 bool Thread::SetPriority(ThreadPriority priority) {
    189 #if defined(WEBRTC_WIN)
    190   if (running()) {
    191     ASSERT(thread_ != NULL);
    192     BOOL ret = FALSE;
    193     if (priority == PRIORITY_NORMAL) {
    194       ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL);
    195     } else if (priority == PRIORITY_HIGH) {
    196       ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_HIGHEST);
    197     } else if (priority == PRIORITY_ABOVE_NORMAL) {
    198       ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_ABOVE_NORMAL);
    199     } else if (priority == PRIORITY_IDLE) {
    200       ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE);
    201     }
    202     if (!ret) {
    203       return false;
    204     }
    205   }
    206   priority_ = priority;
    207   return true;
    208 #else
    209   // TODO: Implement for Linux/Mac if possible.
    210   if (running()) return false;
    211   priority_ = priority;
    212   return true;
    213 #endif
    214 }
    215 
    216 bool Thread::Start(Runnable* runnable) {
    217   ASSERT(owned_);
    218   if (!owned_) return false;
    219   ASSERT(!running());
    220   if (running()) return false;
    221 
    222   Restart();  // reset fStop_ if the thread is being restarted
    223 
    224   // Make sure that ThreadManager is created on the main thread before
    225   // we start a new thread.
    226   ThreadManager::Instance();
    227 
    228   ThreadInit* init = new ThreadInit;
    229   init->thread = this;
    230   init->runnable = runnable;
    231 #if defined(WEBRTC_WIN)
    232   DWORD flags = 0;
    233   if (priority_ != PRIORITY_NORMAL) {
    234     flags = CREATE_SUSPENDED;
    235   }
    236   thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags,
    237                          &thread_id_);
    238   if (thread_) {
    239     running_.Set();
    240     if (priority_ != PRIORITY_NORMAL) {
    241       SetPriority(priority_);
    242       ::ResumeThread(thread_);
    243     }
    244   } else {
    245     return false;
    246   }
    247 #elif defined(WEBRTC_POSIX)
    248   pthread_attr_t attr;
    249   pthread_attr_init(&attr);
    250 
    251   // Thread priorities are not supported in NaCl.
    252 #if !defined(__native_client__)
    253   if (priority_ != PRIORITY_NORMAL) {
    254     if (priority_ == PRIORITY_IDLE) {
    255       // There is no POSIX-standard way to set a below-normal priority for an
    256       // individual thread (only whole process), so let's not support it.
    257       LOG(LS_WARNING) << "PRIORITY_IDLE not supported";
    258     } else {
    259       // Set real-time round-robin policy.
    260       if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
    261         LOG(LS_ERROR) << "pthread_attr_setschedpolicy";
    262       }
    263       struct sched_param param;
    264       if (pthread_attr_getschedparam(&attr, &param) != 0) {
    265         LOG(LS_ERROR) << "pthread_attr_getschedparam";
    266       } else {
    267         // The numbers here are arbitrary.
    268         if (priority_ == PRIORITY_HIGH) {
    269           param.sched_priority = 6;           // 6 = HIGH
    270         } else {
    271           ASSERT(priority_ == PRIORITY_ABOVE_NORMAL);
    272           param.sched_priority = 4;           // 4 = ABOVE_NORMAL
    273         }
    274         if (pthread_attr_setschedparam(&attr, &param) != 0) {
    275           LOG(LS_ERROR) << "pthread_attr_setschedparam";
    276         }
    277       }
    278     }
    279   }
    280 #endif  // !defined(__native_client__)
    281 
    282   int error_code = pthread_create(&thread_, &attr, PreRun, init);
    283   if (0 != error_code) {
    284     LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
    285     return false;
    286   }
    287   running_.Set();
    288 #endif
    289   return true;
    290 }
    291 
    292 bool Thread::WrapCurrent() {
    293   return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
    294 }
    295 
    296 void Thread::UnwrapCurrent() {
    297   // Clears the platform-specific thread-specific storage.
    298   ThreadManager::Instance()->SetCurrentThread(NULL);
    299 #if defined(WEBRTC_WIN)
    300   if (thread_ != NULL) {
    301     if (!CloseHandle(thread_)) {
    302       LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
    303     }
    304     thread_ = NULL;
    305   }
    306 #endif
    307   running_.Reset();
    308 }
    309 
    310 void Thread::SafeWrapCurrent() {
    311   WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
    312 }
    313 
    314 void Thread::Join() {
    315   AssertBlockingIsAllowedOnCurrentThread();
    316 
    317   if (running()) {
    318     ASSERT(!IsCurrent());
    319 #if defined(WEBRTC_WIN)
    320     ASSERT(thread_ != NULL);
    321     WaitForSingleObject(thread_, INFINITE);
    322     CloseHandle(thread_);
    323     thread_ = NULL;
    324     thread_id_ = 0;
    325 #elif defined(WEBRTC_POSIX)
    326     void *pv;
    327     pthread_join(thread_, &pv);
    328 #endif
    329     running_.Reset();
    330   }
    331 }
    332 
    333 bool Thread::SetAllowBlockingCalls(bool allow) {
    334   ASSERT(IsCurrent());
    335   bool previous = blocking_calls_allowed_;
    336   blocking_calls_allowed_ = allow;
    337   return previous;
    338 }
    339 
    340 // static
    341 void Thread::AssertBlockingIsAllowedOnCurrentThread() {
    342 #ifdef _DEBUG
    343   Thread* current = Thread::Current();
    344   ASSERT(!current || current->blocking_calls_allowed_);
    345 #endif
    346 }
    347 
    348 #if defined(WEBRTC_WIN)
    349 // As seen on MSDN.
    350 // http://msdn.microsoft.com/en-us/library/xcb2z8hs(VS.71).aspx
    351 #define MSDEV_SET_THREAD_NAME  0x406D1388
    352 typedef struct tagTHREADNAME_INFO {
    353   DWORD dwType;
    354   LPCSTR szName;
    355   DWORD dwThreadID;
    356   DWORD dwFlags;
    357 } THREADNAME_INFO;
    358 
    359 void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) {
    360   THREADNAME_INFO info;
    361   info.dwType = 0x1000;
    362   info.szName = szThreadName;
    363   info.dwThreadID = dwThreadID;
    364   info.dwFlags = 0;
    365 
    366   __try {
    367     RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info) / sizeof(DWORD),
    368                    reinterpret_cast<ULONG_PTR*>(&info));
    369   }
    370   __except(EXCEPTION_CONTINUE_EXECUTION) {
    371   }
    372 }
    373 #endif  // WEBRTC_WIN
    374 
    375 void* Thread::PreRun(void* pv) {
    376   ThreadInit* init = static_cast<ThreadInit*>(pv);
    377   ThreadManager::Instance()->SetCurrentThread(init->thread);
    378 #if defined(WEBRTC_WIN)
    379   SetThreadName(GetCurrentThreadId(), init->thread->name_.c_str());
    380 #elif defined(WEBRTC_POSIX)
    381   // TODO: See if naming exists for pthreads.
    382 #endif
    383 #if __has_feature(objc_arc)
    384   @autoreleasepool
    385 #elif defined(WEBRTC_MAC)
    386   // Make sure the new thread has an autoreleasepool
    387   ScopedAutoreleasePool pool;
    388 #endif
    389   {
    390     if (init->runnable) {
    391       init->runnable->Run(init->thread);
    392     } else {
    393       init->thread->Run();
    394     }
    395     delete init;
    396     return NULL;
    397   }
    398 }
    399 
    400 void Thread::Run() {
    401   ProcessMessages(kForever);
    402 }
    403 
    404 bool Thread::IsOwned() {
    405   return owned_;
    406 }
    407 
    408 void Thread::Stop() {
    409   MessageQueue::Quit();
    410   Join();
    411 }
    412 
    413 void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
    414   if (fStop_)
    415     return;
    416 
    417   // Sent messages are sent to the MessageHandler directly, in the context
    418   // of "thread", like Win32 SendMessage. If in the right context,
    419   // call the handler directly.
    420   Message msg;
    421   msg.phandler = phandler;
    422   msg.message_id = id;
    423   msg.pdata = pdata;
    424   if (IsCurrent()) {
    425     phandler->OnMessage(&msg);
    426     return;
    427   }
    428 
    429   AssertBlockingIsAllowedOnCurrentThread();
    430 
    431   AutoThread thread;
    432   Thread *current_thread = Thread::Current();
    433   ASSERT(current_thread != NULL);  // AutoThread ensures this
    434 
    435   bool ready = false;
    436   {
    437     CritScope cs(&crit_);
    438     _SendMessage smsg;
    439     smsg.thread = current_thread;
    440     smsg.msg = msg;
    441     smsg.ready = &ready;
    442     sendlist_.push_back(smsg);
    443   }
    444 
    445   // Wait for a reply
    446 
    447   ss_->WakeUp();
    448 
    449   bool waited = false;
    450   crit_.Enter();
    451   while (!ready) {
    452     crit_.Leave();
    453     // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
    454     // thread invoking calls on the current thread.
    455     current_thread->ReceiveSendsFromThread(this);
    456     current_thread->socketserver()->Wait(kForever, false);
    457     waited = true;
    458     crit_.Enter();
    459   }
    460   crit_.Leave();
    461 
    462   // Our Wait loop above may have consumed some WakeUp events for this
    463   // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
    464   // cause problems for some SocketServers.
    465   //
    466   // Concrete example:
    467   // Win32SocketServer on thread A calls Send on thread B.  While processing the
    468   // message, thread B Posts a message to A.  We consume the wakeup for that
    469   // Post while waiting for the Send to complete, which means that when we exit
    470   // this loop, we need to issue another WakeUp, or else the Posted message
    471   // won't be processed in a timely manner.
    472 
    473   if (waited) {
    474     current_thread->socketserver()->WakeUp();
    475   }
    476 }
    477 
    478 void Thread::ReceiveSends() {
    479   ReceiveSendsFromThread(NULL);
    480 }
    481 
    482 void Thread::ReceiveSendsFromThread(const Thread* source) {
    483   // Receive a sent message. Cleanup scenarios:
    484   // - thread sending exits: We don't allow this, since thread can exit
    485   //   only via Join, so Send must complete.
    486   // - thread receiving exits: Wakeup/set ready in Thread::Clear()
    487   // - object target cleared: Wakeup/set ready in Thread::Clear()
    488   _SendMessage smsg;
    489 
    490   crit_.Enter();
    491   while (PopSendMessageFromThread(source, &smsg)) {
    492     crit_.Leave();
    493 
    494     smsg.msg.phandler->OnMessage(&smsg.msg);
    495 
    496     crit_.Enter();
    497     *smsg.ready = true;
    498     smsg.thread->socketserver()->WakeUp();
    499   }
    500   crit_.Leave();
    501 }
    502 
    503 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
    504   for (std::list<_SendMessage>::iterator it = sendlist_.begin();
    505        it != sendlist_.end(); ++it) {
    506     if (it->thread == source || source == NULL) {
    507       *msg = *it;
    508       sendlist_.erase(it);
    509       return true;
    510     }
    511   }
    512   return false;
    513 }
    514 
    515 void Thread::Clear(MessageHandler *phandler, uint32 id,
    516                    MessageList* removed) {
    517   CritScope cs(&crit_);
    518 
    519   // Remove messages on sendlist_ with phandler
    520   // Object target cleared: remove from send list, wakeup/set ready
    521   // if sender not NULL.
    522 
    523   std::list<_SendMessage>::iterator iter = sendlist_.begin();
    524   while (iter != sendlist_.end()) {
    525     _SendMessage smsg = *iter;
    526     if (smsg.msg.Match(phandler, id)) {
    527       if (removed) {
    528         removed->push_back(smsg.msg);
    529       } else {
    530         delete smsg.msg.pdata;
    531       }
    532       iter = sendlist_.erase(iter);
    533       *smsg.ready = true;
    534       smsg.thread->socketserver()->WakeUp();
    535       continue;
    536     }
    537     ++iter;
    538   }
    539 
    540   MessageQueue::Clear(phandler, id, removed);
    541 }
    542 
    543 bool Thread::ProcessMessages(int cmsLoop) {
    544   uint32 msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
    545   int cmsNext = cmsLoop;
    546 
    547   while (true) {
    548 #if __has_feature(objc_arc)
    549     @autoreleasepool
    550 #elif defined(WEBRTC_MAC)
    551     // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html
    552     // Each thread is supposed to have an autorelease pool. Also for event loops
    553     // like this, autorelease pool needs to be created and drained/released
    554     // for each cycle.
    555     ScopedAutoreleasePool pool;
    556 #endif
    557     {
    558       Message msg;
    559       if (!Get(&msg, cmsNext))
    560         return !IsQuitting();
    561       Dispatch(&msg);
    562 
    563       if (cmsLoop != kForever) {
    564         cmsNext = TimeUntil(msEnd);
    565         if (cmsNext < 0)
    566           return true;
    567       }
    568     }
    569   }
    570 }
    571 
    572 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
    573                                           bool need_synchronize_access) {
    574   if (running())
    575     return false;
    576 
    577 #if defined(WEBRTC_WIN)
    578   if (need_synchronize_access) {
    579     // We explicitly ask for no rights other than synchronization.
    580     // This gives us the best chance of succeeding.
    581     thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
    582     if (!thread_) {
    583       LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
    584       return false;
    585     }
    586     thread_id_ = GetCurrentThreadId();
    587   }
    588 #elif defined(WEBRTC_POSIX)
    589   thread_ = pthread_self();
    590 #endif
    591 
    592   owned_ = false;
    593   running_.Set();
    594   thread_manager->SetCurrentThread(this);
    595   return true;
    596 }
    597 
    598 AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
    599   if (!ThreadManager::Instance()->CurrentThread()) {
    600     ThreadManager::Instance()->SetCurrentThread(this);
    601   }
    602 }
    603 
    604 AutoThread::~AutoThread() {
    605   Stop();
    606   if (ThreadManager::Instance()->CurrentThread() == this) {
    607     ThreadManager::Instance()->SetCurrentThread(NULL);
    608   }
    609 }
    610 
    611 #if defined(WEBRTC_WIN)
    612 void ComThread::Run() {
    613   HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
    614   ASSERT(SUCCEEDED(hr));
    615   if (SUCCEEDED(hr)) {
    616     Thread::Run();
    617     CoUninitialize();
    618   } else {
    619     LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr;
    620   }
    621 }
    622 #endif
    623 
    624 }  // namespace rtc
    625