1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/worker_pool_posix.h" 6 7 #include <stddef.h> 8 9 #include "base/bind.h" 10 #include "base/callback.h" 11 #include "base/lazy_instance.h" 12 #include "base/logging.h" 13 #include "base/macros.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/strings/stringprintf.h" 16 #include "base/threading/platform_thread.h" 17 #include "base/threading/thread_local.h" 18 #include "base/threading/worker_pool.h" 19 #include "base/trace_event/trace_event.h" 20 #include "base/tracked_objects.h" 21 22 using tracked_objects::TrackedTime; 23 24 namespace base { 25 26 namespace { 27 28 base::LazyInstance<ThreadLocalBoolean>::Leaky 29 g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; 30 31 const int kIdleSecondsBeforeExit = 10 * 60; 32 33 class WorkerPoolImpl { 34 public: 35 WorkerPoolImpl(); 36 ~WorkerPoolImpl(); 37 38 void PostTask(const tracked_objects::Location& from_here, 39 const base::Closure& task, 40 bool task_is_slow); 41 42 private: 43 scoped_refptr<base::PosixDynamicThreadPool> pool_; 44 }; 45 46 WorkerPoolImpl::WorkerPoolImpl() 47 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 48 kIdleSecondsBeforeExit)) {} 49 50 WorkerPoolImpl::~WorkerPoolImpl() { 51 pool_->Terminate(); 52 } 53 54 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 55 const base::Closure& task, 56 bool /*task_is_slow*/) { 57 pool_->PostTask(from_here, task); 58 } 59 60 base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = 61 LAZY_INSTANCE_INITIALIZER; 62 63 class WorkerThread : public PlatformThread::Delegate { 64 public: 65 WorkerThread(const std::string& name_prefix, 66 base::PosixDynamicThreadPool* pool) 67 : name_prefix_(name_prefix), pool_(pool) {} 68 69 void ThreadMain() override; 70 71 private: 72 const std::string name_prefix_; 73 scoped_refptr<base::PosixDynamicThreadPool> pool_; 74 75 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 76 }; 77 78 void WorkerThread::ThreadMain() { 79 g_worker_pool_running_on_this_thread.Get().Set(true); 80 const std::string name = base::StringPrintf("%s/%d", name_prefix_.c_str(), 81 PlatformThread::CurrentId()); 82 // Note |name.c_str()| must remain valid for for the whole life of the thread. 83 PlatformThread::SetName(name); 84 85 for (;;) { 86 PendingTask pending_task = pool_->WaitForTask(); 87 if (pending_task.task.is_null()) 88 break; 89 TRACE_TASK_EXECUTION("WorkerThread::ThreadMain::Run", pending_task); 90 91 tracked_objects::TaskStopwatch stopwatch; 92 stopwatch.Start(); 93 pending_task.task.Run(); 94 stopwatch.Stop(); 95 96 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( 97 pending_task.birth_tally, pending_task.time_posted, stopwatch); 98 } 99 100 // The WorkerThread is non-joinable, so it deletes itself. 101 delete this; 102 } 103 104 } // namespace 105 106 // static 107 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 108 const base::Closure& task, 109 bool task_is_slow) { 110 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 111 return true; 112 } 113 114 // static 115 bool WorkerPool::RunsTasksOnCurrentThread() { 116 return g_worker_pool_running_on_this_thread.Get().Get(); 117 } 118 119 PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, 120 int idle_seconds_before_exit) 121 : name_prefix_(name_prefix), 122 idle_seconds_before_exit_(idle_seconds_before_exit), 123 pending_tasks_available_cv_(&lock_), 124 num_idle_threads_(0), 125 terminated_(false) {} 126 127 PosixDynamicThreadPool::~PosixDynamicThreadPool() { 128 while (!pending_tasks_.empty()) 129 pending_tasks_.pop(); 130 } 131 132 void PosixDynamicThreadPool::Terminate() { 133 { 134 AutoLock locked(lock_); 135 DCHECK(!terminated_) << "Thread pool is already terminated."; 136 terminated_ = true; 137 } 138 pending_tasks_available_cv_.Broadcast(); 139 } 140 141 void PosixDynamicThreadPool::PostTask( 142 const tracked_objects::Location& from_here, 143 const base::Closure& task) { 144 PendingTask pending_task(from_here, task); 145 AddTask(&pending_task); 146 } 147 148 void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { 149 AutoLock locked(lock_); 150 DCHECK(!terminated_) 151 << "This thread pool is already terminated. Do not post new tasks."; 152 153 pending_tasks_.push(std::move(*pending_task)); 154 155 // We have enough worker threads. 156 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { 157 pending_tasks_available_cv_.Signal(); 158 } else { 159 // The new PlatformThread will take ownership of the WorkerThread object, 160 // which will delete itself on exit. 161 WorkerThread* worker = new WorkerThread(name_prefix_, this); 162 PlatformThread::CreateNonJoinable(0, worker); 163 } 164 } 165 166 PendingTask PosixDynamicThreadPool::WaitForTask() { 167 AutoLock locked(lock_); 168 169 if (terminated_) 170 return PendingTask(FROM_HERE, base::Closure()); 171 172 if (pending_tasks_.empty()) { // No work available, wait for work. 173 num_idle_threads_++; 174 if (num_idle_threads_cv_.get()) 175 num_idle_threads_cv_->Signal(); 176 pending_tasks_available_cv_.TimedWait( 177 TimeDelta::FromSeconds(idle_seconds_before_exit_)); 178 num_idle_threads_--; 179 if (num_idle_threads_cv_.get()) 180 num_idle_threads_cv_->Signal(); 181 if (pending_tasks_.empty()) { 182 // We waited for work, but there's still no work. Return NULL to signal 183 // the thread to terminate. 184 return PendingTask(FROM_HERE, base::Closure()); 185 } 186 } 187 188 PendingTask pending_task = std::move(pending_tasks_.front()); 189 pending_tasks_.pop(); 190 return pending_task; 191 } 192 193 } // namespace base 194