1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/thread.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/lazy_instance.h" 10 #include "base/location.h" 11 #include "base/logging.h" 12 #include "base/run_loop.h" 13 #include "base/synchronization/waitable_event.h" 14 #include "base/third_party/dynamic_annotations/dynamic_annotations.h" 15 #include "base/threading/thread_id_name_manager.h" 16 #include "base/threading/thread_local.h" 17 #include "base/threading/thread_restrictions.h" 18 #include "build/build_config.h" 19 20 #if defined(OS_POSIX) && !defined(OS_NACL) 21 #include "base/files/file_descriptor_watcher_posix.h" 22 #endif 23 24 #if defined(OS_WIN) 25 #include "base/win/scoped_com_initializer.h" 26 #endif 27 28 namespace base { 29 30 namespace { 31 32 // We use this thread-local variable to record whether or not a thread exited 33 // because its Stop method was called. This allows us to catch cases where 34 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when 35 // using a Thread to setup and run a MessageLoop. 36 base::LazyInstance<base::ThreadLocalBoolean>::Leaky lazy_tls_bool = 37 LAZY_INSTANCE_INITIALIZER; 38 39 } // namespace 40 41 Thread::Options::Options() = default; 42 43 Thread::Options::Options(MessageLoop::Type type, size_t size) 44 : message_loop_type(type), stack_size(size) {} 45 46 Thread::Options::Options(const Options& other) = default; 47 48 Thread::Options::~Options() = default; 49 50 Thread::Thread(const std::string& name) 51 : id_event_(WaitableEvent::ResetPolicy::MANUAL, 52 WaitableEvent::InitialState::NOT_SIGNALED), 53 name_(name), 54 start_event_(WaitableEvent::ResetPolicy::MANUAL, 55 WaitableEvent::InitialState::NOT_SIGNALED) { 56 // Only bind the sequence on Start(): the state is constant between 57 // construction and Start() and it's thus valid for Start() to be called on 58 // another sequence as long as every other operation is then performed on that 59 // sequence. 60 owning_sequence_checker_.DetachFromSequence(); 61 } 62 63 Thread::~Thread() { 64 Stop(); 65 } 66 67 bool Thread::Start() { 68 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 69 70 Options options; 71 #if defined(OS_WIN) 72 if (com_status_ == STA) 73 options.message_loop_type = MessageLoop::TYPE_UI; 74 #endif 75 return StartWithOptions(options); 76 } 77 78 bool Thread::StartWithOptions(const Options& options) { 79 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 80 DCHECK(!message_loop_); 81 DCHECK(!IsRunning()); 82 DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's " 83 << "not allowed!"; 84 #if defined(OS_WIN) 85 DCHECK((com_status_ != STA) || 86 (options.message_loop_type == MessageLoop::TYPE_UI)); 87 #endif 88 89 // Reset |id_| here to support restarting the thread. 90 id_event_.Reset(); 91 id_ = kInvalidThreadId; 92 93 SetThreadWasQuitProperly(false); 94 95 MessageLoop::Type type = options.message_loop_type; 96 if (!options.message_pump_factory.is_null()) 97 type = MessageLoop::TYPE_CUSTOM; 98 99 message_loop_timer_slack_ = options.timer_slack; 100 std::unique_ptr<MessageLoop> message_loop_owned = 101 MessageLoop::CreateUnbound(type, options.message_pump_factory); 102 message_loop_ = message_loop_owned.get(); 103 start_event_.Reset(); 104 105 // Hold |thread_lock_| while starting the new thread to synchronize with 106 // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is 107 // fixed). 108 { 109 AutoLock lock(thread_lock_); 110 bool success = 111 options.joinable 112 ? PlatformThread::CreateWithPriority(options.stack_size, this, 113 &thread_, options.priority) 114 : PlatformThread::CreateNonJoinableWithPriority( 115 options.stack_size, this, options.priority); 116 if (!success) { 117 DLOG(ERROR) << "failed to create thread"; 118 message_loop_ = nullptr; 119 return false; 120 } 121 } 122 123 joinable_ = options.joinable; 124 125 // The ownership of |message_loop_| is managed by the newly created thread 126 // within the ThreadMain. 127 ignore_result(message_loop_owned.release()); 128 129 DCHECK(message_loop_); 130 return true; 131 } 132 133 bool Thread::StartAndWaitForTesting() { 134 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 135 bool result = Start(); 136 if (!result) 137 return false; 138 WaitUntilThreadStarted(); 139 return true; 140 } 141 142 bool Thread::WaitUntilThreadStarted() const { 143 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 144 if (!message_loop_) 145 return false; 146 base::ThreadRestrictions::ScopedAllowWait allow_wait; 147 start_event_.Wait(); 148 return true; 149 } 150 151 void Thread::FlushForTesting() { 152 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 153 if (!message_loop_) 154 return; 155 156 WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC, 157 WaitableEvent::InitialState::NOT_SIGNALED); 158 task_runner()->PostTask(FROM_HERE, 159 Bind(&WaitableEvent::Signal, Unretained(&done))); 160 done.Wait(); 161 } 162 163 void Thread::Stop() { 164 DCHECK(joinable_); 165 166 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and 167 // enable this check, until then synchronization with Start() via 168 // |thread_lock_| is required... 169 // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 170 AutoLock lock(thread_lock_); 171 172 StopSoon(); 173 174 // Can't join if the |thread_| is either already gone or is non-joinable. 175 if (thread_.is_null()) 176 return; 177 178 // Wait for the thread to exit. 179 // 180 // TODO(darin): Unfortunately, we need to keep |message_loop_| around until 181 // the thread exits. Some consumers are abusing the API. Make them stop. 182 // 183 PlatformThread::Join(thread_); 184 thread_ = base::PlatformThreadHandle(); 185 186 // The thread should nullify |message_loop_| on exit (note: Join() adds an 187 // implicit memory barrier and no lock is thus required for this check). 188 DCHECK(!message_loop_); 189 190 stopping_ = false; 191 } 192 193 void Thread::StopSoon() { 194 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and 195 // enable this check. 196 // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 197 198 if (stopping_ || !message_loop_) 199 return; 200 201 stopping_ = true; 202 203 if (using_external_message_loop_) { 204 // Setting |stopping_| to true above should have been sufficient for this 205 // thread to be considered "stopped" per it having never set its |running_| 206 // bit by lack of its own ThreadMain. 207 DCHECK(!IsRunning()); 208 message_loop_ = nullptr; 209 return; 210 } 211 212 task_runner()->PostTask( 213 FROM_HERE, base::Bind(&Thread::ThreadQuitHelper, Unretained(this))); 214 } 215 216 void Thread::DetachFromSequence() { 217 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 218 owning_sequence_checker_.DetachFromSequence(); 219 } 220 221 PlatformThreadId Thread::GetThreadId() const { 222 // If the thread is created but not started yet, wait for |id_| being ready. 223 base::ThreadRestrictions::ScopedAllowWait allow_wait; 224 id_event_.Wait(); 225 return id_; 226 } 227 228 bool Thread::IsRunning() const { 229 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and 230 // enable this check. 231 // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 232 233 // If the thread's already started (i.e. |message_loop_| is non-null) and not 234 // yet requested to stop (i.e. |stopping_| is false) we can just return true. 235 // (Note that |stopping_| is touched only on the same sequence that starts / 236 // started the new thread so we need no locking here.) 237 if (message_loop_ && !stopping_) 238 return true; 239 // Otherwise check the |running_| flag, which is set to true by the new thread 240 // only while it is inside Run(). 241 AutoLock lock(running_lock_); 242 return running_; 243 } 244 245 void Thread::Run(RunLoop* run_loop) { 246 // Overridable protected method to be called from our |thread_| only. 247 DCHECK(id_event_.IsSignaled()); 248 DCHECK_EQ(id_, PlatformThread::CurrentId()); 249 250 run_loop->Run(); 251 } 252 253 // static 254 void Thread::SetThreadWasQuitProperly(bool flag) { 255 lazy_tls_bool.Pointer()->Set(flag); 256 } 257 258 // static 259 bool Thread::GetThreadWasQuitProperly() { 260 bool quit_properly = true; 261 #ifndef NDEBUG 262 quit_properly = lazy_tls_bool.Pointer()->Get(); 263 #endif 264 return quit_properly; 265 } 266 267 void Thread::SetMessageLoop(MessageLoop* message_loop) { 268 DCHECK(owning_sequence_checker_.CalledOnValidSequence()); 269 DCHECK(message_loop); 270 271 // Setting |message_loop_| should suffice for this thread to be considered 272 // as "running", until Stop() is invoked. 273 DCHECK(!IsRunning()); 274 message_loop_ = message_loop; 275 DCHECK(IsRunning()); 276 277 using_external_message_loop_ = true; 278 } 279 280 void Thread::ThreadMain() { 281 // First, make GetThreadId() available to avoid deadlocks. It could be called 282 // any place in the following thread initialization code. 283 DCHECK(!id_event_.IsSignaled()); 284 // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally 285 // okay because ThreadMain has a happens-after relationship with the other 286 // write in StartWithOptions(). 287 DCHECK_EQ(kInvalidThreadId, id_); 288 id_ = PlatformThread::CurrentId(); 289 DCHECK_NE(kInvalidThreadId, id_); 290 id_event_.Signal(); 291 292 // Complete the initialization of our Thread object. 293 PlatformThread::SetName(name_.c_str()); 294 ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. 295 296 // Lazily initialize the |message_loop| so that it can run on this thread. 297 DCHECK(message_loop_); 298 std::unique_ptr<MessageLoop> message_loop(message_loop_); 299 message_loop_->BindToCurrentThread(); 300 message_loop_->SetTimerSlack(message_loop_timer_slack_); 301 302 #if defined(OS_POSIX) && !defined(OS_NACL) 303 // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API. 304 std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher; 305 if (MessageLoopForIO::IsCurrent()) { 306 DCHECK_EQ(message_loop_, MessageLoopForIO::current()); 307 file_descriptor_watcher.reset( 308 new FileDescriptorWatcher(MessageLoopForIO::current())); 309 } 310 #endif 311 312 #if defined(OS_WIN) 313 std::unique_ptr<win::ScopedCOMInitializer> com_initializer; 314 if (com_status_ != NONE) { 315 com_initializer.reset((com_status_ == STA) ? 316 new win::ScopedCOMInitializer() : 317 new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); 318 } 319 #endif 320 321 // Let the thread do extra initialization. 322 Init(); 323 324 { 325 AutoLock lock(running_lock_); 326 running_ = true; 327 } 328 329 start_event_.Signal(); 330 331 RunLoop run_loop; 332 run_loop_ = &run_loop; 333 Run(run_loop_); 334 335 { 336 AutoLock lock(running_lock_); 337 running_ = false; 338 } 339 340 // Let the thread do extra cleanup. 341 CleanUp(); 342 343 #if defined(OS_WIN) 344 com_initializer.reset(); 345 #endif 346 347 if (message_loop->type() != MessageLoop::TYPE_CUSTOM) { 348 // Assert that RunLoop::QuitWhenIdle was called by ThreadQuitHelper. Don't 349 // check for custom message pumps, because their shutdown might not allow 350 // this. 351 DCHECK(GetThreadWasQuitProperly()); 352 } 353 354 // We can't receive messages anymore. 355 // (The message loop is destructed at the end of this block) 356 message_loop_ = nullptr; 357 run_loop_ = nullptr; 358 } 359 360 void Thread::ThreadQuitHelper() { 361 DCHECK(run_loop_); 362 run_loop_->QuitWhenIdle(); 363 SetThreadWasQuitProperly(true); 364 } 365 366 } // namespace base 367