1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/base/thread.h" 12 13 #ifndef __has_feature 14 #define __has_feature(x) 0 // Compatibility with non-clang or LLVM compilers. 15 #endif // __has_feature 16 17 #if defined(WEBRTC_WIN) 18 #include <comdef.h> 19 #elif defined(WEBRTC_POSIX) 20 #include <time.h> 21 #endif 22 23 #include "webrtc/base/common.h" 24 #include "webrtc/base/logging.h" 25 #include "webrtc/base/platform_thread.h" 26 #include "webrtc/base/stringutils.h" 27 #include "webrtc/base/timeutils.h" 28 29 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC)) 30 #include "webrtc/base/maccocoathreadhelper.h" 31 #include "webrtc/base/scoped_autorelease_pool.h" 32 #endif 33 34 #include "webrtc/base/trace_event.h" 35 36 namespace rtc { 37 38 ThreadManager* ThreadManager::Instance() { 39 RTC_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ()); 40 return &thread_manager; 41 } 42 43 // static 44 Thread* Thread::Current() { 45 return ThreadManager::Instance()->CurrentThread(); 46 } 47 48 #if defined(WEBRTC_POSIX) 49 ThreadManager::ThreadManager() { 50 pthread_key_create(&key_, NULL); 51 #ifndef NO_MAIN_THREAD_WRAPPING 52 WrapCurrentThread(); 53 #endif 54 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC)) 55 // Under Automatic Reference Counting (ARC), you cannot use autorelease pools 56 // directly. Instead, you use @autoreleasepool blocks instead. Also, we are 57 // maintaining thread safety using immutability within context of GCD dispatch 58 // queues in this case. 59 InitCocoaMultiThreading(); 60 #endif 61 } 62 63 ThreadManager::~ThreadManager() { 64 #if __has_feature(objc_arc) 65 @autoreleasepool 66 #elif defined(WEBRTC_MAC) 67 // This is called during exit, at which point apparently no NSAutoreleasePools 68 // are available; but we might still need them to do cleanup (or we get the 69 // "no autoreleasepool in place, just leaking" warning when exiting). 70 ScopedAutoreleasePool pool; 71 #endif 72 { 73 UnwrapCurrentThread(); 74 pthread_key_delete(key_); 75 } 76 } 77 78 Thread *ThreadManager::CurrentThread() { 79 return static_cast<Thread *>(pthread_getspecific(key_)); 80 } 81 82 void ThreadManager::SetCurrentThread(Thread *thread) { 83 pthread_setspecific(key_, thread); 84 } 85 #endif 86 87 #if defined(WEBRTC_WIN) 88 ThreadManager::ThreadManager() { 89 key_ = TlsAlloc(); 90 #ifndef NO_MAIN_THREAD_WRAPPING 91 WrapCurrentThread(); 92 #endif 93 } 94 95 ThreadManager::~ThreadManager() { 96 UnwrapCurrentThread(); 97 TlsFree(key_); 98 } 99 100 Thread *ThreadManager::CurrentThread() { 101 return static_cast<Thread *>(TlsGetValue(key_)); 102 } 103 104 void ThreadManager::SetCurrentThread(Thread *thread) { 105 TlsSetValue(key_, thread); 106 } 107 #endif 108 109 Thread *ThreadManager::WrapCurrentThread() { 110 Thread* result = CurrentThread(); 111 if (NULL == result) { 112 result = new Thread(); 113 result->WrapCurrentWithThreadManager(this, true); 114 } 115 return result; 116 } 117 118 void ThreadManager::UnwrapCurrentThread() { 119 Thread* t = CurrentThread(); 120 if (t && !(t->IsOwned())) { 121 t->UnwrapCurrent(); 122 delete t; 123 } 124 } 125 126 struct ThreadInit { 127 Thread* thread; 128 Runnable* runnable; 129 }; 130 131 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() 132 : thread_(Thread::Current()), 133 previous_state_(thread_->SetAllowBlockingCalls(false)) { 134 } 135 136 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { 137 ASSERT(thread_->IsCurrent()); 138 thread_->SetAllowBlockingCalls(previous_state_); 139 } 140 141 Thread::Thread(SocketServer* ss) 142 : MessageQueue(ss), 143 running_(true, false), 144 #if defined(WEBRTC_WIN) 145 thread_(NULL), 146 thread_id_(0), 147 #endif 148 owned_(true), 149 blocking_calls_allowed_(true) { 150 SetName("Thread", this); // default name 151 } 152 153 Thread::~Thread() { 154 Stop(); 155 Clear(NULL); 156 } 157 158 bool Thread::SleepMs(int milliseconds) { 159 AssertBlockingIsAllowedOnCurrentThread(); 160 161 #if defined(WEBRTC_WIN) 162 ::Sleep(milliseconds); 163 return true; 164 #else 165 // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, 166 // so we use nanosleep() even though it has greater precision than necessary. 167 struct timespec ts; 168 ts.tv_sec = milliseconds / 1000; 169 ts.tv_nsec = (milliseconds % 1000) * 1000000; 170 int ret = nanosleep(&ts, NULL); 171 if (ret != 0) { 172 LOG_ERR(LS_WARNING) << "nanosleep() returning early"; 173 return false; 174 } 175 return true; 176 #endif 177 } 178 179 bool Thread::SetName(const std::string& name, const void* obj) { 180 if (running()) return false; 181 name_ = name; 182 if (obj) { 183 char buf[16]; 184 sprintfn(buf, sizeof(buf), " 0x%p", obj); 185 name_ += buf; 186 } 187 return true; 188 } 189 190 bool Thread::Start(Runnable* runnable) { 191 ASSERT(owned_); 192 if (!owned_) return false; 193 ASSERT(!running()); 194 if (running()) return false; 195 196 Restart(); // reset fStop_ if the thread is being restarted 197 198 // Make sure that ThreadManager is created on the main thread before 199 // we start a new thread. 200 ThreadManager::Instance(); 201 202 ThreadInit* init = new ThreadInit; 203 init->thread = this; 204 init->runnable = runnable; 205 #if defined(WEBRTC_WIN) 206 thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, 0, 207 &thread_id_); 208 if (thread_) { 209 running_.Set(); 210 } else { 211 return false; 212 } 213 #elif defined(WEBRTC_POSIX) 214 pthread_attr_t attr; 215 pthread_attr_init(&attr); 216 217 int error_code = pthread_create(&thread_, &attr, PreRun, init); 218 if (0 != error_code) { 219 LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; 220 return false; 221 } 222 running_.Set(); 223 #endif 224 return true; 225 } 226 227 bool Thread::WrapCurrent() { 228 return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); 229 } 230 231 void Thread::UnwrapCurrent() { 232 // Clears the platform-specific thread-specific storage. 233 ThreadManager::Instance()->SetCurrentThread(NULL); 234 #if defined(WEBRTC_WIN) 235 if (thread_ != NULL) { 236 if (!CloseHandle(thread_)) { 237 LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle."; 238 } 239 thread_ = NULL; 240 } 241 #endif 242 running_.Reset(); 243 } 244 245 void Thread::SafeWrapCurrent() { 246 WrapCurrentWithThreadManager(ThreadManager::Instance(), false); 247 } 248 249 void Thread::Join() { 250 if (running()) { 251 ASSERT(!IsCurrent()); 252 if (Current() && !Current()->blocking_calls_allowed_) { 253 LOG(LS_WARNING) << "Waiting for the thread to join, " 254 << "but blocking calls have been disallowed"; 255 } 256 257 #if defined(WEBRTC_WIN) 258 ASSERT(thread_ != NULL); 259 WaitForSingleObject(thread_, INFINITE); 260 CloseHandle(thread_); 261 thread_ = NULL; 262 thread_id_ = 0; 263 #elif defined(WEBRTC_POSIX) 264 void *pv; 265 pthread_join(thread_, &pv); 266 #endif 267 running_.Reset(); 268 } 269 } 270 271 bool Thread::SetAllowBlockingCalls(bool allow) { 272 ASSERT(IsCurrent()); 273 bool previous = blocking_calls_allowed_; 274 blocking_calls_allowed_ = allow; 275 return previous; 276 } 277 278 // static 279 void Thread::AssertBlockingIsAllowedOnCurrentThread() { 280 #if !defined(NDEBUG) 281 Thread* current = Thread::Current(); 282 ASSERT(!current || current->blocking_calls_allowed_); 283 #endif 284 } 285 286 void* Thread::PreRun(void* pv) { 287 ThreadInit* init = static_cast<ThreadInit*>(pv); 288 ThreadManager::Instance()->SetCurrentThread(init->thread); 289 rtc::SetCurrentThreadName(init->thread->name_.c_str()); 290 #if __has_feature(objc_arc) 291 @autoreleasepool 292 #elif defined(WEBRTC_MAC) 293 // Make sure the new thread has an autoreleasepool 294 ScopedAutoreleasePool pool; 295 #endif 296 { 297 if (init->runnable) { 298 init->runnable->Run(init->thread); 299 } else { 300 init->thread->Run(); 301 } 302 delete init; 303 return NULL; 304 } 305 } 306 307 void Thread::Run() { 308 ProcessMessages(kForever); 309 } 310 311 bool Thread::IsOwned() { 312 return owned_; 313 } 314 315 void Thread::Stop() { 316 MessageQueue::Quit(); 317 Join(); 318 } 319 320 void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) { 321 if (fStop_) 322 return; 323 324 // Sent messages are sent to the MessageHandler directly, in the context 325 // of "thread", like Win32 SendMessage. If in the right context, 326 // call the handler directly. 327 Message msg; 328 msg.phandler = phandler; 329 msg.message_id = id; 330 msg.pdata = pdata; 331 if (IsCurrent()) { 332 phandler->OnMessage(&msg); 333 return; 334 } 335 336 AssertBlockingIsAllowedOnCurrentThread(); 337 338 AutoThread thread; 339 Thread *current_thread = Thread::Current(); 340 ASSERT(current_thread != NULL); // AutoThread ensures this 341 342 bool ready = false; 343 { 344 CritScope cs(&crit_); 345 _SendMessage smsg; 346 smsg.thread = current_thread; 347 smsg.msg = msg; 348 smsg.ready = &ready; 349 sendlist_.push_back(smsg); 350 } 351 352 // Wait for a reply 353 354 ss_->WakeUp(); 355 356 bool waited = false; 357 crit_.Enter(); 358 while (!ready) { 359 crit_.Leave(); 360 // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary 361 // thread invoking calls on the current thread. 362 current_thread->ReceiveSendsFromThread(this); 363 current_thread->socketserver()->Wait(kForever, false); 364 waited = true; 365 crit_.Enter(); 366 } 367 crit_.Leave(); 368 369 // Our Wait loop above may have consumed some WakeUp events for this 370 // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can 371 // cause problems for some SocketServers. 372 // 373 // Concrete example: 374 // Win32SocketServer on thread A calls Send on thread B. While processing the 375 // message, thread B Posts a message to A. We consume the wakeup for that 376 // Post while waiting for the Send to complete, which means that when we exit 377 // this loop, we need to issue another WakeUp, or else the Posted message 378 // won't be processed in a timely manner. 379 380 if (waited) { 381 current_thread->socketserver()->WakeUp(); 382 } 383 } 384 385 void Thread::ReceiveSends() { 386 ReceiveSendsFromThread(NULL); 387 } 388 389 void Thread::ReceiveSendsFromThread(const Thread* source) { 390 // Receive a sent message. Cleanup scenarios: 391 // - thread sending exits: We don't allow this, since thread can exit 392 // only via Join, so Send must complete. 393 // - thread receiving exits: Wakeup/set ready in Thread::Clear() 394 // - object target cleared: Wakeup/set ready in Thread::Clear() 395 _SendMessage smsg; 396 397 crit_.Enter(); 398 while (PopSendMessageFromThread(source, &smsg)) { 399 crit_.Leave(); 400 401 smsg.msg.phandler->OnMessage(&smsg.msg); 402 403 crit_.Enter(); 404 *smsg.ready = true; 405 smsg.thread->socketserver()->WakeUp(); 406 } 407 crit_.Leave(); 408 } 409 410 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { 411 for (std::list<_SendMessage>::iterator it = sendlist_.begin(); 412 it != sendlist_.end(); ++it) { 413 if (it->thread == source || source == NULL) { 414 *msg = *it; 415 sendlist_.erase(it); 416 return true; 417 } 418 } 419 return false; 420 } 421 422 void Thread::InvokeBegin() { 423 TRACE_EVENT_BEGIN0("webrtc", "Thread::Invoke"); 424 } 425 426 void Thread::InvokeEnd() { 427 TRACE_EVENT_END0("webrtc", "Thread::Invoke"); 428 } 429 430 void Thread::Clear(MessageHandler* phandler, 431 uint32_t id, 432 MessageList* removed) { 433 CritScope cs(&crit_); 434 435 // Remove messages on sendlist_ with phandler 436 // Object target cleared: remove from send list, wakeup/set ready 437 // if sender not NULL. 438 439 std::list<_SendMessage>::iterator iter = sendlist_.begin(); 440 while (iter != sendlist_.end()) { 441 _SendMessage smsg = *iter; 442 if (smsg.msg.Match(phandler, id)) { 443 if (removed) { 444 removed->push_back(smsg.msg); 445 } else { 446 delete smsg.msg.pdata; 447 } 448 iter = sendlist_.erase(iter); 449 *smsg.ready = true; 450 smsg.thread->socketserver()->WakeUp(); 451 continue; 452 } 453 ++iter; 454 } 455 456 MessageQueue::Clear(phandler, id, removed); 457 } 458 459 bool Thread::ProcessMessages(int cmsLoop) { 460 uint32_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); 461 int cmsNext = cmsLoop; 462 463 while (true) { 464 #if __has_feature(objc_arc) 465 @autoreleasepool 466 #elif defined(WEBRTC_MAC) 467 // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html 468 // Each thread is supposed to have an autorelease pool. Also for event loops 469 // like this, autorelease pool needs to be created and drained/released 470 // for each cycle. 471 ScopedAutoreleasePool pool; 472 #endif 473 { 474 Message msg; 475 if (!Get(&msg, cmsNext)) 476 return !IsQuitting(); 477 Dispatch(&msg); 478 479 if (cmsLoop != kForever) { 480 cmsNext = TimeUntil(msEnd); 481 if (cmsNext < 0) 482 return true; 483 } 484 } 485 } 486 } 487 488 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, 489 bool need_synchronize_access) { 490 if (running()) 491 return false; 492 493 #if defined(WEBRTC_WIN) 494 if (need_synchronize_access) { 495 // We explicitly ask for no rights other than synchronization. 496 // This gives us the best chance of succeeding. 497 thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); 498 if (!thread_) { 499 LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; 500 return false; 501 } 502 thread_id_ = GetCurrentThreadId(); 503 } 504 #elif defined(WEBRTC_POSIX) 505 thread_ = pthread_self(); 506 #endif 507 508 owned_ = false; 509 running_.Set(); 510 thread_manager->SetCurrentThread(this); 511 return true; 512 } 513 514 AutoThread::AutoThread(SocketServer* ss) : Thread(ss) { 515 if (!ThreadManager::Instance()->CurrentThread()) { 516 ThreadManager::Instance()->SetCurrentThread(this); 517 } 518 } 519 520 AutoThread::~AutoThread() { 521 Stop(); 522 if (ThreadManager::Instance()->CurrentThread() == this) { 523 ThreadManager::Instance()->SetCurrentThread(NULL); 524 } 525 } 526 527 #if defined(WEBRTC_WIN) 528 void ComThread::Run() { 529 HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); 530 ASSERT(SUCCEEDED(hr)); 531 if (SUCCEEDED(hr)) { 532 Thread::Run(); 533 CoUninitialize(); 534 } else { 535 LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr; 536 } 537 } 538 #endif 539 540 } // namespace rtc 541