Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "talk/base/thread.h"
     29 
     30 #if defined(WIN32)
     31 #include <comdef.h>
     32 #elif defined(POSIX)
     33 #include <time.h>
     34 #endif
     35 
     36 #include "talk/base/common.h"
     37 #include "talk/base/logging.h"
     38 #include "talk/base/stringutils.h"
     39 #include "talk/base/time.h"
     40 
     41 #ifdef OSX_USE_COCOA
     42 #ifndef OSX
     43 #error OSX_USE_COCOA is defined but not OSX
     44 #endif
     45 #include "talk/base/maccocoathreadhelper.h"
     46 #include "talk/base/scoped_autorelease_pool.h"
     47 #endif
     48 
     49 namespace talk_base {
     50 
// Global ThreadManager instance. Thread's constructor and destructor register
// and unregister every Thread with it through Add() and Remove().
ThreadManager g_thmgr;
     52 
     53 #ifdef POSIX
// Key of the thread-specific slot that stores the calling thread's Thread*.
pthread_key_t ThreadManager::key_;

// Creates the thread-specific-data key (no destructor callback is registered)
// and wraps the constructing thread, remembering the result as main_thread_.
// Note that the return value of pthread_key_create() is not checked.
ThreadManager::ThreadManager() {
  pthread_key_create(&key_, NULL);
  main_thread_ = WrapCurrentThread();
#if defined(OSX_USE_COCOA)
  InitCocoaMultiThreading();
#endif
}
     63 
// Unwraps the thread running the destructor and then releases the
// thread-specific-data key.
ThreadManager::~ThreadManager() {
#ifdef OSX_USE_COCOA
  // This is called during exit, at which point apparently no NSAutoreleasePools
  // are available; but we might still need them to do cleanup (or we get the
  // "no autoreleasepool in place, just leaking" warning when exiting).
  ScopedAutoreleasePool pool;
#endif
  UnwrapCurrentThread();
  // Unwrap deletes main_thread_ automatically.
  // The key is deleted last because UnwrapCurrentThread() still reads and
  // clears the slot through it.
  pthread_key_delete(key_);
}
     75 
     76 Thread *ThreadManager::CurrentThread() {
     77   return static_cast<Thread *>(pthread_getspecific(key_));
     78 }
     79 
// Stores |thread| in the calling thread's slot; passing NULL clears the slot.
void ThreadManager::SetCurrent(Thread *thread) {
  pthread_setspecific(key_, thread);
}
     83 #endif
     84 
     85 #ifdef WIN32
// Index of the TLS slot that stores the calling thread's Thread*.
DWORD ThreadManager::key_;

// Allocates the TLS slot and wraps the constructing thread, remembering the
// result as main_thread_. The value returned by TlsAlloc() is not checked.
ThreadManager::ThreadManager() {
  key_ = TlsAlloc();
  main_thread_ = WrapCurrentThread();
}
     92 
// Unwraps the thread running the destructor, then frees the TLS slot. The slot
// is freed last because UnwrapCurrentThread() still reads and clears it.
ThreadManager::~ThreadManager() {
  UnwrapCurrentThread();
  TlsFree(key_);
}
     97 
     98 Thread *ThreadManager::CurrentThread() {
     99   return static_cast<Thread *>(TlsGetValue(key_));
    100 }
    101 
// Stores |thread| in the calling thread's TLS slot; passing NULL clears it.
void ThreadManager::SetCurrent(Thread *thread) {
  TlsSetValue(key_, thread);
}
    105 #endif
    106 
    107 // static
    108 Thread *ThreadManager::WrapCurrentThread() {
    109   Thread* result = CurrentThread();
    110   if (NULL == result) {
    111     result = new Thread();
    112 #if defined(WIN32)
    113     // We explicitly ask for no rights other than synchronization.
    114     // This gives us the best chance of succeeding.
    115     result->thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
    116     if (!result->thread_)
    117       LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
    118 #elif defined(POSIX)
    119     result->thread_ = pthread_self();
    120 #endif
    121     result->owned_ = false;
    122     result->started_ = true;
    123     SetCurrent(result);
    124   }
    125 
    126   return result;
    127 }
    128 
    129 // static
    130 void ThreadManager::UnwrapCurrentThread() {
    131   Thread* t = CurrentThread();
    132   if (t && !(t->IsOwned())) {
    133     // Clears the platform-specific thread-specific storage.
    134     SetCurrent(NULL);
    135 #ifdef WIN32
    136     if (!CloseHandle(t->thread_)) {
    137       LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
    138     }
    139 #endif
    140     t->started_ = false;
    141     delete t;
    142   }
    143 }
    144 
// Appends |thread| to the list of known threads while holding crit_.
void ThreadManager::Add(Thread *thread) {
  CritScope cs(&crit_);
  threads_.push_back(thread);
}
    149 
// Removes every occurrence of |thread| from the list of known threads while
// holding crit_. Does nothing if |thread| is not in the list.
void ThreadManager::Remove(Thread *thread) {
  CritScope cs(&crit_);
  // Erase-remove: std::remove compacts the kept entries to the front and
  // erase() drops the leftover tail.
  threads_.erase(std::remove(threads_.begin(), threads_.end(), thread),
                 threads_.end());
}
    155 
    156 void ThreadManager::StopAllThreads_() {
    157   // TODO: In order to properly implement, Threads need to be ref-counted.
    158   CritScope cs(&g_thmgr.crit_);
    159   for (size_t i = 0; i < g_thmgr.threads_.size(); ++i) {
    160     g_thmgr.threads_[i]->Stop();
    161   }
    162 }
    163 
// Start-up parameters handed from Thread::Start() to Thread::PreRun() on the
// new thread. Allocated by Start() and deleted by PreRun().
struct ThreadInit {
  Thread* thread;      // Thread object the new native thread belongs to.
  Runnable* runnable;  // Work to run; when NULL, PreRun() calls thread->Run().
};
    168 
// Constructs an owned, not-yet-started thread with normal priority on top of a
// MessageQueue using |ss|, registers it with the global ThreadManager and
// assigns the default name "Thread" followed by this object's address.
Thread::Thread(SocketServer* ss)
    : MessageQueue(ss),
      priority_(PRIORITY_NORMAL),
      started_(false),
      has_sends_(false),
#if defined(WIN32)
      thread_(NULL),
#endif
      owned_(true) {
  g_thmgr.Add(this);
  SetName("Thread", this);  // default name
}
    181 
// Stops (quits and joins) the thread, clears any remaining messages if the
// queue is active, and unregisters from the global ThreadManager.
Thread::~Thread() {
  Stop();
  if (active_)
    Clear(NULL);
  g_thmgr.Remove(this);
}
    188 
    189 bool Thread::SleepMs(int milliseconds) {
    190 #ifdef WIN32
    191   ::Sleep(milliseconds);
    192   return true;
    193 #else
    194   // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
    195   // so we use nanosleep() even though it has greater precision than necessary.
    196   struct timespec ts;
    197   ts.tv_sec = milliseconds / 1000;
    198   ts.tv_nsec = (milliseconds % 1000) * 1000000;
    199   int ret = nanosleep(&ts, NULL);
    200   if (ret != 0) {
    201     LOG_ERR(LS_WARNING) << "nanosleep() returning early";
    202     return false;
    203   }
    204   return true;
    205 #endif
    206 }
    207 
    208 bool Thread::SetName(const std::string& name, const void* obj) {
    209   if (started_) return false;
    210   name_ = name;
    211   if (obj) {
    212     char buf[16];
    213     sprintfn(buf, sizeof(buf), " 0x%p", obj);
    214     name_ += buf;
    215   }
    216   return true;
    217 }
    218 
    219 bool Thread::SetPriority(ThreadPriority priority) {
    220   if (started_) return false;
    221   priority_ = priority;
    222   return true;
    223 }
    224 
    225 bool Thread::Start(Runnable* runnable) {
    226   ASSERT(owned_);
    227   if (!owned_) return false;
    228   ASSERT(!started_);
    229   if (started_) return false;
    230 
    231   ThreadInit* init = new ThreadInit;
    232   init->thread = this;
    233   init->runnable = runnable;
    234 #if defined(WIN32)
    235   DWORD flags = 0;
    236   if (priority_ != PRIORITY_NORMAL) {
    237     flags = CREATE_SUSPENDED;
    238   }
    239   thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags,
    240                          NULL);
    241   if (thread_) {
    242     if (priority_ != PRIORITY_NORMAL) {
    243       if (priority_ == PRIORITY_HIGH) {
    244         ::SetThreadPriority(thread_, THREAD_PRIORITY_HIGHEST);
    245       } else if (priority_ == PRIORITY_ABOVE_NORMAL) {
    246         ::SetThreadPriority(thread_, THREAD_PRIORITY_ABOVE_NORMAL);
    247       } else if (priority_ == PRIORITY_IDLE) {
    248         ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE);
    249       }
    250       ::ResumeThread(thread_);
    251     }
    252   } else {
    253     return false;
    254   }
    255 #elif defined(POSIX)
    256   pthread_attr_t attr;
    257   pthread_attr_init(&attr);
    258   if (priority_ != PRIORITY_NORMAL) {
    259     if (priority_ == PRIORITY_IDLE) {
    260       // There is no POSIX-standard way to set a below-normal priority for an
    261       // individual thread (only whole process), so let's not support it.
    262       LOG(LS_WARNING) << "PRIORITY_IDLE not supported";
    263     } else {
    264       // Set real-time round-robin policy.
    265       if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
    266         LOG(LS_ERROR) << "pthread_attr_setschedpolicy";
    267       }
    268       struct sched_param param;
    269       if (pthread_attr_getschedparam(&attr, &param) != 0) {
    270         LOG(LS_ERROR) << "pthread_attr_getschedparam";
    271       } else {
    272         // The numbers here are arbitrary.
    273         if (priority_ == PRIORITY_HIGH) {
    274           param.sched_priority = 6;           // 6 = HIGH
    275         } else {
    276           ASSERT(priority_ == PRIORITY_ABOVE_NORMAL);
    277           param.sched_priority = 4;           // 4 = ABOVE_NORMAL
    278         }
    279         if (pthread_attr_setschedparam(&attr, &param) != 0) {
    280           LOG(LS_ERROR) << "pthread_attr_setschedparam";
    281         }
    282       }
    283     }
    284   }
    285   int error_code = pthread_create(&thread_, &attr, PreRun, init);
    286   if (0 != error_code) {
    287     LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
    288     return false;
    289   }
    290 #endif
    291   started_ = true;
    292   return true;
    293 }
    294 
    295 void Thread::Join() {
    296   if (started_) {
    297     ASSERT(!IsCurrent());
    298 #if defined(WIN32)
    299     WaitForSingleObject(thread_, INFINITE);
    300     CloseHandle(thread_);
    301     thread_ = NULL;
    302 #elif defined(POSIX)
    303     void *pv;
    304     pthread_join(thread_, &pv);
    305 #endif
    306     started_ = false;
    307   }
    308 }
    309 
    310 #ifdef WIN32
    311 // As seen on MSDN.
    312 // http://msdn.microsoft.com/en-us/library/xcb2z8hs(VS.71).aspx
    313 #define MSDEV_SET_THREAD_NAME  0x406D1388
    314 typedef struct tagTHREADNAME_INFO {
    315   DWORD dwType;
    316   LPCSTR szName;
    317   DWORD dwThreadID;
    318   DWORD dwFlags;
    319 } THREADNAME_INFO;
    320 
    321 void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) {
    322   THREADNAME_INFO info;
    323   info.dwType = 0x1000;
    324   info.szName = szThreadName;
    325   info.dwThreadID = dwThreadID;
    326   info.dwFlags = 0;
    327 
    328   __try {
    329     RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info) / sizeof(DWORD),
    330                    reinterpret_cast<DWORD*>(&info));
    331   }
    332   __except(EXCEPTION_CONTINUE_EXECUTION) {
    333   }
    334 }
    335 #endif  // WIN32
    336 
    337 void* Thread::PreRun(void* pv) {
    338   ThreadInit* init = static_cast<ThreadInit*>(pv);
    339   ThreadManager::SetCurrent(init->thread);
    340 #if defined(WIN32)
    341   SetThreadName(GetCurrentThreadId(), init->thread->name_.c_str());
    342 #elif defined(POSIX)
    343   // TODO: See if naming exists for pthreads.
    344 #endif
    345 #ifdef OSX_USE_COCOA
    346   // Make sure the new thread has an autoreleasepool
    347   ScopedAutoreleasePool pool;
    348 #endif
    349   if (init->runnable) {
    350     init->runnable->Run(init->thread);
    351   } else {
    352     init->thread->Run();
    353   }
    354   delete init;
    355   return NULL;
    356 }
    357 
// Default thread body, used by PreRun() when no Runnable was supplied:
// processes messages with no time limit (kForever).
void Thread::Run() {
  ProcessMessages(kForever);
}
    361 
// Returns owned_: true for threads constructed normally, false for Thread
// objects created by ThreadManager::WrapCurrentThread().
bool Thread::IsOwned() {
  return owned_;
}
    365 
// Asks the message queue to quit and then waits for the thread to finish.
void Thread::Stop() {
  MessageQueue::Quit();
  Join();
}
    370 
// Delivers a message to |phandler| on this thread and does not return until
// the handler has run (or the message was removed by Clear()).
//
// When called on this thread, the handler is invoked directly. Otherwise the
// message is queued on sendlist_, this thread's socket server is woken, and
// the caller blocks - while still servicing Sends addressed to itself - until
// the message's ready flag is set. Returns immediately, without delivering
// anything, when fStop_ is set.
void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
  if (fStop_)
    return;

  // Sent messages are sent to the MessageHandler directly, in the context
  // of "thread", like Win32 SendMessage. If in the right context,
  // call the handler directly.

  Message msg;
  msg.phandler = phandler;
  msg.message_id = id;
  msg.pdata = pdata;
  if (IsCurrent()) {
    phandler->OnMessage(&msg);
    return;
  }

  // If the calling thread has no Thread object yet, this temporary AutoThread
  // registers itself as the current one for the duration of the call.
  AutoThread thread;
  Thread *current_thread = Thread::Current();
  ASSERT(current_thread != NULL);  // AutoThread ensures this

  // |ready| lives on this stack frame; it is set to true by ReceiveSends() or
  // Clear() on the receiving side, which is why we must not return before it
  // has been set.
  bool ready = false;
  {
    CritScope cs(&crit_);
    EnsureActive();
    _SendMessage smsg;
    smsg.thread = current_thread;
    smsg.msg = msg;
    smsg.ready = &ready;
    sendlist_.push_back(smsg);
    has_sends_ = true;
  }

  // Wait for a reply

  ss_->WakeUp();

  bool waited = false;
  while (!ready) {
    // Keep handling Sends directed at the calling thread while it waits.
    current_thread->ReceiveSends();
    current_thread->socketserver()->Wait(kForever, false);
    waited = true;
  }

  // Our Wait loop above may have consumed some WakeUp events for this
  // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
  // cause problems for some SocketServers.
  //
  // Concrete example:
  // Win32SocketServer on thread A calls Send on thread B.  While processing the
  // message, thread B Posts a message to A.  We consume the wakeup for that
  // Post while waiting for the Send to complete, which means that when we exit
  // this loop, we need to issue another WakeUp, or else the Posted message
  // won't be processed in a timely manner.

  if (waited) {
    current_thread->socketserver()->WakeUp();
  }
}
    430 
// Runs the handlers of all messages currently queued on sendlist_ by
// Thread::Send(), marking each one ready and waking its sender afterwards.
// Returns immediately when has_sends_ is false.
void Thread::ReceiveSends() {
  // Before entering critical section, check boolean.

  if (!has_sends_)
    return;

  // Receive a sent message. Cleanup scenarios:
  // - thread sending exits: We don't allow this, since thread can exit
  //   only via Join, so Send must complete.
  // - thread receiving exits: Wakeup/set ready in Thread::Clear()
  // - object target cleared: Wakeup/set ready in Thread::Clear()
  crit_.Enter();
  while (!sendlist_.empty()) {
    // Work on a copy that has already been removed from the list, so the lock
    // can be released while the handler runs.
    _SendMessage smsg = sendlist_.front();
    sendlist_.pop_front();
    crit_.Leave();
    smsg.msg.phandler->OnMessage(&smsg.msg);
    crit_.Enter();
    // Release the sender: set its ready flag, then wake its socket server.
    *smsg.ready = true;
    smsg.thread->socketserver()->WakeUp();
  }
  has_sends_ = false;
  crit_.Leave();
}
    455 
    456 void Thread::Clear(MessageHandler *phandler, uint32 id,
    457                    MessageList* removed) {
    458   CritScope cs(&crit_);
    459 
    460   // Remove messages on sendlist_ with phandler
    461   // Object target cleared: remove from send list, wakeup/set ready
    462   // if sender not NULL.
    463 
    464   std::list<_SendMessage>::iterator iter = sendlist_.begin();
    465   while (iter != sendlist_.end()) {
    466     _SendMessage smsg = *iter;
    467     if (smsg.msg.Match(phandler, id)) {
    468       if (removed) {
    469         removed->push_back(smsg.msg);
    470       } else {
    471         delete smsg.msg.pdata;
    472       }
    473       iter = sendlist_.erase(iter);
    474       *smsg.ready = true;
    475       smsg.thread->socketserver()->WakeUp();
    476       continue;
    477     }
    478     ++iter;
    479   }
    480 
    481   MessageQueue::Clear(phandler, id, removed);
    482 }
    483 
    484 bool Thread::ProcessMessages(int cmsLoop) {
    485   uint32 msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
    486   int cmsNext = cmsLoop;
    487 
    488   while (true) {
    489     Message msg;
    490     if (!Get(&msg, cmsNext))
    491       return !IsQuitting();
    492     Dispatch(&msg);
    493 
    494     if (cmsLoop != kForever) {
    495       cmsNext = TimeUntil(msEnd);
    496       if (cmsNext < 0)
    497         return true;
    498     }
    499   }
    500 }
    501 
    502 AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
    503   if (!ThreadManager::CurrentThread()) {
    504     ThreadManager::SetCurrent(this);
    505   }
    506 }
    507 
    508 AutoThread::~AutoThread() {
    509   if (ThreadManager::CurrentThread() == this) {
    510     ThreadManager::SetCurrent(NULL);
    511   }
    512 }
    513 
    514 #ifdef WIN32
    515 void ComThread::Run() {
    516   HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
    517   ASSERT(SUCCEEDED(hr));
    518   if (SUCCEEDED(hr)) {
    519     Thread::Run();
    520     CoUninitialize();
    521   } else {
    522     LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr;
    523   }
    524 }
    525 #endif
    526 
    527 }  // namespace talk_base
    528