1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "thread_pool.h" 18 19 #include "base/casts.h" 20 #include "base/stl_util.h" 21 #include "runtime.h" 22 #include "thread.h" 23 24 namespace art { 25 26 static constexpr bool kMeasureWaitTime = false; 27 28 ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, 29 size_t stack_size) 30 : thread_pool_(thread_pool), 31 name_(name), 32 stack_size_(stack_size) { 33 const char* reason = "new thread pool worker thread"; 34 pthread_attr_t attr; 35 CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason); 36 CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, stack_size), reason); 37 CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason); 38 CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason); 39 } 40 41 ThreadPoolWorker::~ThreadPoolWorker() { 42 CHECK_PTHREAD_CALL(pthread_join, (pthread_, NULL), "thread pool worker shutdown"); 43 } 44 45 void ThreadPoolWorker::Run() { 46 Thread* self = Thread::Current(); 47 Task* task = NULL; 48 thread_pool_->creation_barier_.Wait(self); 49 while ((task = thread_pool_->GetTask(self)) != NULL) { 50 task->Run(self); 51 task->Finalize(); 52 } 53 } 54 55 void* ThreadPoolWorker::Callback(void* arg) { 56 ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg); 57 Runtime* runtime = Runtime::Current(); 58 CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), true, NULL, false)); 59 // Do work until its time to shut down. 60 worker->Run(); 61 runtime->DetachCurrentThread(); 62 return NULL; 63 } 64 65 void ThreadPool::AddTask(Thread* self, Task* task) { 66 MutexLock mu(self, task_queue_lock_); 67 tasks_.push_back(task); 68 // If we have any waiters, signal one. 69 if (started_ && waiting_count_ != 0) { 70 task_queue_condition_.Signal(self); 71 } 72 } 73 74 ThreadPool::ThreadPool(size_t num_threads) 75 : task_queue_lock_("task queue lock"), 76 task_queue_condition_("task queue condition", task_queue_lock_), 77 completion_condition_("task completion condition", task_queue_lock_), 78 started_(false), 79 shutting_down_(false), 80 waiting_count_(0), 81 start_time_(0), 82 total_wait_time_(0), 83 // Add one since the caller of constructor waits on the barrier too. 84 creation_barier_(num_threads + 1), 85 max_active_workers_(num_threads) { 86 Thread* self = Thread::Current(); 87 while (GetThreadCount() < num_threads) { 88 const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount()); 89 threads_.push_back(new ThreadPoolWorker(this, name, ThreadPoolWorker::kDefaultStackSize)); 90 } 91 // Wait for all of the threads to attach. 92 creation_barier_.Wait(self); 93 } 94 95 void ThreadPool::SetMaxActiveWorkers(size_t threads) { 96 MutexLock mu(Thread::Current(), task_queue_lock_); 97 CHECK_LE(threads, GetThreadCount()); 98 max_active_workers_ = threads; 99 } 100 101 ThreadPool::~ThreadPool() { 102 { 103 Thread* self = Thread::Current(); 104 MutexLock mu(self, task_queue_lock_); 105 // Tell any remaining workers to shut down. 106 shutting_down_ = true; 107 // Broadcast to everyone waiting. 108 task_queue_condition_.Broadcast(self); 109 completion_condition_.Broadcast(self); 110 } 111 // Wait for the threads to finish. 112 STLDeleteElements(&threads_); 113 } 114 115 void ThreadPool::StartWorkers(Thread* self) { 116 MutexLock mu(self, task_queue_lock_); 117 started_ = true; 118 task_queue_condition_.Broadcast(self); 119 start_time_ = NanoTime(); 120 total_wait_time_ = 0; 121 } 122 123 void ThreadPool::StopWorkers(Thread* self) { 124 MutexLock mu(self, task_queue_lock_); 125 started_ = false; 126 } 127 128 Task* ThreadPool::GetTask(Thread* self) { 129 MutexLock mu(self, task_queue_lock_); 130 while (!IsShuttingDown()) { 131 const size_t thread_count = GetThreadCount(); 132 // Ensure that we don't use more threads than the maximum active workers. 133 const size_t active_threads = thread_count - waiting_count_; 134 // <= since self is considered an active worker. 135 if (active_threads <= max_active_workers_) { 136 Task* task = TryGetTaskLocked(self); 137 if (task != NULL) { 138 return task; 139 } 140 } 141 142 ++waiting_count_; 143 if (waiting_count_ == GetThreadCount() && tasks_.empty()) { 144 // We may be done, lets broadcast to the completion condition. 145 completion_condition_.Broadcast(self); 146 } 147 const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0; 148 task_queue_condition_.Wait(self); 149 if (kMeasureWaitTime) { 150 const uint64_t wait_end = NanoTime(); 151 total_wait_time_ += wait_end - std::max(wait_start, start_time_); 152 } 153 --waiting_count_; 154 } 155 156 // We are shutting down, return NULL to tell the worker thread to stop looping. 157 return NULL; 158 } 159 160 Task* ThreadPool::TryGetTask(Thread* self) { 161 MutexLock mu(self, task_queue_lock_); 162 return TryGetTaskLocked(self); 163 } 164 165 Task* ThreadPool::TryGetTaskLocked(Thread* self) { 166 if (started_ && !tasks_.empty()) { 167 Task* task = tasks_.front(); 168 tasks_.pop_front(); 169 return task; 170 } 171 return NULL; 172 } 173 174 void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) { 175 if (do_work) { 176 Task* task = NULL; 177 while ((task = TryGetTask(self)) != NULL) { 178 task->Run(self); 179 task->Finalize(); 180 } 181 } 182 // Wait until each thread is waiting and the task list is empty. 183 MutexLock mu(self, task_queue_lock_); 184 while (!shutting_down_ && (waiting_count_ != GetThreadCount() || !tasks_.empty())) { 185 if (!may_hold_locks) { 186 completion_condition_.Wait(self); 187 } else { 188 completion_condition_.WaitHoldingLocks(self); 189 } 190 } 191 } 192 193 size_t ThreadPool::GetTaskCount(Thread* self) { 194 MutexLock mu(self, task_queue_lock_); 195 return tasks_.size(); 196 } 197 198 WorkStealingWorker::WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, 199 size_t stack_size) 200 : ThreadPoolWorker(thread_pool, name, stack_size), task_(NULL) {} 201 202 void WorkStealingWorker::Run() { 203 Thread* self = Thread::Current(); 204 Task* task = NULL; 205 WorkStealingThreadPool* thread_pool = down_cast<WorkStealingThreadPool*>(thread_pool_); 206 while ((task = thread_pool_->GetTask(self)) != NULL) { 207 WorkStealingTask* stealing_task = down_cast<WorkStealingTask*>(task); 208 209 { 210 CHECK(task_ == NULL); 211 MutexLock mu(self, thread_pool->work_steal_lock_); 212 // Register that we are running the task 213 ++stealing_task->ref_count_; 214 task_ = stealing_task; 215 } 216 stealing_task->Run(self); 217 // Mark ourselves as not running a task so that nobody tries to steal from us. 218 // There is a race condition that someone starts stealing from us at this point. This is okay 219 // due to the reference counting. 220 task_ = NULL; 221 222 bool finalize; 223 224 // Steal work from tasks until there is none left to steal. Note: There is a race, but 225 // all that happens when the race occurs is that we steal some work instead of processing a 226 // task from the queue. 227 while (thread_pool->GetTaskCount(self) == 0) { 228 WorkStealingTask* steal_from_task = NULL; 229 230 { 231 MutexLock mu(self, thread_pool->work_steal_lock_); 232 // Try finding a task to steal from. 233 steal_from_task = thread_pool->FindTaskToStealFrom(self); 234 if (steal_from_task != NULL) { 235 CHECK_NE(stealing_task, steal_from_task) 236 << "Attempting to steal from completed self task"; 237 steal_from_task->ref_count_++; 238 } else { 239 break; 240 } 241 } 242 243 if (steal_from_task != NULL) { 244 // Task which completed earlier is going to steal some work. 245 stealing_task->StealFrom(self, steal_from_task); 246 247 { 248 // We are done stealing from the task, lets decrement its reference count. 249 MutexLock mu(self, thread_pool->work_steal_lock_); 250 finalize = !--steal_from_task->ref_count_; 251 } 252 253 if (finalize) { 254 steal_from_task->Finalize(); 255 } 256 } 257 } 258 259 { 260 MutexLock mu(self, thread_pool->work_steal_lock_); 261 // If nobody is still referencing task_ we can finalize it. 262 finalize = !--stealing_task->ref_count_; 263 } 264 265 if (finalize) { 266 stealing_task->Finalize(); 267 } 268 } 269 } 270 271 WorkStealingWorker::~WorkStealingWorker() {} 272 273 WorkStealingThreadPool::WorkStealingThreadPool(size_t num_threads) 274 : ThreadPool(0), 275 work_steal_lock_("work stealing lock"), 276 steal_index_(0) { 277 while (GetThreadCount() < num_threads) { 278 const std::string name = StringPrintf("Work stealing worker %zu", GetThreadCount()); 279 threads_.push_back(new WorkStealingWorker(this, name, ThreadPoolWorker::kDefaultStackSize)); 280 } 281 } 282 283 WorkStealingTask* WorkStealingThreadPool::FindTaskToStealFrom(Thread* self) { 284 const size_t thread_count = GetThreadCount(); 285 for (size_t i = 0; i < thread_count; ++i) { 286 // TODO: Use CAS instead of lock. 287 ++steal_index_; 288 if (steal_index_ >= thread_count) { 289 steal_index_-= thread_count; 290 } 291 292 WorkStealingWorker* worker = down_cast<WorkStealingWorker*>(threads_[steal_index_]); 293 WorkStealingTask* task = worker->task_; 294 if (task) { 295 // Not null, we can probably steal from this worker. 296 return task; 297 } 298 } 299 // Couldn't find something to steal. 300 return NULL; 301 } 302 303 WorkStealingThreadPool::~WorkStealingThreadPool() {} 304 305 } // namespace art 306