1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/worker_pool_posix.h" 6 7 #include "base/bind.h" 8 #include "base/callback.h" 9 #include "base/debug/trace_event.h" 10 #include "base/lazy_instance.h" 11 #include "base/logging.h" 12 #include "base/memory/ref_counted.h" 13 #include "base/strings/stringprintf.h" 14 #include "base/threading/platform_thread.h" 15 #include "base/threading/thread_local.h" 16 #include "base/threading/worker_pool.h" 17 #include "base/tracked_objects.h" 18 19 using tracked_objects::TrackedTime; 20 21 namespace base { 22 23 namespace { 24 25 base::LazyInstance<ThreadLocalBoolean>::Leaky 26 g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; 27 28 const int kIdleSecondsBeforeExit = 10 * 60; 29 30 #ifdef ADDRESS_SANITIZER 31 const int kWorkerThreadStackSize = 256 * 1024; 32 #else 33 // A stack size of 64 KB is too small for the CERT_PKIXVerifyCert 34 // function of NSS because of NSS bug 439169. 35 const int kWorkerThreadStackSize = 128 * 1024; 36 #endif 37 38 class WorkerPoolImpl { 39 public: 40 WorkerPoolImpl(); 41 ~WorkerPoolImpl(); 42 43 void PostTask(const tracked_objects::Location& from_here, 44 const base::Closure& task, bool task_is_slow); 45 46 private: 47 scoped_refptr<base::PosixDynamicThreadPool> pool_; 48 }; 49 50 WorkerPoolImpl::WorkerPoolImpl() 51 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 52 kIdleSecondsBeforeExit)) { 53 } 54 55 WorkerPoolImpl::~WorkerPoolImpl() { 56 pool_->Terminate(); 57 } 58 59 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 60 const base::Closure& task, bool task_is_slow) { 61 pool_->PostTask(from_here, task); 62 } 63 64 base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = 65 LAZY_INSTANCE_INITIALIZER; 66 67 class WorkerThread : public PlatformThread::Delegate { 68 public: 69 WorkerThread(const std::string& name_prefix, 70 base::PosixDynamicThreadPool* pool) 71 : name_prefix_(name_prefix), 72 pool_(pool) {} 73 74 virtual void ThreadMain() OVERRIDE; 75 76 private: 77 const std::string name_prefix_; 78 scoped_refptr<base::PosixDynamicThreadPool> pool_; 79 80 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 81 }; 82 83 void WorkerThread::ThreadMain() { 84 g_worker_pool_running_on_this_thread.Get().Set(true); 85 const std::string name = base::StringPrintf( 86 "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); 87 // Note |name.c_str()| must remain valid for for the whole life of the thread. 88 PlatformThread::SetName(name.c_str()); 89 90 for (;;) { 91 PendingTask pending_task = pool_->WaitForTask(); 92 if (pending_task.task.is_null()) 93 break; 94 TRACE_EVENT2("task", "WorkerThread::ThreadMain::Run", 95 "src_file", pending_task.posted_from.file_name(), 96 "src_func", pending_task.posted_from.function_name()); 97 98 TrackedTime start_time = 99 tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally); 100 101 pending_task.task.Run(); 102 103 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( 104 pending_task.birth_tally, TrackedTime(pending_task.time_posted), 105 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 106 } 107 108 // The WorkerThread is non-joinable, so it deletes itself. 109 delete this; 110 } 111 112 } // namespace 113 114 // static 115 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 116 const base::Closure& task, bool task_is_slow) { 117 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 118 return true; 119 } 120 121 // static 122 bool WorkerPool::RunsTasksOnCurrentThread() { 123 return g_worker_pool_running_on_this_thread.Get().Get(); 124 } 125 126 PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, 127 int idle_seconds_before_exit) 128 : name_prefix_(name_prefix), 129 idle_seconds_before_exit_(idle_seconds_before_exit), 130 pending_tasks_available_cv_(&lock_), 131 num_idle_threads_(0), 132 terminated_(false) {} 133 134 PosixDynamicThreadPool::~PosixDynamicThreadPool() { 135 while (!pending_tasks_.empty()) 136 pending_tasks_.pop(); 137 } 138 139 void PosixDynamicThreadPool::Terminate() { 140 { 141 AutoLock locked(lock_); 142 DCHECK(!terminated_) << "Thread pool is already terminated."; 143 terminated_ = true; 144 } 145 pending_tasks_available_cv_.Broadcast(); 146 } 147 148 void PosixDynamicThreadPool::PostTask( 149 const tracked_objects::Location& from_here, 150 const base::Closure& task) { 151 PendingTask pending_task(from_here, task); 152 AddTask(&pending_task); 153 } 154 155 void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { 156 AutoLock locked(lock_); 157 DCHECK(!terminated_) << 158 "This thread pool is already terminated. Do not post new tasks."; 159 160 pending_tasks_.push(*pending_task); 161 pending_task->task.Reset(); 162 163 // We have enough worker threads. 164 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { 165 pending_tasks_available_cv_.Signal(); 166 } else { 167 // The new PlatformThread will take ownership of the WorkerThread object, 168 // which will delete itself on exit. 169 WorkerThread* worker = 170 new WorkerThread(name_prefix_, this); 171 PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); 172 } 173 } 174 175 PendingTask PosixDynamicThreadPool::WaitForTask() { 176 AutoLock locked(lock_); 177 178 if (terminated_) 179 return PendingTask(FROM_HERE, base::Closure()); 180 181 if (pending_tasks_.empty()) { // No work available, wait for work. 182 num_idle_threads_++; 183 if (num_idle_threads_cv_.get()) 184 num_idle_threads_cv_->Signal(); 185 pending_tasks_available_cv_.TimedWait( 186 TimeDelta::FromSeconds(idle_seconds_before_exit_)); 187 num_idle_threads_--; 188 if (num_idle_threads_cv_.get()) 189 num_idle_threads_cv_->Signal(); 190 if (pending_tasks_.empty()) { 191 // We waited for work, but there's still no work. Return NULL to signal 192 // the thread to terminate. 193 return PendingTask(FROM_HERE, base::Closure()); 194 } 195 } 196 197 PendingTask pending_task = pending_tasks_.front(); 198 pending_tasks_.pop(); 199 return pending_task; 200 } 201 202 } // namespace base 203