1 2 /* 3 * Copyright (C) 2012 The Android Open Source Project 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #include "thread_pool.h" 19 20 #include <sys/mman.h> 21 #include <sys/resource.h> 22 #include <sys/time.h> 23 24 #include <pthread.h> 25 26 #include <android-base/logging.h> 27 #include <android-base/stringprintf.h> 28 29 #include "base/bit_utils.h" 30 #include "base/casts.h" 31 #include "base/stl_util.h" 32 #include "base/time_utils.h" 33 #include "base/utils.h" 34 #include "runtime.h" 35 #include "thread-current-inl.h" 36 37 namespace art { 38 39 using android::base::StringPrintf; 40 41 static constexpr bool kMeasureWaitTime = false; 42 43 ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, 44 size_t stack_size) 45 : thread_pool_(thread_pool), 46 name_(name) { 47 // Add an inaccessible page to catch stack overflow. 48 stack_size += kPageSize; 49 std::string error_msg; 50 stack_ = MemMap::MapAnonymous(name.c_str(), 51 stack_size, 52 PROT_READ | PROT_WRITE, 53 /*low_4gb=*/ false, 54 &error_msg); 55 CHECK(stack_.IsValid()) << error_msg; 56 CHECK_ALIGNED(stack_.Begin(), kPageSize); 57 CheckedCall(mprotect, 58 "mprotect bottom page of thread pool worker stack", 59 stack_.Begin(), 60 kPageSize, 61 PROT_NONE); 62 const char* reason = "new thread pool worker thread"; 63 pthread_attr_t attr; 64 CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason); 65 CHECK_PTHREAD_CALL(pthread_attr_setstack, (&attr, stack_.Begin(), stack_.Size()), reason); 66 CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason); 67 CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason); 68 } 69 70 ThreadPoolWorker::~ThreadPoolWorker() { 71 CHECK_PTHREAD_CALL(pthread_join, (pthread_, nullptr), "thread pool worker shutdown"); 72 } 73 74 void ThreadPoolWorker::SetPthreadPriority(int priority) { 75 CHECK_GE(priority, PRIO_MIN); 76 CHECK_LE(priority, PRIO_MAX); 77 #if defined(ART_TARGET_ANDROID) 78 int result = setpriority(PRIO_PROCESS, pthread_gettid_np(pthread_), priority); 79 if (result != 0) { 80 PLOG(ERROR) << "Failed to setpriority to :" << priority; 81 } 82 #else 83 UNUSED(priority); 84 #endif 85 } 86 87 void ThreadPoolWorker::Run() { 88 Thread* self = Thread::Current(); 89 Task* task = nullptr; 90 thread_pool_->creation_barier_.Pass(self); 91 while ((task = thread_pool_->GetTask(self)) != nullptr) { 92 task->Run(self); 93 task->Finalize(); 94 } 95 } 96 97 void* ThreadPoolWorker::Callback(void* arg) { 98 ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg); 99 Runtime* runtime = Runtime::Current(); 100 CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), 101 true, 102 nullptr, 103 worker->thread_pool_->create_peers_)); 104 worker->thread_ = Thread::Current(); 105 // Mark thread pool workers as runtime-threads. 106 worker->thread_->SetIsRuntimeThread(true); 107 // Do work until its time to shut down. 108 worker->Run(); 109 runtime->DetachCurrentThread(); 110 return nullptr; 111 } 112 113 void ThreadPool::AddTask(Thread* self, Task* task) { 114 MutexLock mu(self, task_queue_lock_); 115 tasks_.push_back(task); 116 // If we have any waiters, signal one. 117 if (started_ && waiting_count_ != 0) { 118 task_queue_condition_.Signal(self); 119 } 120 } 121 122 void ThreadPool::RemoveAllTasks(Thread* self) { 123 MutexLock mu(self, task_queue_lock_); 124 tasks_.clear(); 125 } 126 127 ThreadPool::ThreadPool(const char* name, 128 size_t num_threads, 129 bool create_peers, 130 size_t worker_stack_size) 131 : name_(name), 132 task_queue_lock_("task queue lock"), 133 task_queue_condition_("task queue condition", task_queue_lock_), 134 completion_condition_("task completion condition", task_queue_lock_), 135 started_(false), 136 shutting_down_(false), 137 waiting_count_(0), 138 start_time_(0), 139 total_wait_time_(0), 140 creation_barier_(0), 141 max_active_workers_(num_threads), 142 create_peers_(create_peers), 143 worker_stack_size_(worker_stack_size) { 144 CreateThreads(); 145 } 146 147 void ThreadPool::CreateThreads() { 148 CHECK(threads_.empty()); 149 Thread* self = Thread::Current(); 150 { 151 MutexLock mu(self, task_queue_lock_); 152 shutting_down_ = false; 153 // Add one since the caller of constructor waits on the barrier too. 154 creation_barier_.Init(self, max_active_workers_); 155 while (GetThreadCount() < max_active_workers_) { 156 const std::string worker_name = StringPrintf("%s worker thread %zu", name_.c_str(), 157 GetThreadCount()); 158 threads_.push_back( 159 new ThreadPoolWorker(this, worker_name, worker_stack_size_)); 160 } 161 } 162 } 163 164 void ThreadPool::WaitForWorkersToBeCreated() { 165 creation_barier_.Increment(Thread::Current(), 0); 166 } 167 168 const std::vector<ThreadPoolWorker*>& ThreadPool::GetWorkers() { 169 // Wait for all the workers to be created before returning them. 170 WaitForWorkersToBeCreated(); 171 return threads_; 172 } 173 174 void ThreadPool::DeleteThreads() { 175 { 176 Thread* self = Thread::Current(); 177 MutexLock mu(self, task_queue_lock_); 178 // Tell any remaining workers to shut down. 179 shutting_down_ = true; 180 // Broadcast to everyone waiting. 181 task_queue_condition_.Broadcast(self); 182 completion_condition_.Broadcast(self); 183 } 184 // Wait for the threads to finish. We expect the user of the pool 185 // not to run multi-threaded calls to `CreateThreads` and `DeleteThreads`, 186 // so we don't guard the field here. 187 STLDeleteElements(&threads_); 188 } 189 190 void ThreadPool::SetMaxActiveWorkers(size_t max_workers) { 191 MutexLock mu(Thread::Current(), task_queue_lock_); 192 CHECK_LE(max_workers, GetThreadCount()); 193 max_active_workers_ = max_workers; 194 } 195 196 ThreadPool::~ThreadPool() { 197 DeleteThreads(); 198 } 199 200 void ThreadPool::StartWorkers(Thread* self) { 201 MutexLock mu(self, task_queue_lock_); 202 started_ = true; 203 task_queue_condition_.Broadcast(self); 204 start_time_ = NanoTime(); 205 total_wait_time_ = 0; 206 } 207 208 void ThreadPool::StopWorkers(Thread* self) { 209 MutexLock mu(self, task_queue_lock_); 210 started_ = false; 211 } 212 213 Task* ThreadPool::GetTask(Thread* self) { 214 MutexLock mu(self, task_queue_lock_); 215 while (!IsShuttingDown()) { 216 const size_t thread_count = GetThreadCount(); 217 // Ensure that we don't use more threads than the maximum active workers. 218 const size_t active_threads = thread_count - waiting_count_; 219 // <= since self is considered an active worker. 220 if (active_threads <= max_active_workers_) { 221 Task* task = TryGetTaskLocked(); 222 if (task != nullptr) { 223 return task; 224 } 225 } 226 227 ++waiting_count_; 228 if (waiting_count_ == GetThreadCount() && !HasOutstandingTasks()) { 229 // We may be done, lets broadcast to the completion condition. 230 completion_condition_.Broadcast(self); 231 } 232 const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0; 233 task_queue_condition_.Wait(self); 234 if (kMeasureWaitTime) { 235 const uint64_t wait_end = NanoTime(); 236 total_wait_time_ += wait_end - std::max(wait_start, start_time_); 237 } 238 --waiting_count_; 239 } 240 241 // We are shutting down, return null to tell the worker thread to stop looping. 242 return nullptr; 243 } 244 245 Task* ThreadPool::TryGetTask(Thread* self) { 246 MutexLock mu(self, task_queue_lock_); 247 return TryGetTaskLocked(); 248 } 249 250 Task* ThreadPool::TryGetTaskLocked() { 251 if (HasOutstandingTasks()) { 252 Task* task = tasks_.front(); 253 tasks_.pop_front(); 254 return task; 255 } 256 return nullptr; 257 } 258 259 void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) { 260 if (do_work) { 261 CHECK(!create_peers_); 262 Task* task = nullptr; 263 while ((task = TryGetTask(self)) != nullptr) { 264 task->Run(self); 265 task->Finalize(); 266 } 267 } 268 // Wait until each thread is waiting and the task list is empty. 269 MutexLock mu(self, task_queue_lock_); 270 while (!shutting_down_ && (waiting_count_ != GetThreadCount() || HasOutstandingTasks())) { 271 if (!may_hold_locks) { 272 completion_condition_.Wait(self); 273 } else { 274 completion_condition_.WaitHoldingLocks(self); 275 } 276 } 277 } 278 279 size_t ThreadPool::GetTaskCount(Thread* self) { 280 MutexLock mu(self, task_queue_lock_); 281 return tasks_.size(); 282 } 283 284 void ThreadPool::SetPthreadPriority(int priority) { 285 for (ThreadPoolWorker* worker : threads_) { 286 worker->SetPthreadPriority(priority); 287 } 288 } 289 290 } // namespace art 291