1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/worker_pool_posix.h" 6 7 #include "base/lazy_instance.h" 8 #include "base/logging.h" 9 #include "base/memory/ref_counted.h" 10 #include "base/stringprintf.h" 11 #include "base/task.h" 12 #include "base/threading/platform_thread.h" 13 #include "base/threading/worker_pool.h" 14 15 namespace base { 16 17 namespace { 18 19 const int kIdleSecondsBeforeExit = 10 * 60; 20 // A stack size of 64 KB is too small for the CERT_PKIXVerifyCert 21 // function of NSS because of NSS bug 439169. 22 const int kWorkerThreadStackSize = 128 * 1024; 23 24 class WorkerPoolImpl { 25 public: 26 WorkerPoolImpl(); 27 ~WorkerPoolImpl(); 28 29 void PostTask(const tracked_objects::Location& from_here, Task* task, 30 bool task_is_slow); 31 32 private: 33 scoped_refptr<base::PosixDynamicThreadPool> pool_; 34 }; 35 36 WorkerPoolImpl::WorkerPoolImpl() 37 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 38 kIdleSecondsBeforeExit)) { 39 } 40 41 WorkerPoolImpl::~WorkerPoolImpl() { 42 pool_->Terminate(); 43 } 44 45 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 46 Task* task, bool task_is_slow) { 47 task->SetBirthPlace(from_here); 48 pool_->PostTask(task); 49 } 50 51 base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); 52 53 class WorkerThread : public PlatformThread::Delegate { 54 public: 55 WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, 56 base::PosixDynamicThreadPool* pool) 57 : name_prefix_(name_prefix), 58 idle_seconds_before_exit_(idle_seconds_before_exit), 59 pool_(pool) {} 60 61 virtual void ThreadMain(); 62 63 private: 64 const std::string name_prefix_; 65 const int idle_seconds_before_exit_; 66 scoped_refptr<base::PosixDynamicThreadPool> pool_; 67 68 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 69 }; 70 71 void WorkerThread::ThreadMain() { 72 const std::string name = base::StringPrintf( 73 "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); 74 PlatformThread::SetName(name.c_str()); 75 76 for (;;) { 77 Task* task = pool_->WaitForTask(); 78 if (!task) 79 break; 80 task->Run(); 81 delete task; 82 } 83 84 // The WorkerThread is non-joinable, so it deletes itself. 85 delete this; 86 } 87 88 } // namespace 89 90 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 91 Task* task, bool task_is_slow) { 92 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 93 return true; 94 } 95 96 PosixDynamicThreadPool::PosixDynamicThreadPool( 97 const std::string& name_prefix, 98 int idle_seconds_before_exit) 99 : name_prefix_(name_prefix), 100 idle_seconds_before_exit_(idle_seconds_before_exit), 101 tasks_available_cv_(&lock_), 102 num_idle_threads_(0), 103 terminated_(false), 104 num_idle_threads_cv_(NULL) {} 105 106 PosixDynamicThreadPool::~PosixDynamicThreadPool() { 107 while (!tasks_.empty()) { 108 Task* task = tasks_.front(); 109 tasks_.pop(); 110 delete task; 111 } 112 } 113 114 void PosixDynamicThreadPool::Terminate() { 115 { 116 AutoLock locked(lock_); 117 DCHECK(!terminated_) << "Thread pool is already terminated."; 118 terminated_ = true; 119 } 120 tasks_available_cv_.Broadcast(); 121 } 122 123 void PosixDynamicThreadPool::PostTask(Task* task) { 124 AutoLock locked(lock_); 125 DCHECK(!terminated_) << 126 "This thread pool is already terminated. Do not post new tasks."; 127 128 tasks_.push(task); 129 130 // We have enough worker threads. 131 if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { 132 tasks_available_cv_.Signal(); 133 } else { 134 // The new PlatformThread will take ownership of the WorkerThread object, 135 // which will delete itself on exit. 136 WorkerThread* worker = 137 new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); 138 PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); 139 } 140 } 141 142 Task* PosixDynamicThreadPool::WaitForTask() { 143 AutoLock locked(lock_); 144 145 if (terminated_) 146 return NULL; 147 148 if (tasks_.empty()) { // No work available, wait for work. 149 num_idle_threads_++; 150 if (num_idle_threads_cv_.get()) 151 num_idle_threads_cv_->Signal(); 152 tasks_available_cv_.TimedWait( 153 TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); 154 num_idle_threads_--; 155 if (num_idle_threads_cv_.get()) 156 num_idle_threads_cv_->Signal(); 157 if (tasks_.empty()) { 158 // We waited for work, but there's still no work. Return NULL to signal 159 // the thread to terminate. 160 return NULL; 161 } 162 } 163 164 Task* task = tasks_.front(); 165 tasks_.pop(); 166 return task; 167 } 168 169 } // namespace base 170