1 // Copyright 2017 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/task_scheduler/scheduler_worker_pool.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/lazy_instance.h" 10 #include "base/task_scheduler/delayed_task_manager.h" 11 #include "base/task_scheduler/task_tracker.h" 12 #include "base/threading/thread_local.h" 13 14 namespace base { 15 namespace internal { 16 17 namespace { 18 19 // The number of SchedulerWorkerPool that are alive in this process. This 20 // variable should only be incremented when the SchedulerWorkerPool instances 21 // are brought up (on the main thread; before any tasks are posted) and 22 // decremented when the same instances are brought down (i.e., only when unit 23 // tests tear down the task environment and never in production). This makes the 24 // variable const while worker threads are up and as such it doesn't need to be 25 // atomic. It is used to tell when a task is posted from the main thread after 26 // the task environment was brought down in unit tests so that 27 // SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting 28 // such callers know they should complete necessary work synchronously. Note: 29 // |!g_active_pools_count| is generally equivalent to 30 // |!TaskScheduler::GetInstance()| but has the advantage of being valid in 31 // task_scheduler unit tests that don't instantiate a full TaskScheduler. 32 int g_active_pools_count = 0; 33 34 // SchedulerWorkerPool that owns the current thread, if any. 35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky 36 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; 37 38 const SchedulerWorkerPool* GetCurrentWorkerPool() { 39 return tls_current_worker_pool.Get().Get(); 40 } 41 42 } // namespace 43 44 // A task runner that runs tasks in parallel. 45 class SchedulerParallelTaskRunner : public TaskRunner { 46 public: 47 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so 48 // long as |worker_pool| is alive. 49 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. 50 SchedulerParallelTaskRunner(const TaskTraits& traits, 51 SchedulerWorkerPool* worker_pool) 52 : traits_(traits), worker_pool_(worker_pool) { 53 DCHECK(worker_pool_); 54 } 55 56 // TaskRunner: 57 bool PostDelayedTask(const Location& from_here, 58 OnceClosure closure, 59 TimeDelta delay) override { 60 if (!g_active_pools_count) 61 return false; 62 63 // Post the task as part of a one-off single-task Sequence. 64 return worker_pool_->PostTaskWithSequence( 65 Task(from_here, std::move(closure), traits_, delay), 66 MakeRefCounted<Sequence>()); 67 } 68 69 bool RunsTasksInCurrentSequence() const override { 70 return GetCurrentWorkerPool() == worker_pool_; 71 } 72 73 private: 74 ~SchedulerParallelTaskRunner() override = default; 75 76 const TaskTraits traits_; 77 SchedulerWorkerPool* const worker_pool_; 78 79 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); 80 }; 81 82 // A task runner that runs tasks in sequence. 83 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { 84 public: 85 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks 86 // so long as |worker_pool| is alive. 87 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. 88 SchedulerSequencedTaskRunner(const TaskTraits& traits, 89 SchedulerWorkerPool* worker_pool) 90 : traits_(traits), worker_pool_(worker_pool) { 91 DCHECK(worker_pool_); 92 } 93 94 // SequencedTaskRunner: 95 bool PostDelayedTask(const Location& from_here, 96 OnceClosure closure, 97 TimeDelta delay) override { 98 if (!g_active_pools_count) 99 return false; 100 101 Task task(from_here, std::move(closure), traits_, delay); 102 task.sequenced_task_runner_ref = this; 103 104 // Post the task as part of |sequence_|. 105 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); 106 } 107 108 bool PostNonNestableDelayedTask(const Location& from_here, 109 OnceClosure closure, 110 base::TimeDelta delay) override { 111 // Tasks are never nested within the task scheduler. 112 return PostDelayedTask(from_here, std::move(closure), delay); 113 } 114 115 bool RunsTasksInCurrentSequence() const override { 116 return sequence_->token() == SequenceToken::GetForCurrentThread(); 117 } 118 119 private: 120 ~SchedulerSequencedTaskRunner() override = default; 121 122 // Sequence for all Tasks posted through this TaskRunner. 123 const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>(); 124 125 const TaskTraits traits_; 126 SchedulerWorkerPool* const worker_pool_; 127 128 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); 129 }; 130 131 scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits( 132 const TaskTraits& traits) { 133 return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this); 134 } 135 136 scoped_refptr<SequencedTaskRunner> 137 SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits( 138 const TaskTraits& traits) { 139 return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this); 140 } 141 142 bool SchedulerWorkerPool::PostTaskWithSequence( 143 Task task, 144 scoped_refptr<Sequence> sequence) { 145 DCHECK(task.task); 146 DCHECK(sequence); 147 148 if (!task_tracker_->WillPostTask(&task)) 149 return false; 150 151 if (task.delayed_run_time.is_null()) { 152 PostTaskWithSequenceNow(std::move(task), std::move(sequence)); 153 } else { 154 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 155 // for details. 156 CHECK(task.task); 157 delayed_task_manager_->AddDelayedTask( 158 std::move(task), BindOnce( 159 [](scoped_refptr<Sequence> sequence, 160 SchedulerWorkerPool* worker_pool, Task task) { 161 worker_pool->PostTaskWithSequenceNow( 162 std::move(task), std::move(sequence)); 163 }, 164 std::move(sequence), Unretained(this))); 165 } 166 167 return true; 168 } 169 170 SchedulerWorkerPool::SchedulerWorkerPool( 171 TrackedRef<TaskTracker> task_tracker, 172 DelayedTaskManager* delayed_task_manager) 173 : task_tracker_(std::move(task_tracker)), 174 delayed_task_manager_(delayed_task_manager) { 175 DCHECK(task_tracker_); 176 DCHECK(delayed_task_manager_); 177 ++g_active_pools_count; 178 } 179 180 SchedulerWorkerPool::~SchedulerWorkerPool() { 181 --g_active_pools_count; 182 DCHECK_GE(g_active_pools_count, 0); 183 } 184 185 void SchedulerWorkerPool::BindToCurrentThread() { 186 DCHECK(!GetCurrentWorkerPool()); 187 tls_current_worker_pool.Get().Set(this); 188 } 189 190 void SchedulerWorkerPool::UnbindFromCurrentThread() { 191 DCHECK(GetCurrentWorkerPool()); 192 tls_current_worker_pool.Get().Set(nullptr); 193 } 194 195 void SchedulerWorkerPool::PostTaskWithSequenceNow( 196 Task task, 197 scoped_refptr<Sequence> sequence) { 198 DCHECK(task.task); 199 DCHECK(sequence); 200 201 // Confirm that |task| is ready to run (its delayed run time is either null or 202 // in the past). 203 DCHECK_LE(task.delayed_run_time, TimeTicks::Now()); 204 205 const bool sequence_was_empty = sequence->PushTask(std::move(task)); 206 if (sequence_was_empty) { 207 // Try to schedule |sequence| if it was empty before |task| was inserted 208 // into it. Otherwise, one of these must be true: 209 // - |sequence| is already scheduled, or, 210 // - The pool is running a Task from |sequence|. The pool is expected to 211 // reschedule |sequence| once it's done running the Task. 212 sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this); 213 if (sequence) 214 OnCanScheduleSequence(std::move(sequence)); 215 } 216 } 217 218 } // namespace internal 219 } // namespace base 220