/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "thread_pool.h"

#include <pthread.h>

#include <sys/time.h>
#include <sys/resource.h>

#include "base/bit_utils.h"
#include "base/casts.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "base/time_utils.h"
#include "runtime.h"
#include "thread-inl.h"

namespace art {

static constexpr bool kMeasureWaitTime = false;

ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name,
                                   size_t stack_size)
    : thread_pool_(thread_pool),
      name_(name) {
  // Add an inaccessible page to catch stack overflow.
  stack_size += kPageSize;
  std::string error_msg;
  stack_.reset(MemMap::MapAnonymous(name.c_str(), nullptr, stack_size, PROT_READ | PROT_WRITE,
                                    false, false, &error_msg));
  CHECK(stack_.get() != nullptr) << error_msg;
  CHECK_ALIGNED(stack_->Begin(), kPageSize);
  int mprotect_result = mprotect(stack_->Begin(), kPageSize, PROT_NONE);
  CHECK_EQ(mprotect_result, 0) << "Failed to mprotect() bottom page of thread pool worker stack.";
  const char* reason = "new thread pool worker thread";
  pthread_attr_t attr;
  CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason);
  CHECK_PTHREAD_CALL(pthread_attr_setstack, (&attr, stack_->Begin(), stack_->Size()), reason);
  CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason);
  CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason);
}

ThreadPoolWorker::~ThreadPoolWorker() {
  CHECK_PTHREAD_CALL(pthread_join, (pthread_, nullptr), "thread pool worker shutdown");
}

void ThreadPoolWorker::SetPthreadPriority(int priority) {
  CHECK_GE(priority, PRIO_MIN);
  CHECK_LE(priority, PRIO_MAX);
#if defined(__ANDROID__)
  int result = setpriority(PRIO_PROCESS, pthread_gettid_np(pthread_), priority);
  if (result != 0) {
    PLOG(ERROR) << "Failed to setpriority to: " << priority;
  }
#else
  UNUSED(priority);
#endif
}

void ThreadPoolWorker::Run() {
  Thread* self = Thread::Current();
  Task* task = nullptr;
  thread_pool_->creation_barier_.Wait(self);
  while ((task = thread_pool_->GetTask(self)) != nullptr) {
    task->Run(self);
    task->Finalize();
  }
}

void* ThreadPoolWorker::Callback(void* arg) {
  ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg);
  Runtime* runtime = Runtime::Current();
  CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), true, nullptr, false));
  // Do work until it's time to shut down.
  worker->Run();
  runtime->DetachCurrentThread();
  return nullptr;
}

void ThreadPool::AddTask(Thread* self, Task* task) {
  MutexLock mu(self, task_queue_lock_);
  tasks_.push_back(task);
  // If we have any waiters, signal one.
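  // (A single Signal suffices: one new task can be consumed by at most one
  // worker, so waking additional waiters would only cause spurious wakeups.)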
  if (started_ && waiting_count_ != 0) {
    task_queue_condition_.Signal(self);
  }
}

void ThreadPool::RemoveAllTasks(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  tasks_.clear();
}

ThreadPool::ThreadPool(const char* name, size_t num_threads)
  : name_(name),
    task_queue_lock_("task queue lock"),
    task_queue_condition_("task queue condition", task_queue_lock_),
    completion_condition_("task completion condition", task_queue_lock_),
    started_(false),
    shutting_down_(false),
    waiting_count_(0),
    start_time_(0),
    total_wait_time_(0),
    // Add one since the caller of the constructor waits on the barrier too.
    creation_barier_(num_threads + 1),
    max_active_workers_(num_threads) {
  Thread* self = Thread::Current();
  while (GetThreadCount() < num_threads) {
    const std::string worker_name = StringPrintf("%s worker thread %zu", name_.c_str(),
                                                 GetThreadCount());
    threads_.push_back(
        new ThreadPoolWorker(this, worker_name, ThreadPoolWorker::kDefaultStackSize));
  }
  // Wait for all of the threads to attach.
  creation_barier_.Wait(self);
}

void ThreadPool::SetMaxActiveWorkers(size_t threads) {
  MutexLock mu(Thread::Current(), task_queue_lock_);
  CHECK_LE(threads, GetThreadCount());
  max_active_workers_ = threads;
}

ThreadPool::~ThreadPool() {
  {
    Thread* self = Thread::Current();
    MutexLock mu(self, task_queue_lock_);
    // Tell any remaining workers to shut down.
    shutting_down_ = true;
    // Broadcast to everyone waiting.
    task_queue_condition_.Broadcast(self);
    completion_condition_.Broadcast(self);
  }
  // Wait for the threads to finish.
  STLDeleteElements(&threads_);
}

void ThreadPool::StartWorkers(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  started_ = true;
  task_queue_condition_.Broadcast(self);
  start_time_ = NanoTime();
  total_wait_time_ = 0;
}

void ThreadPool::StopWorkers(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  started_ = false;
}

Task* ThreadPool::GetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  while (!IsShuttingDown()) {
    const size_t thread_count = GetThreadCount();
    // Ensure that we don't use more threads than the maximum active workers.
    const size_t active_threads = thread_count - waiting_count_;
    // <= since self is considered an active worker.
    if (active_threads <= max_active_workers_) {
      Task* task = TryGetTaskLocked();
      if (task != nullptr) {
        return task;
      }
    }

    ++waiting_count_;
    if (waiting_count_ == GetThreadCount() && tasks_.empty()) {
      // We may be done, let's broadcast to the completion condition.
      completion_condition_.Broadcast(self);
    }
    const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0;
    task_queue_condition_.Wait(self);
    if (kMeasureWaitTime) {
      const uint64_t wait_end = NanoTime();
      // Clamp to start_time_ so waiting done before the most recent
      // StartWorkers() call is not counted.
      total_wait_time_ += wait_end - std::max(wait_start, start_time_);
    }
    --waiting_count_;
  }

  // We are shutting down, return null to tell the worker thread to stop looping.
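  // The worker then falls out of its Run() loop and detaches from the runtime
  // in Callback().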
  return nullptr;
}

Task* ThreadPool::TryGetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  return TryGetTaskLocked();
}

Task* ThreadPool::TryGetTaskLocked() {
  if (started_ && !tasks_.empty()) {
    Task* task = tasks_.front();
    tasks_.pop_front();
    return task;
  }
  return nullptr;
}

void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) {
  if (do_work) {
    Task* task = nullptr;
    while ((task = TryGetTask(self)) != nullptr) {
      task->Run(self);
      task->Finalize();
    }
  }
  // Wait until each thread is waiting and the task list is empty.
  MutexLock mu(self, task_queue_lock_);
  while (!shutting_down_ && (waiting_count_ != GetThreadCount() || !tasks_.empty())) {
    if (!may_hold_locks) {
      completion_condition_.Wait(self);
    } else {
      completion_condition_.WaitHoldingLocks(self);
    }
  }
}

size_t ThreadPool::GetTaskCount(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  return tasks_.size();
}

void ThreadPool::SetPthreadPriority(int priority) {
  for (ThreadPoolWorker* worker : threads_) {
    worker->SetPthreadPriority(priority);
  }
}

}  // namespace art
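// Illustrative usage sketch (not part of this file): CountTask below is a
// hypothetical example; it only assumes the Task contract used above, where
// Run() does the work and Finalize() lets a heap-allocated task free itself.
//
//   class CountTask : public Task {
//    public:
//     explicit CountTask(std::atomic<size_t>* count) : count_(count) {}
//     void Run(Thread* self) override { ++*count_; }
//     void Finalize() override { delete this; }  // Tasks delete themselves once run.
//    private:
//     std::atomic<size_t>* const count_;
//   };
//
//   Thread* self = Thread::Current();
//   std::atomic<size_t> count(0);
//   ThreadPool pool("example pool", /*num_threads=*/4);
//   for (size_t i = 0; i < 64; ++i) {
//     pool.AddTask(self, new CountTask(&count));
//   }
//   pool.StartWorkers(self);  // Queued tasks are only handed out once started.
//   pool.Wait(self, /*do_work=*/true, /*may_hold_locks=*/false);  // Blocks until the queue drains.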