1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "task_processor.h" 18 19 #include "base/time_utils.h" 20 #include "scoped_thread_state_change.h" 21 22 namespace art { 23 namespace gc { 24 25 TaskProcessor::TaskProcessor() 26 : lock_(new Mutex("Task processor lock", kReferenceProcessorLock)), is_running_(false), 27 running_thread_(nullptr) { 28 // Piggyback off the reference processor lock level. 29 cond_.reset(new ConditionVariable("Task processor condition", *lock_)); 30 } 31 32 TaskProcessor::~TaskProcessor() { 33 delete lock_; 34 } 35 36 void TaskProcessor::AddTask(Thread* self, HeapTask* task) { 37 ScopedThreadStateChange tsc(self, kBlocked); 38 MutexLock mu(self, *lock_); 39 tasks_.insert(task); 40 cond_->Signal(self); 41 } 42 43 HeapTask* TaskProcessor::GetTask(Thread* self) { 44 ScopedThreadStateChange tsc(self, kBlocked); 45 MutexLock mu(self, *lock_); 46 while (true) { 47 if (tasks_.empty()) { 48 if (!is_running_) { 49 return nullptr; 50 } 51 cond_->Wait(self); // Empty queue, wait until we are signalled. 52 } else { 53 // Non empty queue, look at the top element and see if we are ready to run it. 54 const uint64_t current_time = NanoTime(); 55 HeapTask* task = *tasks_.begin(); 56 // If we are shutting down, return the task right away without waiting. Otherwise return the 57 // task if it is late enough. 58 uint64_t target_time = task->GetTargetRunTime(); 59 if (!is_running_ || target_time <= current_time) { 60 tasks_.erase(tasks_.begin()); 61 return task; 62 } 63 DCHECK_GT(target_time, current_time); 64 // Wait untl we hit the target run time. 65 const uint64_t delta_time = target_time - current_time; 66 const uint64_t ms_delta = NsToMs(delta_time); 67 const uint64_t ns_delta = delta_time - MsToNs(ms_delta); 68 cond_->TimedWait(self, static_cast<int64_t>(ms_delta), static_cast<int32_t>(ns_delta)); 69 } 70 } 71 UNREACHABLE(); 72 } 73 74 void TaskProcessor::UpdateTargetRunTime(Thread* self, HeapTask* task, uint64_t new_target_time) { 75 MutexLock mu(self, *lock_); 76 // Find the task. 77 auto range = tasks_.equal_range(task); 78 for (auto it = range.first; it != range.second; ++it) { 79 if (*it == task) { 80 // Check if the target time was updated, if so re-insert then wait. 81 if (new_target_time != task->GetTargetRunTime()) { 82 tasks_.erase(it); 83 task->SetTargetRunTime(new_target_time); 84 tasks_.insert(task); 85 // If we became the first task then we may need to signal since we changed the task that we 86 // are sleeping on. 87 if (*tasks_.begin() == task) { 88 cond_->Signal(self); 89 } 90 return; 91 } 92 } 93 } 94 } 95 96 bool TaskProcessor::IsRunning() const { 97 MutexLock mu(Thread::Current(), *lock_); 98 return is_running_; 99 } 100 101 Thread* TaskProcessor::GetRunningThread() const { 102 MutexLock mu(Thread::Current(), *lock_); 103 return running_thread_; 104 } 105 106 void TaskProcessor::Stop(Thread* self) { 107 MutexLock mu(self, *lock_); 108 is_running_ = false; 109 running_thread_ = nullptr; 110 cond_->Broadcast(self); 111 } 112 113 void TaskProcessor::Start(Thread* self) { 114 MutexLock mu(self, *lock_); 115 is_running_ = true; 116 running_thread_ = self; 117 } 118 119 void TaskProcessor::RunAllTasks(Thread* self) { 120 while (true) { 121 // Wait and get a task, may be interrupted. 122 HeapTask* task = GetTask(self); 123 if (task != nullptr) { 124 task->Run(self); 125 task->Finalize(); 126 } else if (!IsRunning()) { 127 break; 128 } 129 } 130 } 131 132 } // namespace gc 133 } // namespace art 134