1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/thread.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/lazy_instance.h" 10 #include "base/location.h" 11 #include "base/logging.h" 12 #include "base/run_loop.h" 13 #include "base/synchronization/waitable_event.h" 14 #include "base/third_party/dynamic_annotations/dynamic_annotations.h" 15 #include "base/threading/thread_id_name_manager.h" 16 #include "base/threading/thread_local.h" 17 #include "base/threading/thread_restrictions.h" 18 #include "build/build_config.h" 19 20 #if defined(OS_POSIX) && !defined(OS_NACL) 21 #include "base/files/file_descriptor_watcher_posix.h" 22 #endif 23 24 #if defined(OS_WIN) 25 #include "base/win/scoped_com_initializer.h" 26 #endif 27 28 namespace base { 29 30 namespace { 31 32 // We use this thread-local variable to record whether or not a thread exited 33 // because its Stop method was called. This allows us to catch cases where 34 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when 35 // using a Thread to setup and run a MessageLoop. 36 base::LazyInstance<base::ThreadLocalBoolean>::Leaky lazy_tls_bool = 37 LAZY_INSTANCE_INITIALIZER; 38 39 } // namespace 40 41 Thread::Options::Options() = default; 42 43 Thread::Options::Options(MessageLoop::Type type, size_t size) 44 : message_loop_type(type), stack_size(size) {} 45 46 Thread::Options::Options(const Options& other) = default; 47 48 Thread::Options::~Options() = default; 49 50 Thread::Thread(const std::string& name) 51 : id_event_(WaitableEvent::ResetPolicy::MANUAL, 52 WaitableEvent::InitialState::NOT_SIGNALED), 53 name_(name), 54 start_event_(WaitableEvent::ResetPolicy::MANUAL, 55 WaitableEvent::InitialState::NOT_SIGNALED) { 56 // Only bind the sequence on Start(): the state is constant between 57 // construction and Start() and it's thus valid for Start() to be called on 58 // another sequence as long as every other operation is then performed on that 59 // sequence. 60 owning_sequence_checker_.DetachFromSequence(); 61 } 62 63 Thread::~Thread() { 64 Stop(); 65 } 66 67 bool Thread::Start() { 68 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 69 70 Options options; 71 #if defined(OS_WIN) 72 if (com_status_ == STA) 73 options.message_loop_type = MessageLoop::TYPE_UI; 74 #endif 75 return StartWithOptions(options); 76 } 77 78 bool Thread::StartWithOptions(const Options& options) { 79 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 80 DCHECK(!message_loop_); 81 DCHECK(!IsRunning()); 82 DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's " 83 << "not allowed!"; 84 #if defined(OS_WIN) 85 DCHECK((com_status_ != STA) || 86 (options.message_loop_type == MessageLoop::TYPE_UI)); 87 #endif 88 89 // Reset |id_| here to support restarting the thread. 90 id_event_.Reset(); 91 id_ = kInvalidThreadId; 92 93 SetThreadWasQuitProperly(false); 94 95 MessageLoop::Type type = options.message_loop_type; 96 if (!options.message_pump_factory.is_null()) 97 type = MessageLoop::TYPE_CUSTOM; 98 99 message_loop_timer_slack_ = options.timer_slack; 100 std::unique_ptr<MessageLoop> message_loop_owned = 101 MessageLoop::CreateUnbound(type, options.message_pump_factory); 102 message_loop_ = message_loop_owned.get(); 103 start_event_.Reset(); 104 105 // Hold |thread_lock_| while starting the new thread to synchronize with 106 // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is 107 // fixed). 108 { 109 AutoLock lock(thread_lock_); 110 bool success = 111 options.joinable 112 ? PlatformThread::CreateWithPriority(options.stack_size, this, 113 &thread_, options.priority) 114 : PlatformThread::CreateNonJoinableWithPriority( 115 options.stack_size, this, options.priority); 116 if (!success) { 117 DLOG(ERROR) << "failed to create thread"; 118 message_loop_ = nullptr; 119 return false; 120 } 121 } 122 123 joinable_ = options.joinable; 124 125 // The ownership of |message_loop_| is managed by the newly created thread 126 // within the ThreadMain. 127 ignore_result(message_loop_owned.release()); 128 129 DCHECK(message_loop_); 130 return true; 131 } 132 133 bool Thread::StartAndWaitForTesting() { 134 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 135 bool result = Start(); 136 if (!result) 137 return false; 138 WaitUntilThreadStarted(); 139 return true; 140 } 141 142 bool Thread::WaitUntilThreadStarted() const { 143 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 144 if (!message_loop_) 145 return false; 146 base::ThreadRestrictions::ScopedAllowWait allow_wait; 147 start_event_.Wait(); 148 return true; 149 } 150 151 void Thread::FlushForTesting() { 152 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 153 if (!message_loop_) 154 return; 155 156 WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC, 157 WaitableEvent::InitialState::NOT_SIGNALED); 158 task_runner()->PostTask(FROM_HERE, 159 BindOnce(&WaitableEvent::Signal, Unretained(&done))); 160 done.Wait(); 161 } 162 163 void Thread::Stop() { 164 DCHECK(joinable_); 165 166 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and 167 // enable this check, until then synchronization with Start() via 168 // |thread_lock_| is required... 169 // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 170 AutoLock lock(thread_lock_); 171 172 StopSoon(); 173 174 // Can't join if the |thread_| is either already gone or is non-joinable. 175 if (thread_.is_null()) 176 return; 177 178 // Wait for the thread to exit. 179 // 180 // TODO(darin): Unfortunately, we need to keep |message_loop_| around until 181 // the thread exits. Some consumers are abusing the API. Make them stop. 182 // 183 PlatformThread::Join(thread_); 184 thread_ = base::PlatformThreadHandle(); 185 186 // The thread should nullify |message_loop_| on exit (note: Join() adds an 187 // implicit memory barrier and no lock is thus required for this check). 188 DCHECK(!message_loop_); 189 190 stopping_ = false; 191 } 192 193 void Thread::StopSoon() { 194 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and 195 // enable this check. 196 // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 197 198 if (stopping_ || !message_loop_) 199 return; 200 201 stopping_ = true; 202 203 if (using_external_message_loop_) { 204 // Setting |stopping_| to true above should have been sufficient for this 205 // thread to be considered "stopped" per it having never set its |running_| 206 // bit by lack of its own ThreadMain. 207 DCHECK(!IsRunning()); 208 message_loop_ = nullptr; 209 return; 210 } 211 212 task_runner()->PostTask( 213 FROM_HERE, base::BindOnce(&Thread::ThreadQuitHelper, Unretained(this))); 214 } 215 216 void Thread::DetachFromSequence() { 217 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 218 owning_sequence_checker_.DetachFromSequence(); 219 } 220 221 PlatformThreadId Thread::GetThreadId() const { 222 // If the thread is created but not started yet, wait for |id_| being ready. 223 base::ThreadRestrictions::ScopedAllowWait allow_wait; 224 id_event_.Wait(); 225 return id_; 226 } 227 228 PlatformThreadHandle Thread::GetThreadHandle() const { 229 AutoLock lock(thread_lock_); 230 return thread_; 231 } 232 233 bool Thread::IsRunning() const { 234 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and 235 // enable this check. 236 // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 237 238 // If the thread's already started (i.e. |message_loop_| is non-null) and not 239 // yet requested to stop (i.e. |stopping_| is false) we can just return true. 240 // (Note that |stopping_| is touched only on the same sequence that starts / 241 // started the new thread so we need no locking here.) 242 if (message_loop_ && !stopping_) 243 return true; 244 // Otherwise check the |running_| flag, which is set to true by the new thread 245 // only while it is inside Run(). 246 AutoLock lock(running_lock_); 247 return running_; 248 } 249 250 void Thread::Run(RunLoop* run_loop) { 251 // Overridable protected method to be called from our |thread_| only. 252 DCHECK(id_event_.IsSignaled()); 253 DCHECK_EQ(id_, PlatformThread::CurrentId()); 254 255 run_loop->Run(); 256 } 257 258 // static 259 void Thread::SetThreadWasQuitProperly(bool flag) { 260 lazy_tls_bool.Pointer()->Set(flag); 261 } 262 263 // static 264 bool Thread::GetThreadWasQuitProperly() { 265 bool quit_properly = true; 266 #ifndef NDEBUG 267 quit_properly = lazy_tls_bool.Pointer()->Get(); 268 #endif 269 return quit_properly; 270 } 271 272 void Thread::SetMessageLoop(MessageLoop* message_loop) { 273 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 274 DCHECK(message_loop); 275 276 // Setting |message_loop_| should suffice for this thread to be considered 277 // as "running", until Stop() is invoked. 278 DCHECK(!IsRunning()); 279 message_loop_ = message_loop; 280 DCHECK(IsRunning()); 281 282 using_external_message_loop_ = true; 283 } 284 285 void Thread::ThreadMain() { 286 // First, make GetThreadId() available to avoid deadlocks. It could be called 287 // any place in the following thread initialization code. 288 DCHECK(!id_event_.IsSignaled()); 289 // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally 290 // okay because ThreadMain has a happens-after relationship with the other 291 // write in StartWithOptions(). 292 DCHECK_EQ(kInvalidThreadId, id_); 293 id_ = PlatformThread::CurrentId(); 294 DCHECK_NE(kInvalidThreadId, id_); 295 id_event_.Signal(); 296 297 // Complete the initialization of our Thread object. 298 PlatformThread::SetName(name_.c_str()); 299 ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. 300 301 // Lazily initialize the |message_loop| so that it can run on this thread. 302 DCHECK(message_loop_); 303 std::unique_ptr<MessageLoop> message_loop(message_loop_); 304 message_loop_->BindToCurrentThread(); 305 message_loop_->SetTimerSlack(message_loop_timer_slack_); 306 307 #if defined(OS_POSIX) && !defined(OS_NACL) 308 // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API. 309 std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher; 310 if (MessageLoopForIO::IsCurrent()) { 311 file_descriptor_watcher.reset(new FileDescriptorWatcher( 312 static_cast<MessageLoopForIO*>(message_loop_))); 313 } 314 #endif 315 316 #if defined(OS_WIN) 317 std::unique_ptr<win::ScopedCOMInitializer> com_initializer; 318 if (com_status_ != NONE) { 319 com_initializer.reset((com_status_ == STA) ? 320 new win::ScopedCOMInitializer() : 321 new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); 322 } 323 #endif 324 325 // Let the thread do extra initialization. 326 Init(); 327 328 { 329 AutoLock lock(running_lock_); 330 running_ = true; 331 } 332 333 start_event_.Signal(); 334 335 RunLoop run_loop; 336 run_loop_ = &run_loop; 337 Run(run_loop_); 338 339 { 340 AutoLock lock(running_lock_); 341 running_ = false; 342 } 343 344 // Let the thread do extra cleanup. 345 CleanUp(); 346 347 #if defined(OS_WIN) 348 com_initializer.reset(); 349 #endif 350 351 if (message_loop->type() != MessageLoop::TYPE_CUSTOM) { 352 // Assert that RunLoop::QuitWhenIdle was called by ThreadQuitHelper. Don't 353 // check for custom message pumps, because their shutdown might not allow 354 // this. 355 DCHECK(GetThreadWasQuitProperly()); 356 } 357 358 // We can't receive messages anymore. 359 // (The message loop is destructed at the end of this block) 360 message_loop_ = nullptr; 361 run_loop_ = nullptr; 362 } 363 364 void Thread::ThreadQuitHelper() { 365 DCHECK(run_loop_); 366 run_loop_->QuitWhenIdle(); 367 SetThreadWasQuitProperly(true); 368 } 369 370 } // namespace base 371