1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/worker_pool_posix.h" 6 7 #include <stddef.h> 8 9 #include <utility> 10 11 #include "base/bind.h" 12 #include "base/callback.h" 13 #include "base/lazy_instance.h" 14 #include "base/logging.h" 15 #include "base/macros.h" 16 #include "base/memory/ref_counted.h" 17 #include "base/strings/stringprintf.h" 18 #include "base/threading/platform_thread.h" 19 #include "base/threading/thread_local.h" 20 #include "base/threading/worker_pool.h" 21 #include "base/trace_event/trace_event.h" 22 #include "base/tracked_objects.h" 23 24 using tracked_objects::TrackedTime; 25 26 namespace base { 27 28 namespace { 29 30 base::LazyInstance<ThreadLocalBoolean>::Leaky 31 g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; 32 33 const int kIdleSecondsBeforeExit = 10 * 60; 34 35 #if defined(OS_MACOSX) 36 // On Mac OS X a background thread's default stack size is 512Kb. We need at 37 // least 1MB for compilation tasks in V8, so increase this default. 38 const int kStackSize = 1 * 1024 * 1024; 39 #else 40 const int kStackSize = 0; 41 #endif 42 43 class WorkerPoolImpl { 44 public: 45 WorkerPoolImpl(); 46 47 // WorkerPoolImpl is only instantiated as a leaky LazyInstance, so the 48 // destructor is never called. 49 ~WorkerPoolImpl() = delete; 50 51 void PostTask(const tracked_objects::Location& from_here, 52 base::OnceClosure task, 53 bool task_is_slow); 54 55 private: 56 scoped_refptr<base::PosixDynamicThreadPool> pool_; 57 }; 58 59 WorkerPoolImpl::WorkerPoolImpl() 60 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 61 kIdleSecondsBeforeExit)) {} 62 63 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 64 base::OnceClosure task, 65 bool task_is_slow) { 66 pool_->PostTask(from_here, std::move(task)); 67 } 68 69 base::LazyInstance<WorkerPoolImpl>::Leaky g_lazy_worker_pool = 70 LAZY_INSTANCE_INITIALIZER; 71 72 class WorkerThread : public PlatformThread::Delegate { 73 public: 74 WorkerThread(const std::string& name_prefix, 75 base::PosixDynamicThreadPool* pool) 76 : name_prefix_(name_prefix), pool_(pool) {} 77 78 void ThreadMain() override; 79 80 private: 81 const std::string name_prefix_; 82 scoped_refptr<base::PosixDynamicThreadPool> pool_; 83 84 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 85 }; 86 87 void WorkerThread::ThreadMain() { 88 g_worker_pool_running_on_this_thread.Get().Set(true); 89 const std::string name = base::StringPrintf("%s/%d", name_prefix_.c_str(), 90 PlatformThread::CurrentId()); 91 // Note |name.c_str()| must remain valid for for the whole life of the thread. 92 PlatformThread::SetName(name); 93 94 for (;;) { 95 PendingTask pending_task = pool_->WaitForTask(); 96 if (pending_task.task.is_null()) 97 break; 98 TRACE_TASK_EXECUTION("WorkerThread::ThreadMain::Run", pending_task); 99 100 tracked_objects::TaskStopwatch stopwatch; 101 stopwatch.Start(); 102 std::move(pending_task.task).Run(); 103 stopwatch.Stop(); 104 105 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( 106 pending_task.birth_tally, pending_task.time_posted, stopwatch); 107 } 108 109 // The WorkerThread is non-joinable, so it deletes itself. 110 delete this; 111 } 112 113 } // namespace 114 115 // static 116 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 117 base::OnceClosure task, 118 bool task_is_slow) { 119 g_lazy_worker_pool.Pointer()->PostTask(from_here, std::move(task), 120 task_is_slow); 121 return true; 122 } 123 124 // static 125 bool WorkerPool::RunsTasksOnCurrentThread() { 126 return g_worker_pool_running_on_this_thread.Get().Get(); 127 } 128 129 PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, 130 int idle_seconds_before_exit) 131 : name_prefix_(name_prefix), 132 idle_seconds_before_exit_(idle_seconds_before_exit), 133 pending_tasks_available_cv_(&lock_), 134 num_idle_threads_(0) {} 135 136 PosixDynamicThreadPool::~PosixDynamicThreadPool() { 137 while (!pending_tasks_.empty()) 138 pending_tasks_.pop(); 139 } 140 141 void PosixDynamicThreadPool::PostTask( 142 const tracked_objects::Location& from_here, 143 base::OnceClosure task) { 144 PendingTask pending_task(from_here, std::move(task)); 145 AddTask(&pending_task); 146 } 147 148 void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { 149 DCHECK(pending_task); 150 DCHECK(pending_task->task); 151 AutoLock locked(lock_); 152 153 pending_tasks_.push(std::move(*pending_task)); 154 155 // We have enough worker threads. 156 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { 157 pending_tasks_available_cv_.Signal(); 158 } else { 159 // The new PlatformThread will take ownership of the WorkerThread object, 160 // which will delete itself on exit. 161 WorkerThread* worker = new WorkerThread(name_prefix_, this); 162 PlatformThread::CreateNonJoinable(kStackSize, worker); 163 } 164 } 165 166 PendingTask PosixDynamicThreadPool::WaitForTask() { 167 AutoLock locked(lock_); 168 169 if (pending_tasks_.empty()) { // No work available, wait for work. 170 num_idle_threads_++; 171 if (num_idle_threads_cv_.get()) 172 num_idle_threads_cv_->Signal(); 173 pending_tasks_available_cv_.TimedWait( 174 TimeDelta::FromSeconds(idle_seconds_before_exit_)); 175 num_idle_threads_--; 176 if (num_idle_threads_cv_.get()) 177 num_idle_threads_cv_->Signal(); 178 if (pending_tasks_.empty()) { 179 // We waited for work, but there's still no work. Return NULL to signal 180 // the thread to terminate. 181 return PendingTask(FROM_HERE, base::Closure()); 182 } 183 } 184 185 PendingTask pending_task = std::move(pending_tasks_.front()); 186 pending_tasks_.pop(); 187 return pending_task; 188 } 189 190 } // namespace base 191