Home | History | Annotate | Download | only in task_scheduler
      1 // Copyright 2017 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/task_scheduler/scheduler_worker_pool.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/bind_helpers.h"
      9 #include "base/lazy_instance.h"
     10 #include "base/task_scheduler/delayed_task_manager.h"
     11 #include "base/task_scheduler/task_tracker.h"
     12 #include "base/threading/thread_local.h"
     13 
     14 namespace base {
     15 namespace internal {
     16 
     17 namespace {
     18 
     19 // The number of SchedulerWorkerPool that are alive in this process. This
     20 // variable should only be incremented when the SchedulerWorkerPool instances
     21 // are brought up (on the main thread; before any tasks are posted) and
     22 // decremented when the same instances are brought down (i.e., only when unit
     23 // tests tear down the task environment and never in production). This makes the
     24 // variable const while worker threads are up and as such it doesn't need to be
     25 // atomic. It is used to tell when a task is posted from the main thread after
     26 // the task environment was brought down in unit tests so that
     27 // SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting
     28 // such callers know they should complete necessary work synchronously. Note:
     29 // |!g_active_pools_count| is generally equivalent to
     30 // |!TaskScheduler::GetInstance()| but has the advantage of being valid in
     31 // task_scheduler unit tests that don't instantiate a full TaskScheduler.
     32 int g_active_pools_count = 0;
     33 
     34 // SchedulerWorkerPool that owns the current thread, if any.
     35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
     36     tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
     37 
     38 const SchedulerWorkerPool* GetCurrentWorkerPool() {
     39   return tls_current_worker_pool.Get().Get();
     40 }
     41 
     42 }  // namespace
     43 
     44 // A task runner that runs tasks in parallel.
     45 class SchedulerParallelTaskRunner : public TaskRunner {
     46  public:
     47   // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
     48   // long as |worker_pool| is alive.
     49   // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
     50   SchedulerParallelTaskRunner(const TaskTraits& traits,
     51                               SchedulerWorkerPool* worker_pool)
     52       : traits_(traits), worker_pool_(worker_pool) {
     53     DCHECK(worker_pool_);
     54   }
     55 
     56   // TaskRunner:
     57   bool PostDelayedTask(const Location& from_here,
     58                        OnceClosure closure,
     59                        TimeDelta delay) override {
     60     if (!g_active_pools_count)
     61       return false;
     62 
     63     // Post the task as part of a one-off single-task Sequence.
     64     return worker_pool_->PostTaskWithSequence(
     65         Task(from_here, std::move(closure), traits_, delay),
     66         MakeRefCounted<Sequence>());
     67   }
     68 
     69   bool RunsTasksInCurrentSequence() const override {
     70     return GetCurrentWorkerPool() == worker_pool_;
     71   }
     72 
     73  private:
     74   ~SchedulerParallelTaskRunner() override = default;
     75 
     76   const TaskTraits traits_;
     77   SchedulerWorkerPool* const worker_pool_;
     78 
     79   DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
     80 };
     81 
     82 // A task runner that runs tasks in sequence.
     83 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
     84  public:
     85   // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
     86   // so long as |worker_pool| is alive.
     87   // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
     88   SchedulerSequencedTaskRunner(const TaskTraits& traits,
     89                                SchedulerWorkerPool* worker_pool)
     90       : traits_(traits), worker_pool_(worker_pool) {
     91     DCHECK(worker_pool_);
     92   }
     93 
     94   // SequencedTaskRunner:
     95   bool PostDelayedTask(const Location& from_here,
     96                        OnceClosure closure,
     97                        TimeDelta delay) override {
     98     if (!g_active_pools_count)
     99       return false;
    100 
    101     Task task(from_here, std::move(closure), traits_, delay);
    102     task.sequenced_task_runner_ref = this;
    103 
    104     // Post the task as part of |sequence_|.
    105     return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
    106   }
    107 
    108   bool PostNonNestableDelayedTask(const Location& from_here,
    109                                   OnceClosure closure,
    110                                   base::TimeDelta delay) override {
    111     // Tasks are never nested within the task scheduler.
    112     return PostDelayedTask(from_here, std::move(closure), delay);
    113   }
    114 
    115   bool RunsTasksInCurrentSequence() const override {
    116     return sequence_->token() == SequenceToken::GetForCurrentThread();
    117   }
    118 
    119  private:
    120   ~SchedulerSequencedTaskRunner() override = default;
    121 
    122   // Sequence for all Tasks posted through this TaskRunner.
    123   const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>();
    124 
    125   const TaskTraits traits_;
    126   SchedulerWorkerPool* const worker_pool_;
    127 
    128   DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
    129 };
    130 
    131 scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits(
    132     const TaskTraits& traits) {
    133   return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this);
    134 }
    135 
    136 scoped_refptr<SequencedTaskRunner>
    137 SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits(
    138     const TaskTraits& traits) {
    139   return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this);
    140 }
    141 
    142 bool SchedulerWorkerPool::PostTaskWithSequence(
    143     Task task,
    144     scoped_refptr<Sequence> sequence) {
    145   DCHECK(task.task);
    146   DCHECK(sequence);
    147 
    148   if (!task_tracker_->WillPostTask(&task))
    149     return false;
    150 
    151   if (task.delayed_run_time.is_null()) {
    152     PostTaskWithSequenceNow(std::move(task), std::move(sequence));
    153   } else {
    154     // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
    155     // for details.
    156     CHECK(task.task);
    157     delayed_task_manager_->AddDelayedTask(
    158         std::move(task), BindOnce(
    159                              [](scoped_refptr<Sequence> sequence,
    160                                 SchedulerWorkerPool* worker_pool, Task task) {
    161                                worker_pool->PostTaskWithSequenceNow(
    162                                    std::move(task), std::move(sequence));
    163                              },
    164                              std::move(sequence), Unretained(this)));
    165   }
    166 
    167   return true;
    168 }
    169 
    170 SchedulerWorkerPool::SchedulerWorkerPool(
    171     TrackedRef<TaskTracker> task_tracker,
    172     DelayedTaskManager* delayed_task_manager)
    173     : task_tracker_(std::move(task_tracker)),
    174       delayed_task_manager_(delayed_task_manager) {
    175   DCHECK(task_tracker_);
    176   DCHECK(delayed_task_manager_);
    177   ++g_active_pools_count;
    178 }
    179 
    180 SchedulerWorkerPool::~SchedulerWorkerPool() {
    181   --g_active_pools_count;
    182   DCHECK_GE(g_active_pools_count, 0);
    183 }
    184 
    185 void SchedulerWorkerPool::BindToCurrentThread() {
    186   DCHECK(!GetCurrentWorkerPool());
    187   tls_current_worker_pool.Get().Set(this);
    188 }
    189 
    190 void SchedulerWorkerPool::UnbindFromCurrentThread() {
    191   DCHECK(GetCurrentWorkerPool());
    192   tls_current_worker_pool.Get().Set(nullptr);
    193 }
    194 
    195 void SchedulerWorkerPool::PostTaskWithSequenceNow(
    196     Task task,
    197     scoped_refptr<Sequence> sequence) {
    198   DCHECK(task.task);
    199   DCHECK(sequence);
    200 
    201   // Confirm that |task| is ready to run (its delayed run time is either null or
    202   // in the past).
    203   DCHECK_LE(task.delayed_run_time, TimeTicks::Now());
    204 
    205   const bool sequence_was_empty = sequence->PushTask(std::move(task));
    206   if (sequence_was_empty) {
    207     // Try to schedule |sequence| if it was empty before |task| was inserted
    208     // into it. Otherwise, one of these must be true:
    209     // - |sequence| is already scheduled, or,
    210     // - The pool is running a Task from |sequence|. The pool is expected to
    211     //   reschedule |sequence| once it's done running the Task.
    212     sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this);
    213     if (sequence)
    214       OnCanScheduleSequence(std::move(sequence));
    215   }
    216 }
    217 
    218 }  // namespace internal
    219 }  // namespace base
    220