1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/worker_pool.h" 6 #include "base/worker_pool_linux.h" 7 8 #include "base/lazy_instance.h" 9 #include "base/logging.h" 10 #include "base/platform_thread.h" 11 #include "base/ref_counted.h" 12 #include "base/string_util.h" 13 #include "base/task.h" 14 15 namespace { 16 17 const int kIdleSecondsBeforeExit = 10 * 60; 18 // A stack size of 64 KB is too small for the CERT_PKIXVerifyCert 19 // function of NSS because of NSS bug 439169. 20 const int kWorkerThreadStackSize = 128 * 1024; 21 22 class WorkerPoolImpl { 23 public: 24 WorkerPoolImpl(); 25 ~WorkerPoolImpl(); 26 27 void PostTask(const tracked_objects::Location& from_here, Task* task, 28 bool task_is_slow); 29 30 private: 31 scoped_refptr<base::LinuxDynamicThreadPool> pool_; 32 }; 33 34 WorkerPoolImpl::WorkerPoolImpl() 35 : pool_(new base::LinuxDynamicThreadPool( 36 "WorkerPool", kIdleSecondsBeforeExit)) {} 37 38 WorkerPoolImpl::~WorkerPoolImpl() { 39 pool_->Terminate(); 40 } 41 42 void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 43 Task* task, bool task_is_slow) { 44 task->SetBirthPlace(from_here); 45 pool_->PostTask(task); 46 } 47 48 base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); 49 50 class WorkerThread : public PlatformThread::Delegate { 51 public: 52 WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, 53 base::LinuxDynamicThreadPool* pool) 54 : name_prefix_(name_prefix), 55 idle_seconds_before_exit_(idle_seconds_before_exit), 56 pool_(pool) {} 57 58 virtual void ThreadMain(); 59 60 private: 61 const std::string name_prefix_; 62 const int idle_seconds_before_exit_; 63 scoped_refptr<base::LinuxDynamicThreadPool> pool_; 64 65 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 66 }; 67 68 void WorkerThread::ThreadMain() { 69 const std::string name = 70 StringPrintf("%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); 71 PlatformThread::SetName(name.c_str()); 72 73 for (;;) { 74 Task* task = pool_->WaitForTask(); 75 if (!task) 76 break; 77 task->Run(); 78 delete task; 79 } 80 81 // The WorkerThread is non-joinable, so it deletes itself. 82 delete this; 83 } 84 85 } // namespace 86 87 bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 88 Task* task, bool task_is_slow) { 89 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 90 return true; 91 } 92 93 namespace base { 94 95 LinuxDynamicThreadPool::LinuxDynamicThreadPool( 96 const std::string& name_prefix, 97 int idle_seconds_before_exit) 98 : name_prefix_(name_prefix), 99 idle_seconds_before_exit_(idle_seconds_before_exit), 100 tasks_available_cv_(&lock_), 101 num_idle_threads_(0), 102 terminated_(false), 103 num_idle_threads_cv_(NULL) {} 104 105 LinuxDynamicThreadPool::~LinuxDynamicThreadPool() { 106 while (!tasks_.empty()) { 107 Task* task = tasks_.front(); 108 tasks_.pop(); 109 delete task; 110 } 111 } 112 113 void LinuxDynamicThreadPool::Terminate() { 114 { 115 AutoLock locked(lock_); 116 DCHECK(!terminated_) << "Thread pool is already terminated."; 117 terminated_ = true; 118 } 119 tasks_available_cv_.Broadcast(); 120 } 121 122 void LinuxDynamicThreadPool::PostTask(Task* task) { 123 AutoLock locked(lock_); 124 DCHECK(!terminated_) << 125 "This thread pool is already terminated. Do not post new tasks."; 126 127 tasks_.push(task); 128 129 // We have enough worker threads. 130 if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { 131 tasks_available_cv_.Signal(); 132 } else { 133 // The new PlatformThread will take ownership of the WorkerThread object, 134 // which will delete itself on exit. 135 WorkerThread* worker = 136 new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); 137 PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); 138 } 139 } 140 141 Task* LinuxDynamicThreadPool::WaitForTask() { 142 AutoLock locked(lock_); 143 144 if (terminated_) 145 return NULL; 146 147 if (tasks_.empty()) { // No work available, wait for work. 148 num_idle_threads_++; 149 if (num_idle_threads_cv_.get()) 150 num_idle_threads_cv_->Signal(); 151 tasks_available_cv_.TimedWait( 152 TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); 153 num_idle_threads_--; 154 if (num_idle_threads_cv_.get()) 155 num_idle_threads_cv_->Signal(); 156 if (tasks_.empty()) { 157 // We waited for work, but there's still no work. Return NULL to signal 158 // the thread to terminate. 159 return NULL; 160 } 161 } 162 163 Task* task = tasks_.front(); 164 tasks_.pop(); 165 return task; 166 } 167 168 } // namespace base 169