/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "thread_pool.h"

#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/time.h>

#include <pthread.h>

#include <android-base/logging.h>
#include <android-base/stringprintf.h>

#include "base/bit_utils.h"
#include "base/casts.h"
#include "base/stl_util.h"
#include "base/time_utils.h"
#include "base/utils.h"
#include "runtime.h"
#include "thread-current-inl.h"

namespace art {

using android::base::StringPrintf;

static constexpr bool kMeasureWaitTime = false;

ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name,
                                   size_t stack_size)
    : thread_pool_(thread_pool),
      name_(name) {
  // Add an inaccessible page to catch stack overflow.
  stack_size += kPageSize;
  std::string error_msg;
  stack_.reset(MemMap::MapAnonymous(name.c_str(), nullptr, stack_size, PROT_READ | PROT_WRITE,
                                    false, false, &error_msg));
  CHECK(stack_.get() != nullptr) << error_msg;
  CHECK_ALIGNED(stack_->Begin(), kPageSize);
  CheckedCall(mprotect,
              "mprotect bottom page of thread pool worker stack",
              stack_->Begin(),
              kPageSize,
              PROT_NONE);
  const char* reason = "new thread pool worker thread";
  pthread_attr_t attr;
  CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason);
  CHECK_PTHREAD_CALL(pthread_attr_setstack, (&attr, stack_->Begin(), stack_->Size()), reason);
  CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason);
  CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason);
}

ThreadPoolWorker::~ThreadPoolWorker() {
  CHECK_PTHREAD_CALL(pthread_join, (pthread_, nullptr), "thread pool worker shutdown");
}

void ThreadPoolWorker::SetPthreadPriority(int priority) {
  CHECK_GE(priority, PRIO_MIN);
  CHECK_LE(priority, PRIO_MAX);
#if defined(ART_TARGET_ANDROID)
  int result = setpriority(PRIO_PROCESS, pthread_gettid_np(pthread_), priority);
  if (result != 0) {
    PLOG(ERROR) << "Failed to setpriority to: " << priority;
  }
#else
  UNUSED(priority);
#endif
}

void ThreadPoolWorker::Run() {
  Thread* self = Thread::Current();
  Task* task = nullptr;
  thread_pool_->creation_barier_.Wait(self);
  while ((task = thread_pool_->GetTask(self)) != nullptr) {
    task->Run(self);
    task->Finalize();
  }
}

void* ThreadPoolWorker::Callback(void* arg) {
  ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg);
  Runtime* runtime = Runtime::Current();
  CHECK(runtime->AttachCurrentThread(worker->name_.c_str(),
                                     true,
                                     nullptr,
                                     worker->thread_pool_->create_peers_));
  worker->thread_ = Thread::Current();
  // Thread pool workers cannot call into Java.
  worker->thread_->SetCanCallIntoJava(false);
  // Do work until it's time to shut down.
  worker->Run();
  runtime->DetachCurrentThread();
  return nullptr;
}
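// A minimal sketch of a Task subclass, for illustration only (Task is declared
// in thread_pool.h; CountTask and its counter are hypothetical and not part of
// this file). Workers invoke Run() and then Finalize() on each dequeued task,
// so a heap-allocated task may delete itself in Finalize():
//
//   class CountTask : public Task {
//    public:
//     explicit CountTask(std::atomic<size_t>* counter) : counter_(counter) {}
//     void Run(Thread* self) override {
//       UNUSED(self);
//       counter_->fetch_add(1u, std::memory_order_relaxed);
//     }
//     // Called by the worker after Run(); self-deletion matches that contract.
//     void Finalize() override { delete this; }
//    private:
//     std::atomic<size_t>* const counter_;
//   };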
void ThreadPool::AddTask(Thread* self, Task* task) {
  MutexLock mu(self, task_queue_lock_);
  tasks_.push_back(task);
  // If we have any waiters, signal one.
  if (started_ && waiting_count_ != 0) {
    task_queue_condition_.Signal(self);
  }
}

void ThreadPool::RemoveAllTasks(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  tasks_.clear();
}

ThreadPool::ThreadPool(const char* name, size_t num_threads, bool create_peers)
    : name_(name),
      task_queue_lock_("task queue lock"),
      task_queue_condition_("task queue condition", task_queue_lock_),
      completion_condition_("task completion condition", task_queue_lock_),
      started_(false),
      shutting_down_(false),
      waiting_count_(0),
      start_time_(0),
      total_wait_time_(0),
      // Add one since the caller of the constructor waits on the barrier too.
      creation_barier_(num_threads + 1),
      max_active_workers_(num_threads),
      create_peers_(create_peers) {
  Thread* self = Thread::Current();
  while (GetThreadCount() < num_threads) {
    const std::string worker_name = StringPrintf("%s worker thread %zu", name_.c_str(),
                                                 GetThreadCount());
    threads_.push_back(
        new ThreadPoolWorker(this, worker_name, ThreadPoolWorker::kDefaultStackSize));
  }
  // Wait for all of the threads to attach.
  creation_barier_.Wait(self);
}

void ThreadPool::SetMaxActiveWorkers(size_t threads) {
  MutexLock mu(Thread::Current(), task_queue_lock_);
  CHECK_LE(threads, GetThreadCount());
  max_active_workers_ = threads;
}

ThreadPool::~ThreadPool() {
  {
    Thread* self = Thread::Current();
    MutexLock mu(self, task_queue_lock_);
    // Tell any remaining workers to shut down.
    shutting_down_ = true;
    // Broadcast to everyone waiting.
    task_queue_condition_.Broadcast(self);
    completion_condition_.Broadcast(self);
  }
  // Wait for the threads to finish.
  STLDeleteElements(&threads_);
}

void ThreadPool::StartWorkers(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  started_ = true;
  task_queue_condition_.Broadcast(self);
  start_time_ = NanoTime();
  total_wait_time_ = 0;
}

void ThreadPool::StopWorkers(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  started_ = false;
}

Task* ThreadPool::GetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  while (!IsShuttingDown()) {
    const size_t thread_count = GetThreadCount();
    // Ensure that we don't use more threads than the maximum active workers.
    const size_t active_threads = thread_count - waiting_count_;
    // <= since self is considered an active worker.
    if (active_threads <= max_active_workers_) {
      Task* task = TryGetTaskLocked();
      if (task != nullptr) {
        return task;
      }
    }

    ++waiting_count_;
    if (waiting_count_ == GetThreadCount() && !HasOutstandingTasks()) {
      // We may be done, let's broadcast to the completion condition.
      completion_condition_.Broadcast(self);
    }
    const uint64_t wait_start = kMeasureWaitTime ? NanoTime() : 0;
    task_queue_condition_.Wait(self);
    if (kMeasureWaitTime) {
      const uint64_t wait_end = NanoTime();
      total_wait_time_ += wait_end - std::max(wait_start, start_time_);
    }
    --waiting_count_;
  }

  // We are shutting down, return null to tell the worker thread to stop looping.
  return nullptr;
}
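// Worked example of the throttle in GetTask() above (illustrative numbers, not
// from this file): with GetThreadCount() == 4, waiting_count_ == 1 and
// max_active_workers_ == 2, a worker that wakes up sees
// active_threads = 4 - 1 = 3 > 2, so it skips TryGetTaskLocked(), increments
// waiting_count_, and parks on task_queue_condition_ even if tasks are queued.
// Only once enough workers are waiting that active_threads <= max_active_workers_
// will the next signalled worker actually dequeue a task.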
Task* ThreadPool::TryGetTask(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  return TryGetTaskLocked();
}

Task* ThreadPool::TryGetTaskLocked() {
  if (HasOutstandingTasks()) {
    Task* task = tasks_.front();
    tasks_.pop_front();
    return task;
  }
  return nullptr;
}

void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) {
  if (do_work) {
    CHECK(!create_peers_);
    Task* task = nullptr;
    while ((task = TryGetTask(self)) != nullptr) {
      task->Run(self);
      task->Finalize();
    }
  }
  // Wait until each thread is waiting and the task list is empty.
  MutexLock mu(self, task_queue_lock_);
  while (!shutting_down_ && (waiting_count_ != GetThreadCount() || HasOutstandingTasks())) {
    if (!may_hold_locks) {
      completion_condition_.Wait(self);
    } else {
      completion_condition_.WaitHoldingLocks(self);
    }
  }
}

size_t ThreadPool::GetTaskCount(Thread* self) {
  MutexLock mu(self, task_queue_lock_);
  return tasks_.size();
}

void ThreadPool::SetPthreadPriority(int priority) {
  for (ThreadPoolWorker* worker : threads_) {
    worker->SetPthreadPriority(priority);
  }
}

}  // namespace art
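// A hedged usage sketch of the pool API defined above (MySweepTask is
// hypothetical; the name and thread count are arbitrary). With do_work == true,
// Wait() also drains tasks on the calling thread, which is why it CHECKs that
// the pool was created with create_peers == false:
//
//   Thread* self = Thread::Current();
//   ThreadPool pool("sweep", /*num_threads=*/4, /*create_peers=*/false);
//   pool.AddTask(self, new MySweepTask());
//   pool.StartWorkers(self);
//   pool.Wait(self, /*do_work=*/true, /*may_hold_locks=*/false);
//   // ~ThreadPool() signals shutdown, then joins and deletes the workers.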