1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/base/thread.h" 12 13 #ifndef __has_feature 14 #define __has_feature(x) 0 // Compatibility with non-clang or LLVM compilers. 15 #endif // __has_feature 16 17 #if defined(WEBRTC_WIN) 18 #include <comdef.h> 19 #elif defined(WEBRTC_POSIX) 20 #include <time.h> 21 #endif 22 23 #include "webrtc/base/common.h" 24 #include "webrtc/base/logging.h" 25 #include "webrtc/base/stringutils.h" 26 #include "webrtc/base/timeutils.h" 27 28 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC)) 29 #include "webrtc/base/maccocoathreadhelper.h" 30 #include "webrtc/base/scoped_autorelease_pool.h" 31 #endif 32 33 namespace rtc { 34 35 ThreadManager* ThreadManager::Instance() { 36 LIBJINGLE_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ()); 37 return &thread_manager; 38 } 39 40 // static 41 Thread* Thread::Current() { 42 return ThreadManager::Instance()->CurrentThread(); 43 } 44 45 #if defined(WEBRTC_POSIX) 46 ThreadManager::ThreadManager() { 47 pthread_key_create(&key_, NULL); 48 #ifndef NO_MAIN_THREAD_WRAPPING 49 WrapCurrentThread(); 50 #endif 51 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC)) 52 // Under Automatic Reference Counting (ARC), you cannot use autorelease pools 53 // directly. Instead, you use @autoreleasepool blocks instead. Also, we are 54 // maintaining thread safety using immutability within context of GCD dispatch 55 // queues in this case. 56 InitCocoaMultiThreading(); 57 #endif 58 } 59 60 ThreadManager::~ThreadManager() { 61 #if __has_feature(objc_arc) 62 @autoreleasepool 63 #elif defined(WEBRTC_MAC) 64 // This is called during exit, at which point apparently no NSAutoreleasePools 65 // are available; but we might still need them to do cleanup (or we get the 66 // "no autoreleasepool in place, just leaking" warning when exiting). 67 ScopedAutoreleasePool pool; 68 #endif 69 { 70 UnwrapCurrentThread(); 71 pthread_key_delete(key_); 72 } 73 } 74 75 Thread *ThreadManager::CurrentThread() { 76 return static_cast<Thread *>(pthread_getspecific(key_)); 77 } 78 79 void ThreadManager::SetCurrentThread(Thread *thread) { 80 pthread_setspecific(key_, thread); 81 } 82 #endif 83 84 #if defined(WEBRTC_WIN) 85 ThreadManager::ThreadManager() { 86 key_ = TlsAlloc(); 87 #ifndef NO_MAIN_THREAD_WRAPPING 88 WrapCurrentThread(); 89 #endif 90 } 91 92 ThreadManager::~ThreadManager() { 93 UnwrapCurrentThread(); 94 TlsFree(key_); 95 } 96 97 Thread *ThreadManager::CurrentThread() { 98 return static_cast<Thread *>(TlsGetValue(key_)); 99 } 100 101 void ThreadManager::SetCurrentThread(Thread *thread) { 102 TlsSetValue(key_, thread); 103 } 104 #endif 105 106 Thread *ThreadManager::WrapCurrentThread() { 107 Thread* result = CurrentThread(); 108 if (NULL == result) { 109 result = new Thread(); 110 result->WrapCurrentWithThreadManager(this, true); 111 } 112 return result; 113 } 114 115 void ThreadManager::UnwrapCurrentThread() { 116 Thread* t = CurrentThread(); 117 if (t && !(t->IsOwned())) { 118 t->UnwrapCurrent(); 119 delete t; 120 } 121 } 122 123 struct ThreadInit { 124 Thread* thread; 125 Runnable* runnable; 126 }; 127 128 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() 129 : thread_(Thread::Current()), 130 previous_state_(thread_->SetAllowBlockingCalls(false)) { 131 } 132 133 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { 134 ASSERT(thread_->IsCurrent()); 135 thread_->SetAllowBlockingCalls(previous_state_); 136 } 137 138 Thread::Thread(SocketServer* ss) 139 : MessageQueue(ss), 140 priority_(PRIORITY_NORMAL), 141 running_(true, false), 142 #if defined(WEBRTC_WIN) 143 thread_(NULL), 144 thread_id_(0), 145 #endif 146 owned_(true), 147 blocking_calls_allowed_(true) { 148 SetName("Thread", this); // default name 149 } 150 151 Thread::~Thread() { 152 Stop(); 153 Clear(NULL); 154 } 155 156 bool Thread::SleepMs(int milliseconds) { 157 AssertBlockingIsAllowedOnCurrentThread(); 158 159 #if defined(WEBRTC_WIN) 160 ::Sleep(milliseconds); 161 return true; 162 #else 163 // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, 164 // so we use nanosleep() even though it has greater precision than necessary. 165 struct timespec ts; 166 ts.tv_sec = milliseconds / 1000; 167 ts.tv_nsec = (milliseconds % 1000) * 1000000; 168 int ret = nanosleep(&ts, NULL); 169 if (ret != 0) { 170 LOG_ERR(LS_WARNING) << "nanosleep() returning early"; 171 return false; 172 } 173 return true; 174 #endif 175 } 176 177 bool Thread::SetName(const std::string& name, const void* obj) { 178 if (running()) return false; 179 name_ = name; 180 if (obj) { 181 char buf[16]; 182 sprintfn(buf, sizeof(buf), " 0x%p", obj); 183 name_ += buf; 184 } 185 return true; 186 } 187 188 bool Thread::SetPriority(ThreadPriority priority) { 189 #if defined(WEBRTC_WIN) 190 if (running()) { 191 ASSERT(thread_ != NULL); 192 BOOL ret = FALSE; 193 if (priority == PRIORITY_NORMAL) { 194 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL); 195 } else if (priority == PRIORITY_HIGH) { 196 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_HIGHEST); 197 } else if (priority == PRIORITY_ABOVE_NORMAL) { 198 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_ABOVE_NORMAL); 199 } else if (priority == PRIORITY_IDLE) { 200 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE); 201 } 202 if (!ret) { 203 return false; 204 } 205 } 206 priority_ = priority; 207 return true; 208 #else 209 // TODO: Implement for Linux/Mac if possible. 210 if (running()) return false; 211 priority_ = priority; 212 return true; 213 #endif 214 } 215 216 bool Thread::Start(Runnable* runnable) { 217 ASSERT(owned_); 218 if (!owned_) return false; 219 ASSERT(!running()); 220 if (running()) return false; 221 222 Restart(); // reset fStop_ if the thread is being restarted 223 224 // Make sure that ThreadManager is created on the main thread before 225 // we start a new thread. 226 ThreadManager::Instance(); 227 228 ThreadInit* init = new ThreadInit; 229 init->thread = this; 230 init->runnable = runnable; 231 #if defined(WEBRTC_WIN) 232 DWORD flags = 0; 233 if (priority_ != PRIORITY_NORMAL) { 234 flags = CREATE_SUSPENDED; 235 } 236 thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags, 237 &thread_id_); 238 if (thread_) { 239 running_.Set(); 240 if (priority_ != PRIORITY_NORMAL) { 241 SetPriority(priority_); 242 ::ResumeThread(thread_); 243 } 244 } else { 245 return false; 246 } 247 #elif defined(WEBRTC_POSIX) 248 pthread_attr_t attr; 249 pthread_attr_init(&attr); 250 251 // Thread priorities are not supported in NaCl. 252 #if !defined(__native_client__) 253 if (priority_ != PRIORITY_NORMAL) { 254 if (priority_ == PRIORITY_IDLE) { 255 // There is no POSIX-standard way to set a below-normal priority for an 256 // individual thread (only whole process), so let's not support it. 257 LOG(LS_WARNING) << "PRIORITY_IDLE not supported"; 258 } else { 259 // Set real-time round-robin policy. 260 if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) { 261 LOG(LS_ERROR) << "pthread_attr_setschedpolicy"; 262 } 263 struct sched_param param; 264 if (pthread_attr_getschedparam(&attr, ¶m) != 0) { 265 LOG(LS_ERROR) << "pthread_attr_getschedparam"; 266 } else { 267 // The numbers here are arbitrary. 268 if (priority_ == PRIORITY_HIGH) { 269 param.sched_priority = 6; // 6 = HIGH 270 } else { 271 ASSERT(priority_ == PRIORITY_ABOVE_NORMAL); 272 param.sched_priority = 4; // 4 = ABOVE_NORMAL 273 } 274 if (pthread_attr_setschedparam(&attr, ¶m) != 0) { 275 LOG(LS_ERROR) << "pthread_attr_setschedparam"; 276 } 277 } 278 } 279 } 280 #endif // !defined(__native_client__) 281 282 int error_code = pthread_create(&thread_, &attr, PreRun, init); 283 if (0 != error_code) { 284 LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; 285 return false; 286 } 287 running_.Set(); 288 #endif 289 return true; 290 } 291 292 bool Thread::WrapCurrent() { 293 return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); 294 } 295 296 void Thread::UnwrapCurrent() { 297 // Clears the platform-specific thread-specific storage. 298 ThreadManager::Instance()->SetCurrentThread(NULL); 299 #if defined(WEBRTC_WIN) 300 if (thread_ != NULL) { 301 if (!CloseHandle(thread_)) { 302 LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle."; 303 } 304 thread_ = NULL; 305 } 306 #endif 307 running_.Reset(); 308 } 309 310 void Thread::SafeWrapCurrent() { 311 WrapCurrentWithThreadManager(ThreadManager::Instance(), false); 312 } 313 314 void Thread::Join() { 315 AssertBlockingIsAllowedOnCurrentThread(); 316 317 if (running()) { 318 ASSERT(!IsCurrent()); 319 #if defined(WEBRTC_WIN) 320 ASSERT(thread_ != NULL); 321 WaitForSingleObject(thread_, INFINITE); 322 CloseHandle(thread_); 323 thread_ = NULL; 324 thread_id_ = 0; 325 #elif defined(WEBRTC_POSIX) 326 void *pv; 327 pthread_join(thread_, &pv); 328 #endif 329 running_.Reset(); 330 } 331 } 332 333 bool Thread::SetAllowBlockingCalls(bool allow) { 334 ASSERT(IsCurrent()); 335 bool previous = blocking_calls_allowed_; 336 blocking_calls_allowed_ = allow; 337 return previous; 338 } 339 340 // static 341 void Thread::AssertBlockingIsAllowedOnCurrentThread() { 342 #ifdef _DEBUG 343 Thread* current = Thread::Current(); 344 ASSERT(!current || current->blocking_calls_allowed_); 345 #endif 346 } 347 348 #if defined(WEBRTC_WIN) 349 // As seen on MSDN. 350 // http://msdn.microsoft.com/en-us/library/xcb2z8hs(VS.71).aspx 351 #define MSDEV_SET_THREAD_NAME 0x406D1388 352 typedef struct tagTHREADNAME_INFO { 353 DWORD dwType; 354 LPCSTR szName; 355 DWORD dwThreadID; 356 DWORD dwFlags; 357 } THREADNAME_INFO; 358 359 void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) { 360 THREADNAME_INFO info; 361 info.dwType = 0x1000; 362 info.szName = szThreadName; 363 info.dwThreadID = dwThreadID; 364 info.dwFlags = 0; 365 366 __try { 367 RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info) / sizeof(DWORD), 368 reinterpret_cast<ULONG_PTR*>(&info)); 369 } 370 __except(EXCEPTION_CONTINUE_EXECUTION) { 371 } 372 } 373 #endif // WEBRTC_WIN 374 375 void* Thread::PreRun(void* pv) { 376 ThreadInit* init = static_cast<ThreadInit*>(pv); 377 ThreadManager::Instance()->SetCurrentThread(init->thread); 378 #if defined(WEBRTC_WIN) 379 SetThreadName(GetCurrentThreadId(), init->thread->name_.c_str()); 380 #elif defined(WEBRTC_POSIX) 381 // TODO: See if naming exists for pthreads. 382 #endif 383 #if __has_feature(objc_arc) 384 @autoreleasepool 385 #elif defined(WEBRTC_MAC) 386 // Make sure the new thread has an autoreleasepool 387 ScopedAutoreleasePool pool; 388 #endif 389 { 390 if (init->runnable) { 391 init->runnable->Run(init->thread); 392 } else { 393 init->thread->Run(); 394 } 395 delete init; 396 return NULL; 397 } 398 } 399 400 void Thread::Run() { 401 ProcessMessages(kForever); 402 } 403 404 bool Thread::IsOwned() { 405 return owned_; 406 } 407 408 void Thread::Stop() { 409 MessageQueue::Quit(); 410 Join(); 411 } 412 413 void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { 414 if (fStop_) 415 return; 416 417 // Sent messages are sent to the MessageHandler directly, in the context 418 // of "thread", like Win32 SendMessage. If in the right context, 419 // call the handler directly. 420 Message msg; 421 msg.phandler = phandler; 422 msg.message_id = id; 423 msg.pdata = pdata; 424 if (IsCurrent()) { 425 phandler->OnMessage(&msg); 426 return; 427 } 428 429 AssertBlockingIsAllowedOnCurrentThread(); 430 431 AutoThread thread; 432 Thread *current_thread = Thread::Current(); 433 ASSERT(current_thread != NULL); // AutoThread ensures this 434 435 bool ready = false; 436 { 437 CritScope cs(&crit_); 438 _SendMessage smsg; 439 smsg.thread = current_thread; 440 smsg.msg = msg; 441 smsg.ready = &ready; 442 sendlist_.push_back(smsg); 443 } 444 445 // Wait for a reply 446 447 ss_->WakeUp(); 448 449 bool waited = false; 450 crit_.Enter(); 451 while (!ready) { 452 crit_.Leave(); 453 // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary 454 // thread invoking calls on the current thread. 455 current_thread->ReceiveSendsFromThread(this); 456 current_thread->socketserver()->Wait(kForever, false); 457 waited = true; 458 crit_.Enter(); 459 } 460 crit_.Leave(); 461 462 // Our Wait loop above may have consumed some WakeUp events for this 463 // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can 464 // cause problems for some SocketServers. 465 // 466 // Concrete example: 467 // Win32SocketServer on thread A calls Send on thread B. While processing the 468 // message, thread B Posts a message to A. We consume the wakeup for that 469 // Post while waiting for the Send to complete, which means that when we exit 470 // this loop, we need to issue another WakeUp, or else the Posted message 471 // won't be processed in a timely manner. 472 473 if (waited) { 474 current_thread->socketserver()->WakeUp(); 475 } 476 } 477 478 void Thread::ReceiveSends() { 479 ReceiveSendsFromThread(NULL); 480 } 481 482 void Thread::ReceiveSendsFromThread(const Thread* source) { 483 // Receive a sent message. Cleanup scenarios: 484 // - thread sending exits: We don't allow this, since thread can exit 485 // only via Join, so Send must complete. 486 // - thread receiving exits: Wakeup/set ready in Thread::Clear() 487 // - object target cleared: Wakeup/set ready in Thread::Clear() 488 _SendMessage smsg; 489 490 crit_.Enter(); 491 while (PopSendMessageFromThread(source, &smsg)) { 492 crit_.Leave(); 493 494 smsg.msg.phandler->OnMessage(&smsg.msg); 495 496 crit_.Enter(); 497 *smsg.ready = true; 498 smsg.thread->socketserver()->WakeUp(); 499 } 500 crit_.Leave(); 501 } 502 503 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { 504 for (std::list<_SendMessage>::iterator it = sendlist_.begin(); 505 it != sendlist_.end(); ++it) { 506 if (it->thread == source || source == NULL) { 507 *msg = *it; 508 sendlist_.erase(it); 509 return true; 510 } 511 } 512 return false; 513 } 514 515 void Thread::Clear(MessageHandler *phandler, uint32 id, 516 MessageList* removed) { 517 CritScope cs(&crit_); 518 519 // Remove messages on sendlist_ with phandler 520 // Object target cleared: remove from send list, wakeup/set ready 521 // if sender not NULL. 522 523 std::list<_SendMessage>::iterator iter = sendlist_.begin(); 524 while (iter != sendlist_.end()) { 525 _SendMessage smsg = *iter; 526 if (smsg.msg.Match(phandler, id)) { 527 if (removed) { 528 removed->push_back(smsg.msg); 529 } else { 530 delete smsg.msg.pdata; 531 } 532 iter = sendlist_.erase(iter); 533 *smsg.ready = true; 534 smsg.thread->socketserver()->WakeUp(); 535 continue; 536 } 537 ++iter; 538 } 539 540 MessageQueue::Clear(phandler, id, removed); 541 } 542 543 bool Thread::ProcessMessages(int cmsLoop) { 544 uint32 msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); 545 int cmsNext = cmsLoop; 546 547 while (true) { 548 #if __has_feature(objc_arc) 549 @autoreleasepool 550 #elif defined(WEBRTC_MAC) 551 // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html 552 // Each thread is supposed to have an autorelease pool. Also for event loops 553 // like this, autorelease pool needs to be created and drained/released 554 // for each cycle. 555 ScopedAutoreleasePool pool; 556 #endif 557 { 558 Message msg; 559 if (!Get(&msg, cmsNext)) 560 return !IsQuitting(); 561 Dispatch(&msg); 562 563 if (cmsLoop != kForever) { 564 cmsNext = TimeUntil(msEnd); 565 if (cmsNext < 0) 566 return true; 567 } 568 } 569 } 570 } 571 572 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, 573 bool need_synchronize_access) { 574 if (running()) 575 return false; 576 577 #if defined(WEBRTC_WIN) 578 if (need_synchronize_access) { 579 // We explicitly ask for no rights other than synchronization. 580 // This gives us the best chance of succeeding. 581 thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); 582 if (!thread_) { 583 LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; 584 return false; 585 } 586 thread_id_ = GetCurrentThreadId(); 587 } 588 #elif defined(WEBRTC_POSIX) 589 thread_ = pthread_self(); 590 #endif 591 592 owned_ = false; 593 running_.Set(); 594 thread_manager->SetCurrentThread(this); 595 return true; 596 } 597 598 AutoThread::AutoThread(SocketServer* ss) : Thread(ss) { 599 if (!ThreadManager::Instance()->CurrentThread()) { 600 ThreadManager::Instance()->SetCurrentThread(this); 601 } 602 } 603 604 AutoThread::~AutoThread() { 605 Stop(); 606 if (ThreadManager::Instance()->CurrentThread() == this) { 607 ThreadManager::Instance()->SetCurrentThread(NULL); 608 } 609 } 610 611 #if defined(WEBRTC_WIN) 612 void ComThread::Run() { 613 HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); 614 ASSERT(SUCCEEDED(hr)); 615 if (SUCCEEDED(hr)) { 616 Thread::Run(); 617 CoUninitialize(); 618 } else { 619 LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr; 620 } 621 } 622 #endif 623 624 } // namespace rtc 625