1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "thread_pool.h" 18 19 #include <pthread.h> 20 21 #include <sys/mman.h> 22 #include <sys/time.h> 23 #include <sys/resource.h> 24 25 #include "android-base/stringprintf.h" 26 27 #include "base/bit_utils.h" 28 #include "base/casts.h" 29 #include "base/logging.h" 30 #include "base/stl_util.h" 31 #include "base/time_utils.h" 32 #include "runtime.h" 33 #include "thread-current-inl.h" 34 35 namespace art { 36 37 using android::base::StringPrintf; 38 39 static constexpr bool kMeasureWaitTime = false; 40 41 ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, 42 size_t stack_size) 43 : thread_pool_(thread_pool), 44 name_(name) { 45 // Add an inaccessible page to catch stack overflow. 46 stack_size += kPageSize; 47 std::string error_msg; 48 stack_.reset(MemMap::MapAnonymous(name.c_str(), nullptr, stack_size, PROT_READ | PROT_WRITE, 49 false, false, &error_msg)); 50 CHECK(stack_.get() != nullptr) << error_msg; 51 CHECK_ALIGNED(stack_->Begin(), kPageSize); 52 int mprotect_result = mprotect(stack_->Begin(), kPageSize, PROT_NONE); 53 CHECK_EQ(mprotect_result, 0) << "Failed to mprotect() bottom page of thread pool worker stack."; 54 const char* reason = "new thread pool worker thread"; 55 pthread_attr_t attr; 56 CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason); 57 CHECK_PTHREAD_CALL(pthread_attr_setstack, (&attr, stack_->Begin(), stack_->Size()), reason); 58 CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason); 59 CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason); 60 } 61 62 ThreadPoolWorker::~ThreadPoolWorker() { 63 CHECK_PTHREAD_CALL(pthread_join, (pthread_, nullptr), "thread pool worker shutdown"); 64 } 65 66 void ThreadPoolWorker::SetPthreadPriority(int priority) { 67 CHECK_GE(priority, PRIO_MIN); 68 CHECK_LE(priority, PRIO_MAX); 69 #if defined(ART_TARGET_ANDROID) 70 int result = setpriority(PRIO_PROCESS, pthread_gettid_np(pthread_), priority); 71 if (result != 0) { 72 PLOG(ERROR) << "Failed to setpriority to :" << priority; 73 } 74 #else 75 UNUSED(priority); 76 #endif 77 } 78 79 void ThreadPoolWorker::Run() { 80 Thread* self = Thread::Current(); 81 Task* task = nullptr; 82 thread_pool_->creation_barier_.Wait(self); 83 while ((task = thread_pool_->GetTask(self)) != nullptr) { 84 task->Run(self); 85 task->Finalize(); 86 } 87 } 88 89 void* ThreadPoolWorker::Callback(void* arg) { 90 ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg); 91 Runtime* runtime = Runtime::Current(); 92 CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), 93 true, 94 nullptr, 95 worker->thread_pool_->create_peers_)); 96 worker->thread_ = Thread::Current(); 97 // Thread pool workers cannot call into java. 98 worker->thread_->SetCanCallIntoJava(false); 99 // Do work until its time to shut down. 100 worker->Run(); 101 runtime->DetachCurrentThread(); 102 return nullptr; 103 } 104 105 void ThreadPool::AddTask(Thread* self, Task* task) { 106 MutexLock mu(self, task_queue_lock_); 107 tasks_.push_back(task); 108 // If we have any waiters, signal one. 109 if (started_ && waiting_count_ != 0) { 110 task_queue_condition_.Signal(self); 111 } 112 } 113 114 void ThreadPool::RemoveAllTasks(Thread* self) { 115 MutexLock mu(self, task_queue_lock_); 116 tasks_.clear(); 117 } 118 119 ThreadPool::ThreadPool(const char* name, size_t num_threads, bool create_peers) 120 : name_(name), 121 task_queue_lock_("task queue lock"), 122 task_queue_condition_("task queue condition", task_queue_lock_), 123 completion_condition_("task completion condition", task_queue_lock_), 124 started_(false), 125 shutting_down_(false), 126 waiting_count_(0), 127 start_time_(0), 128 total_wait_time_(0), 129 // Add one since the caller of constructor waits on the barrier too. 130 creation_barier_(num_threads + 1), 131 max_active_workers_(num_threads), 132 create_peers_(create_peers) { 133 Thread* self = Thread::Current(); 134 while (GetThreadCount() < num_threads) { 135 const std::string worker_name = StringPrintf("%s worker thread %zu", name_.c_str(), 136 GetThreadCount()); 137 threads_.push_back( 138 new ThreadPoolWorker(this, worker_name, ThreadPoolWorker::kDefaultStackSize)); 139 } 140 // Wait for all of the threads to attach. 141 creation_barier_.Wait(self); 142 } 143 144 void ThreadPool::SetMaxActiveWorkers(size_t threads) { 145 MutexLock mu(Thread::Current(), task_queue_lock_); 146 CHECK_LE(threads, GetThreadCount()); 147 max_active_workers_ = threads; 148 } 149 150 ThreadPool::~ThreadPool() { 151 { 152 Thread* self = Thread::Current(); 153 MutexLock mu(self, task_queue_lock_); 154 // Tell any remaining workers to shut down. 155 shutting_down_ = true; 156 // Broadcast to everyone waiting. 157 task_queue_condition_.Broadcast(self); 158 completion_condition_.Broadcast(self); 159 } 160 // Wait for the threads to finish. 161 STLDeleteElements(&threads_); 162 } 163 164 void ThreadPool::StartWorkers(Thread* self) { 165 MutexLock mu(self, task_queue_lock_); 166 started_ = true; 167 task_queue_condition_.Broadcast(self); 168 start_time_ = NanoTime(); 169 total_wait_time_ = 0; 170 } 171 172 void ThreadPool::StopWorkers(Thread* self) { 173 MutexLock mu(self, task_queue_lock_); 174 started_ = false; 175 } 176 177 Task* ThreadPool::GetTask(Thread* self) { 178 MutexLock mu(self, task_queue_lock_); 179 while (!IsShuttingDown()) { 180 const size_t thread_count = GetThreadCount(); 181 // Ensure that we don't use more threads than the maximum active workers. 182 const size_t active_threads = thread_count - waiting_count_; 183 // <= since self is considered an active worker. 184 if (active_threads <= max_active_workers_) { 185 Task* task = TryGetTaskLocked(); 186 if (task != nullptr) { 187 return task; 188 } 189 } 190 191 ++waiting_count_; 192 if (waiting_count_ == GetThreadCount() && !HasOutstandingTasks()) { 193 // We may be done, lets broadcast to the completion condition. 194 completion_condition_.Broadcast(self); 195 } 196 const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0; 197 task_queue_condition_.Wait(self); 198 if (kMeasureWaitTime) { 199 const uint64_t wait_end = NanoTime(); 200 total_wait_time_ += wait_end - std::max(wait_start, start_time_); 201 } 202 --waiting_count_; 203 } 204 205 // We are shutting down, return null to tell the worker thread to stop looping. 206 return nullptr; 207 } 208 209 Task* ThreadPool::TryGetTask(Thread* self) { 210 MutexLock mu(self, task_queue_lock_); 211 return TryGetTaskLocked(); 212 } 213 214 Task* ThreadPool::TryGetTaskLocked() { 215 if (HasOutstandingTasks()) { 216 Task* task = tasks_.front(); 217 tasks_.pop_front(); 218 return task; 219 } 220 return nullptr; 221 } 222 223 void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) { 224 if (do_work) { 225 CHECK(!create_peers_); 226 Task* task = nullptr; 227 while ((task = TryGetTask(self)) != nullptr) { 228 task->Run(self); 229 task->Finalize(); 230 } 231 } 232 // Wait until each thread is waiting and the task list is empty. 233 MutexLock mu(self, task_queue_lock_); 234 while (!shutting_down_ && (waiting_count_ != GetThreadCount() || HasOutstandingTasks())) { 235 if (!may_hold_locks) { 236 completion_condition_.Wait(self); 237 } else { 238 completion_condition_.WaitHoldingLocks(self); 239 } 240 } 241 } 242 243 size_t ThreadPool::GetTaskCount(Thread* self) { 244 MutexLock mu(self, task_queue_lock_); 245 return tasks_.size(); 246 } 247 248 void ThreadPool::SetPthreadPriority(int priority) { 249 for (ThreadPoolWorker* worker : threads_) { 250 worker->SetPthreadPriority(priority); 251 } 252 } 253 254 } // namespace art 255