/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "thread_pool.h"

#include <pthread.h>

#include <sys/time.h>
#include <sys/resource.h>

#include "android-base/stringprintf.h"

#include "base/bit_utils.h"
#include "base/casts.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "base/time_utils.h"
#include "runtime.h"
#include "thread-inl.h"

namespace art {

using android::base::StringPrintf;

static constexpr bool kMeasureWaitTime = false;

ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name,
                                   size_t stack_size)
    : thread_pool_(thread_pool),
      name_(name) {
  // Add an inaccessible page to catch stack overflow.
  stack_size += kPageSize;
  std::string error_msg;
  stack_.reset(MemMap::MapAnonymous(name.c_str(), nullptr, stack_size, PROT_READ | PROT_WRITE,
                                    false, false, &error_msg));
  CHECK(stack_.get() != nullptr) << error_msg;
  CHECK_ALIGNED(stack_->Begin(), kPageSize);
  int mprotect_result = mprotect(stack_->Begin(), kPageSize, PROT_NONE);
  CHECK_EQ(mprotect_result, 0) << "Failed to mprotect() bottom page of thread pool worker stack.";
  const char* reason = "new thread pool worker thread";
  pthread_attr_t attr;
  CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason);
  CHECK_PTHREAD_CALL(pthread_attr_setstack, (&attr, stack_->Begin(), stack_->Size()), reason);
  CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason);
  CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason);
}

ThreadPoolWorker::~ThreadPoolWorker() {
  CHECK_PTHREAD_CALL(pthread_join, (pthread_, nullptr), "thread pool worker shutdown");
}

void ThreadPoolWorker::SetPthreadPriority(int priority) {
  CHECK_GE(priority, PRIO_MIN);
  CHECK_LE(priority, PRIO_MAX);
#if defined(ART_TARGET_ANDROID)
  int result = setpriority(PRIO_PROCESS, pthread_gettid_np(pthread_), priority);
  if (result != 0) {
    PLOG(ERROR) << "Failed to setpriority to: " << priority;
  }
#else
  UNUSED(priority);
#endif
}

void ThreadPoolWorker::Run() {
  Thread* self = Thread::Current();
  Task* task = nullptr;
  thread_pool_->creation_barier_.Wait(self);
  while ((task = thread_pool_->GetTask(self)) != nullptr) {
    task->Run(self);
    task->Finalize();
  }
}

void* ThreadPoolWorker::Callback(void* arg) {
  ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg);
  Runtime* runtime = Runtime::Current();
  CHECK(runtime->AttachCurrentThread(worker->name_.c_str(),
                                     /* as_daemon= */ true,
                                     /* thread_group= */ nullptr,
                                     worker->thread_pool_->create_peers_));
  worker->thread_ = Thread::Current();
  // Thread pool workers cannot call into Java.
  worker->thread_->SetCanCallIntoJava(false);
  // Do work until it's time to shut down.
  worker->Run();
  runtime->DetachCurrentThread();
  return nullptr;
}
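
// A minimal sketch (not compiled, not part of this file) of the Task contract
// the worker loop above relies on: Run() executes on a worker thread that
// cannot call into Java, and Finalize() runs immediately afterwards, commonly
// so a task can delete itself. CountTask and count_ are illustrative names;
// the sketch assumes <atomic> is available.
//
//   class CountTask : public Task {
//    public:
//     explicit CountTask(std::atomic<size_t>* count) : count_(count) {}
//     void Run(Thread* self) override {
//       UNUSED(self);  // No Java upcalls are allowed on worker threads anyway.
//       count_->fetch_add(1u, std::memory_order_relaxed);
//     }
//     void Finalize() override {
//       delete this;  // The pool does not own tasks; Finalize() reclaims them.
//     }
//    private:
//     std::atomic<size_t>* const count_;
//   };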

void ThreadPool::AddTask(Thread* self, Task* task) {
  MutexLock mu(self, task_queue_lock_);
  tasks_.push_back(task);
  // If we have any waiters, signal one.
  if (started_ && waiting_count_ != 0) {
    task_queue_condition_.Signal(self);
  }
}

void ThreadPool::RemoveAllTasks(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  tasks_.clear();
}

ThreadPool::ThreadPool(const char* name, size_t num_threads, bool create_peers)
    : name_(name),
      task_queue_lock_("task queue lock"),
      task_queue_condition_("task queue condition", task_queue_lock_),
      completion_condition_("task completion condition", task_queue_lock_),
      started_(false),
      shutting_down_(false),
      waiting_count_(0),
      start_time_(0),
      total_wait_time_(0),
      // Add one since the caller of constructor waits on the barrier too.
      creation_barier_(num_threads + 1),
      max_active_workers_(num_threads),
      create_peers_(create_peers) {
  Thread* self = Thread::Current();
  while (GetThreadCount() < num_threads) {
    const std::string worker_name = StringPrintf("%s worker thread %zu", name_.c_str(),
                                                 GetThreadCount());
    threads_.push_back(
        new ThreadPoolWorker(this, worker_name, ThreadPoolWorker::kDefaultStackSize));
  }
  // Wait for all of the threads to attach.
  creation_barier_.Wait(self);
}

void ThreadPool::SetMaxActiveWorkers(size_t threads) {
  MutexLock mu(Thread::Current(), task_queue_lock_);
  CHECK_LE(threads, GetThreadCount());
  max_active_workers_ = threads;
}

ThreadPool::~ThreadPool() {
  {
    Thread* self = Thread::Current();
    MutexLock mu(self, task_queue_lock_);
    // Tell any remaining workers to shut down.
    shutting_down_ = true;
    // Broadcast to everyone waiting.
    task_queue_condition_.Broadcast(self);
    completion_condition_.Broadcast(self);
  }
  // Wait for the threads to finish.
  STLDeleteElements(&threads_);
}

void ThreadPool::StartWorkers(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  started_ = true;
  task_queue_condition_.Broadcast(self);
  start_time_ = NanoTime();
  total_wait_time_ = 0;
}

void ThreadPool::StopWorkers(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  started_ = false;
}

Task* ThreadPool::GetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  while (!IsShuttingDown()) {
    const size_t thread_count = GetThreadCount();
    // Ensure that we don't use more threads than the maximum active workers.
    const size_t active_threads = thread_count - waiting_count_;
    // <= since self is considered an active worker.
    if (active_threads <= max_active_workers_) {
      Task* task = TryGetTaskLocked();
      if (task != nullptr) {
        return task;
      }
    }

    ++waiting_count_;
    if (waiting_count_ == GetThreadCount() && !HasOutstandingTasks()) {
      // We may be done; let's broadcast to the completion condition.
      completion_condition_.Broadcast(self);
    }
    const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0;
    task_queue_condition_.Wait(self);
    if (kMeasureWaitTime) {
      const uint64_t wait_end = NanoTime();
      total_wait_time_ += wait_end - std::max(wait_start, start_time_);
    }
    --waiting_count_;
  }

  // We are shutting down; return null to tell the worker thread to stop looping.
  return nullptr;
}
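
// A hedged usage sketch (not compiled, not part of this file) of the
// queue/worker handshake implemented above. CountTask refers to the
// illustrative task sketched earlier; the ThreadPool calls are the ones
// defined in this file. Tasks queued before StartWorkers() are held back,
// since HasOutstandingTasks() also checks started_.
//
//   Thread* self = Thread::Current();
//   std::atomic<size_t> count(0);
//   ThreadPool pool("sketch pool", /* num_threads= */ 2, /* create_peers= */ false);
//   pool.AddTask(self, new CountTask(&count));  // Queued; workers still wait.
//   pool.StartWorkers(self);                    // Wakes workers blocked in GetTask().
//   // do_work=true also drains tasks on the calling thread; it requires
//   // create_peers_ == false, which holds here.
//   pool.Wait(self, /* do_work= */ true, /* may_hold_locks= */ false);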

Task* ThreadPool::TryGetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  return TryGetTaskLocked();
}

Task* ThreadPool::TryGetTaskLocked() {
  if (HasOutstandingTasks()) {
    Task* task = tasks_.front();
    tasks_.pop_front();
    return task;
  }
  return nullptr;
}

void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) {
  if (do_work) {
    CHECK(!create_peers_);
    Task* task = nullptr;
    while ((task = TryGetTask(self)) != nullptr) {
      task->Run(self);
      task->Finalize();
    }
  }
  // Wait until each thread is waiting and the task list is empty.
  MutexLock mu(self, task_queue_lock_);
  while (!shutting_down_ && (waiting_count_ != GetThreadCount() || HasOutstandingTasks())) {
    if (!may_hold_locks) {
      completion_condition_.Wait(self);
    } else {
      completion_condition_.WaitHoldingLocks(self);
    }
  }
}

size_t ThreadPool::GetTaskCount(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  return tasks_.size();
}

void ThreadPool::SetPthreadPriority(int priority) {
  for (ThreadPoolWorker* worker : threads_) {
    worker->SetPthreadPriority(priority);
  }
}

}  // namespace art