Home | History | Annotate | Download | only in sequence_manager
      1 // Copyright 2015 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/task/sequence_manager/task_queue_impl.h"
      6 
      7 #include <memory>
      8 #include <utility>
      9 
     10 #include "base/strings/stringprintf.h"
     11 #include "base/task/sequence_manager/sequence_manager_impl.h"
     12 #include "base/task/sequence_manager/time_domain.h"
     13 #include "base/task/sequence_manager/work_queue.h"
     14 #include "base/time/time.h"
     15 #include "base/trace_event/blame_context.h"
     16 
     17 namespace base {
     18 namespace sequence_manager {
     19 
     20 // static
     21 const char* TaskQueue::PriorityToString(TaskQueue::QueuePriority priority) {
     22   switch (priority) {
     23     case kControlPriority:
     24       return "control";
     25     case kHighestPriority:
     26       return "highest";
     27     case kHighPriority:
     28       return "high";
     29     case kNormalPriority:
     30       return "normal";
     31     case kLowPriority:
     32       return "low";
     33     case kBestEffortPriority:
     34       return "best_effort";
     35     default:
     36       NOTREACHED();
     37       return nullptr;
     38   }
     39 }
     40 
     41 namespace internal {
     42 
     43 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
     44                              TimeDomain* time_domain,
     45                              const TaskQueue::Spec& spec)
     46     : name_(spec.name),
     47       thread_id_(PlatformThread::CurrentId()),
     48       any_thread_(sequence_manager, time_domain),
     49       main_thread_only_(sequence_manager, this, time_domain),
     50       should_monitor_quiescence_(spec.should_monitor_quiescence),
     51       should_notify_observers_(spec.should_notify_observers) {
     52   DCHECK(time_domain);
     53 }
     54 
     55 TaskQueueImpl::~TaskQueueImpl() {
     56 #if DCHECK_IS_ON()
     57   AutoLock lock(any_thread_lock_);
     58   // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
     59   // contains a strong reference to this TaskQueueImpl and the
     60   // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
     61   // queues.
     62   DCHECK(!any_thread().sequence_manager)
     63       << "UnregisterTaskQueue must be called first!";
     64 #endif
     65 }
     66 
     67 TaskQueueImpl::PostTaskResult::PostTaskResult()
     68     : success(false), task(OnceClosure(), Location()) {}
     69 
     70 TaskQueueImpl::PostTaskResult::PostTaskResult(bool success,
     71                                               TaskQueue::PostedTask task)
     72     : success(success), task(std::move(task)) {}
     73 
     74 TaskQueueImpl::PostTaskResult::PostTaskResult(PostTaskResult&& move_from)
     75     : success(move_from.success), task(std::move(move_from.task)) {}
     76 
     77 TaskQueueImpl::PostTaskResult::~PostTaskResult() = default;
     78 
     79 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Success() {
     80   return PostTaskResult(true, TaskQueue::PostedTask(OnceClosure(), Location()));
     81 }
     82 
     83 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Fail(
     84     TaskQueue::PostedTask task) {
     85   return PostTaskResult(false, std::move(task));
     86 }
     87 
     88 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task,
     89                           TimeTicks desired_run_time,
     90                           EnqueueOrder sequence_number)
     91     : TaskQueue::Task(std::move(task), desired_run_time) {
     92   // It might wrap around to a negative number but it's handled properly.
     93   sequence_num = static_cast<int>(sequence_number);
     94 }
     95 
     96 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task,
     97                           TimeTicks desired_run_time,
     98                           EnqueueOrder sequence_number,
     99                           EnqueueOrder enqueue_order)
    100     : TaskQueue::Task(std::move(task), desired_run_time),
    101       enqueue_order_(enqueue_order) {
    102   // It might wrap around to a negative number but it's handled properly.
    103   sequence_num = static_cast<int>(sequence_number);
    104 }
    105 
    106 TaskQueueImpl::AnyThread::AnyThread(SequenceManagerImpl* sequence_manager,
    107                                     TimeDomain* time_domain)
    108     : sequence_manager(sequence_manager), time_domain(time_domain) {}
    109 
    110 TaskQueueImpl::AnyThread::~AnyThread() = default;
    111 
    112 TaskQueueImpl::MainThreadOnly::MainThreadOnly(
    113     SequenceManagerImpl* sequence_manager,
    114     TaskQueueImpl* task_queue,
    115     TimeDomain* time_domain)
    116     : sequence_manager(sequence_manager),
    117       time_domain(time_domain),
    118       delayed_work_queue(
    119           new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
    120       immediate_work_queue(new WorkQueue(task_queue,
    121                                          "immediate",
    122                                          WorkQueue::QueueType::kImmediate)),
    123       set_index(0),
    124       is_enabled_refcount(0),
    125       voter_refcount(0),
    126       blame_context(nullptr),
    127       is_enabled_for_test(true) {}
    128 
    129 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
    130 
    131 void TaskQueueImpl::UnregisterTaskQueue() {
    132   TaskDeque immediate_incoming_queue;
    133 
    134   {
    135     AutoLock lock(any_thread_lock_);
    136     AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
    137 
    138     if (main_thread_only().time_domain)
    139       main_thread_only().time_domain->UnregisterQueue(this);
    140 
    141     if (!any_thread().sequence_manager)
    142       return;
    143 
    144     main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
    145     any_thread().time_domain = nullptr;
    146     main_thread_only().time_domain = nullptr;
    147 
    148     any_thread().sequence_manager = nullptr;
    149     main_thread_only().sequence_manager = nullptr;
    150     any_thread().on_next_wake_up_changed_callback =
    151         OnNextWakeUpChangedCallback();
    152     main_thread_only().on_next_wake_up_changed_callback =
    153         OnNextWakeUpChangedCallback();
    154     immediate_incoming_queue.swap(immediate_incoming_queue_);
    155   }
    156 
    157   // It is possible for a task to hold a scoped_refptr to this, which
    158   // will lead to TaskQueueImpl destructor being called when deleting a task.
    159   // To avoid use-after-free, we need to clear all fields of a task queue
    160   // before starting to delete the tasks.
    161   // All work queues and priority queues containing tasks should be moved to
    162   // local variables on stack (std::move for unique_ptrs and swap for queues)
    163   // before clearing them and deleting tasks.
    164 
    165   // Flush the queues outside of the lock because TSAN complains about a lock
    166   // order inversion for tasks that are posted from within a lock, with a
    167   // destructor that acquires the same lock.
    168 
    169   std::priority_queue<Task> delayed_incoming_queue;
    170   delayed_incoming_queue.swap(main_thread_only().delayed_incoming_queue);
    171 
    172   std::unique_ptr<WorkQueue> immediate_work_queue =
    173       std::move(main_thread_only().immediate_work_queue);
    174   std::unique_ptr<WorkQueue> delayed_work_queue =
    175       std::move(main_thread_only().delayed_work_queue);
    176 }
    177 
    178 const char* TaskQueueImpl::GetName() const {
    179   return name_;
    180 }
    181 
    182 bool TaskQueueImpl::RunsTasksInCurrentSequence() const {
    183   return PlatformThread::CurrentId() == thread_id_;
    184 }
    185 
    186 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTask(
    187     TaskQueue::PostedTask task) {
    188   if (task.delay.is_zero())
    189     return PostImmediateTaskImpl(std::move(task));
    190 
    191   return PostDelayedTaskImpl(std::move(task));
    192 }
    193 
    194 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostImmediateTaskImpl(
    195     TaskQueue::PostedTask task) {
    196   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
    197   // for details.
    198   CHECK(task.callback);
    199   AutoLock lock(any_thread_lock_);
    200   if (!any_thread().sequence_manager)
    201     return PostTaskResult::Fail(std::move(task));
    202 
    203   EnqueueOrder sequence_number =
    204       any_thread().sequence_manager->GetNextSequenceNumber();
    205 
    206   PushOntoImmediateIncomingQueueLocked(Task(std::move(task),
    207                                             any_thread().time_domain->Now(),
    208                                             sequence_number, sequence_number));
    209   return PostTaskResult::Success();
    210 }
    211 
    212 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTaskImpl(
    213     TaskQueue::PostedTask task) {
    214   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
    215   // for details.
    216   CHECK(task.callback);
    217   DCHECK_GT(task.delay, TimeDelta());
    218   if (PlatformThread::CurrentId() == thread_id_) {
    219     // Lock-free fast path for delayed tasks posted from the main thread.
    220     if (!main_thread_only().sequence_manager)
    221       return PostTaskResult::Fail(std::move(task));
    222 
    223     EnqueueOrder sequence_number =
    224         main_thread_only().sequence_manager->GetNextSequenceNumber();
    225 
    226     TimeTicks time_domain_now = main_thread_only().time_domain->Now();
    227     TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
    228     PushOntoDelayedIncomingQueueFromMainThread(
    229         Task(std::move(task), time_domain_delayed_run_time, sequence_number),
    230         time_domain_now);
    231   } else {
    232     // NOTE posting a delayed task from a different thread is not expected to
    233     // be common. This pathway is less optimal than perhaps it could be
    234     // because it causes two main thread tasks to be run.  Should this
    235     // assumption prove to be false in future, we may need to revisit this.
    236     AutoLock lock(any_thread_lock_);
    237     if (!any_thread().sequence_manager)
    238       return PostTaskResult::Fail(std::move(task));
    239 
    240     EnqueueOrder sequence_number =
    241         any_thread().sequence_manager->GetNextSequenceNumber();
    242 
    243     TimeTicks time_domain_now = any_thread().time_domain->Now();
    244     TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
    245     PushOntoDelayedIncomingQueueLocked(
    246         Task(std::move(task), time_domain_delayed_run_time, sequence_number));
    247   }
    248   return PostTaskResult::Success();
    249 }
    250 
    251 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
    252     Task pending_task,
    253     TimeTicks now) {
    254   main_thread_only().sequence_manager->WillQueueTask(&pending_task);
    255   main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
    256 
    257   LazyNow lazy_now(now);
    258   UpdateDelayedWakeUp(&lazy_now);
    259 
    260   TraceQueueSize();
    261 }
    262 
    263 void TaskQueueImpl::PushOntoDelayedIncomingQueueLocked(Task pending_task) {
    264   any_thread().sequence_manager->WillQueueTask(&pending_task);
    265 
    266   EnqueueOrder thread_hop_task_sequence_number =
    267       any_thread().sequence_manager->GetNextSequenceNumber();
    268   // TODO(altimin): Add a copy method to Task to capture metadata here.
    269   PushOntoImmediateIncomingQueueLocked(Task(
    270       TaskQueue::PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
    271                                      Unretained(this), std::move(pending_task)),
    272                             FROM_HERE, TimeDelta(), Nestable::kNonNestable,
    273                             pending_task.task_type()),
    274       TimeTicks(), thread_hop_task_sequence_number,
    275       thread_hop_task_sequence_number));
    276 }
    277 
    278 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
    279   DCHECK(main_thread_checker_.CalledOnValidThread());
    280   TimeTicks delayed_run_time = pending_task.delayed_run_time;
    281   TimeTicks time_domain_now = main_thread_only().time_domain->Now();
    282   if (delayed_run_time <= time_domain_now) {
    283     // If |delayed_run_time| is in the past then push it onto the work queue
    284     // immediately. To ensure the right task ordering we need to temporarily
    285     // push it onto the |delayed_incoming_queue|.
    286     delayed_run_time = time_domain_now;
    287     pending_task.delayed_run_time = time_domain_now;
    288     main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
    289     LazyNow lazy_now(time_domain_now);
    290     WakeUpForDelayedWork(&lazy_now);
    291   } else {
    292     // If |delayed_run_time| is in the future we can queue it as normal.
    293     PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
    294                                                time_domain_now);
    295   }
    296   TraceQueueSize();
    297 }
    298 
    299 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) {
    300   // If the |immediate_incoming_queue| is empty we need a DoWork posted to make
    301   // it run.
    302   bool was_immediate_incoming_queue_empty;
    303 
    304   EnqueueOrder sequence_number = task.enqueue_order();
    305   TimeTicks desired_run_time = task.delayed_run_time;
    306 
    307   {
    308     AutoLock lock(immediate_incoming_queue_lock_);
    309     was_immediate_incoming_queue_empty = immediate_incoming_queue().empty();
    310     any_thread().sequence_manager->WillQueueTask(&task);
    311     immediate_incoming_queue().push_back(std::move(task));
    312   }
    313 
    314   if (was_immediate_incoming_queue_empty) {
    315     // However there's no point posting a DoWork for a blocked queue. NB we can
    316     // only tell if it's disabled from the main thread.
    317     bool queue_is_blocked =
    318         RunsTasksInCurrentSequence() &&
    319         (!IsQueueEnabled() || main_thread_only().current_fence);
    320     any_thread().sequence_manager->OnQueueHasIncomingImmediateWork(
    321         this, sequence_number, queue_is_blocked);
    322     if (!any_thread().on_next_wake_up_changed_callback.is_null())
    323       any_thread().on_next_wake_up_changed_callback.Run(desired_run_time);
    324   }
    325 
    326   TraceQueueSize();
    327 }
    328 
    329 void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() {
    330   if (!main_thread_only().immediate_work_queue->Empty())
    331     return;
    332 
    333   main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue();
    334 }
    335 
    336 void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) {
    337   DCHECK(queue->empty());
    338 
    339   AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
    340   queue->swap(immediate_incoming_queue());
    341 
    342   // Activate delayed fence if necessary. This is ideologically similar to
    343   // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
    344   // from any thread we can't generate an enqueue order for the fence there,
    345   // so we have to check all immediate tasks and use their enqueue order for
    346   // a fence.
    347   if (main_thread_only().delayed_fence) {
    348     for (const Task& task : *queue) {
    349       if (task.delayed_run_time >= main_thread_only().delayed_fence.value()) {
    350         main_thread_only().delayed_fence = nullopt;
    351         DCHECK(!main_thread_only().current_fence);
    352         main_thread_only().current_fence = task.enqueue_order();
    353         // Do not trigger WorkQueueSets notification when taking incoming
    354         // immediate queue.
    355         main_thread_only().immediate_work_queue->InsertFenceSilently(
    356             main_thread_only().current_fence);
    357         main_thread_only().delayed_work_queue->InsertFenceSilently(
    358             main_thread_only().current_fence);
    359         break;
    360       }
    361     }
    362   }
    363 }
    364 
    365 bool TaskQueueImpl::IsEmpty() const {
    366   if (!main_thread_only().delayed_work_queue->Empty() ||
    367       !main_thread_only().delayed_incoming_queue.empty() ||
    368       !main_thread_only().immediate_work_queue->Empty()) {
    369     return false;
    370   }
    371 
    372   AutoLock lock(immediate_incoming_queue_lock_);
    373   return immediate_incoming_queue().empty();
    374 }
    375 
    376 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
    377   size_t task_count = 0;
    378   task_count += main_thread_only().delayed_work_queue->Size();
    379   task_count += main_thread_only().delayed_incoming_queue.size();
    380   task_count += main_thread_only().immediate_work_queue->Size();
    381 
    382   AutoLock lock(immediate_incoming_queue_lock_);
    383   task_count += immediate_incoming_queue().size();
    384   return task_count;
    385 }
    386 
    387 bool TaskQueueImpl::HasTaskToRunImmediately() const {
    388   // Any work queue tasks count as immediate work.
    389   if (!main_thread_only().delayed_work_queue->Empty() ||
    390       !main_thread_only().immediate_work_queue->Empty()) {
    391     return true;
    392   }
    393 
    394   // Tasks on |delayed_incoming_queue| that could run now, count as
    395   // immediate work.
    396   if (!main_thread_only().delayed_incoming_queue.empty() &&
    397       main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
    398           main_thread_only().time_domain->CreateLazyNow().Now()) {
    399     return true;
    400   }
    401 
    402   // Finally tasks on |immediate_incoming_queue| count as immediate work.
    403   AutoLock lock(immediate_incoming_queue_lock_);
    404   return !immediate_incoming_queue().empty();
    405 }
    406 
    407 Optional<TaskQueueImpl::DelayedWakeUp>
    408 TaskQueueImpl::GetNextScheduledWakeUpImpl() {
    409   // Note we don't scheduled a wake-up for disabled queues.
    410   if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
    411     return nullopt;
    412 
    413   return main_thread_only().delayed_incoming_queue.top().delayed_wake_up();
    414 }
    415 
    416 Optional<TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() {
    417   Optional<DelayedWakeUp> wake_up = GetNextScheduledWakeUpImpl();
    418   if (!wake_up)
    419     return nullopt;
    420   return wake_up->time;
    421 }
    422 
    423 void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
    424   // Enqueue all delayed tasks that should be running now, skipping any that
    425   // have been canceled.
    426   while (!main_thread_only().delayed_incoming_queue.empty()) {
    427     Task& task =
    428         const_cast<Task&>(main_thread_only().delayed_incoming_queue.top());
    429     if (!task.task || task.task.IsCancelled()) {
    430       main_thread_only().delayed_incoming_queue.pop();
    431       continue;
    432     }
    433     if (task.delayed_run_time > lazy_now->Now())
    434       break;
    435     ActivateDelayedFenceIfNeeded(task.delayed_run_time);
    436     task.set_enqueue_order(
    437         main_thread_only().sequence_manager->GetNextSequenceNumber());
    438     main_thread_only().delayed_work_queue->Push(std::move(task));
    439     main_thread_only().delayed_incoming_queue.pop();
    440 
    441     // Normally WakeUpForDelayedWork is called inside DoWork, but it also
    442     // can be called elsewhere (e.g. tests and fast-path for posting
    443     // delayed tasks). Ensure that there is a DoWork posting. No-op inside
    444     // existing DoWork due to DoWork deduplication.
    445     if (IsQueueEnabled() || !main_thread_only().current_fence) {
    446       main_thread_only().sequence_manager->MaybeScheduleImmediateWork(
    447           FROM_HERE);
    448     }
    449   }
    450 
    451   UpdateDelayedWakeUp(lazy_now);
    452 }
    453 
    454 void TaskQueueImpl::TraceQueueSize() const {
    455   bool is_tracing;
    456   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
    457       TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
    458   if (!is_tracing)
    459     return;
    460 
    461   // It's only safe to access the work queues from the main thread.
    462   // TODO(alexclarke): We should find another way of tracing this
    463   if (PlatformThread::CurrentId() != thread_id_)
    464     return;
    465 
    466   AutoLock lock(immediate_incoming_queue_lock_);
    467   TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
    468                  immediate_incoming_queue().size() +
    469                      main_thread_only().immediate_work_queue->Size() +
    470                      main_thread_only().delayed_work_queue->Size() +
    471                      main_thread_only().delayed_incoming_queue.size());
    472 }
    473 
    474 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
    475   if (!main_thread_only().sequence_manager || priority == GetQueuePriority())
    476     return;
    477   main_thread_only()
    478       .sequence_manager->main_thread_only()
    479       .selector.SetQueuePriority(this, priority);
    480 }
    481 
    482 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
    483   size_t set_index = immediate_work_queue()->work_queue_set_index();
    484   DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
    485   return static_cast<TaskQueue::QueuePriority>(set_index);
    486 }
    487 
    488 void TaskQueueImpl::AsValueInto(TimeTicks now,
    489                                 trace_event::TracedValue* state) const {
    490   AutoLock lock(any_thread_lock_);
    491   AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
    492   state->BeginDictionary();
    493   state->SetString("name", GetName());
    494   if (!main_thread_only().sequence_manager) {
    495     state->SetBoolean("unregistered", true);
    496     state->EndDictionary();
    497     return;
    498   }
    499   DCHECK(main_thread_only().time_domain);
    500   DCHECK(main_thread_only().delayed_work_queue);
    501   DCHECK(main_thread_only().immediate_work_queue);
    502 
    503   state->SetString(
    504       "task_queue_id",
    505       StringPrintf("0x%" PRIx64,
    506                    static_cast<uint64_t>(reinterpret_cast<uintptr_t>(this))));
    507   state->SetBoolean("enabled", IsQueueEnabled());
    508   state->SetString("time_domain_name",
    509                    main_thread_only().time_domain->GetName());
    510   state->SetInteger("immediate_incoming_queue_size",
    511                     immediate_incoming_queue().size());
    512   state->SetInteger("delayed_incoming_queue_size",
    513                     main_thread_only().delayed_incoming_queue.size());
    514   state->SetInteger("immediate_work_queue_size",
    515                     main_thread_only().immediate_work_queue->Size());
    516   state->SetInteger("delayed_work_queue_size",
    517                     main_thread_only().delayed_work_queue->Size());
    518 
    519   if (!main_thread_only().delayed_incoming_queue.empty()) {
    520     TimeDelta delay_to_next_task =
    521         (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
    522          main_thread_only().time_domain->CreateLazyNow().Now());
    523     state->SetDouble("delay_to_next_task_ms",
    524                      delay_to_next_task.InMillisecondsF());
    525   }
    526   if (main_thread_only().current_fence)
    527     state->SetInteger("current_fence", main_thread_only().current_fence);
    528   if (main_thread_only().delayed_fence) {
    529     state->SetDouble(
    530         "delayed_fence_seconds_from_now",
    531         (main_thread_only().delayed_fence.value() - now).InSecondsF());
    532   }
    533 
    534   bool verbose = false;
    535   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
    536       TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
    537       &verbose);
    538 
    539   if (verbose) {
    540     state->BeginArray("immediate_incoming_queue");
    541     QueueAsValueInto(immediate_incoming_queue(), now, state);
    542     state->EndArray();
    543     state->BeginArray("delayed_work_queue");
    544     main_thread_only().delayed_work_queue->AsValueInto(now, state);
    545     state->EndArray();
    546     state->BeginArray("immediate_work_queue");
    547     main_thread_only().immediate_work_queue->AsValueInto(now, state);
    548     state->EndArray();
    549     state->BeginArray("delayed_incoming_queue");
    550     QueueAsValueInto(main_thread_only().delayed_incoming_queue, now, state);
    551     state->EndArray();
    552   }
    553   state->SetString("priority", TaskQueue::PriorityToString(GetQueuePriority()));
    554   state->EndDictionary();
    555 }
    556 
    557 void TaskQueueImpl::AddTaskObserver(MessageLoop::TaskObserver* task_observer) {
    558   main_thread_only().task_observers.AddObserver(task_observer);
    559 }
    560 
    561 void TaskQueueImpl::RemoveTaskObserver(
    562     MessageLoop::TaskObserver* task_observer) {
    563   main_thread_only().task_observers.RemoveObserver(task_observer);
    564 }
    565 
    566 void TaskQueueImpl::NotifyWillProcessTask(const PendingTask& pending_task) {
    567   DCHECK(should_notify_observers_);
    568   if (main_thread_only().blame_context)
    569     main_thread_only().blame_context->Enter();
    570   for (auto& observer : main_thread_only().task_observers)
    571     observer.WillProcessTask(pending_task);
    572 }
    573 
    574 void TaskQueueImpl::NotifyDidProcessTask(const PendingTask& pending_task) {
    575   DCHECK(should_notify_observers_);
    576   for (auto& observer : main_thread_only().task_observers)
    577     observer.DidProcessTask(pending_task);
    578   if (main_thread_only().blame_context)
    579     main_thread_only().blame_context->Leave();
    580 }
    581 
    582 void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) {
    583   {
    584     AutoLock lock(any_thread_lock_);
    585     DCHECK(time_domain);
    586     // NOTE this is similar to checking |any_thread().sequence_manager| but
    587     // the TaskQueueSelectorTests constructs TaskQueueImpl directly with a null
    588     // sequence_manager.  Instead we check |any_thread().time_domain| which is
    589     // another way of asserting that UnregisterTaskQueue has not been called.
    590     DCHECK(any_thread().time_domain);
    591     if (!any_thread().time_domain)
    592       return;
    593     DCHECK(main_thread_checker_.CalledOnValidThread());
    594     if (time_domain == main_thread_only().time_domain)
    595       return;
    596 
    597     any_thread().time_domain = time_domain;
    598   }
    599 
    600   main_thread_only().time_domain->UnregisterQueue(this);
    601   main_thread_only().time_domain = time_domain;
    602 
    603   LazyNow lazy_now = time_domain->CreateLazyNow();
    604   // Clear scheduled wake up to ensure that new notifications are issued
    605   // correctly.
    606   // TODO(altimin): Remove this when we won't have to support changing time
    607   // domains.
    608   main_thread_only().scheduled_wake_up = nullopt;
    609   UpdateDelayedWakeUp(&lazy_now);
    610 }
    611 
    612 TimeDomain* TaskQueueImpl::GetTimeDomain() const {
    613   if (PlatformThread::CurrentId() == thread_id_)
    614     return main_thread_only().time_domain;
    615 
    616   AutoLock lock(any_thread_lock_);
    617   return any_thread().time_domain;
    618 }
    619 
    620 void TaskQueueImpl::SetBlameContext(trace_event::BlameContext* blame_context) {
    621   main_thread_only().blame_context = blame_context;
    622 }
    623 
    624 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
    625   if (!main_thread_only().sequence_manager)
    626     return;
    627 
    628   // Only one fence may be present at a time.
    629   main_thread_only().delayed_fence = nullopt;
    630 
    631   EnqueueOrder previous_fence = main_thread_only().current_fence;
    632   EnqueueOrder current_fence =
    633       position == TaskQueue::InsertFencePosition::kNow
    634           ? main_thread_only().sequence_manager->GetNextSequenceNumber()
    635           : EnqueueOrder::blocking_fence();
    636 
    637   // Tasks posted after this point will have a strictly higher enqueue order
    638   // and will be blocked from running.
    639   main_thread_only().current_fence = current_fence;
    640   bool task_unblocked =
    641       main_thread_only().immediate_work_queue->InsertFence(current_fence);
    642   task_unblocked |=
    643       main_thread_only().delayed_work_queue->InsertFence(current_fence);
    644 
    645   if (!task_unblocked && previous_fence && previous_fence < current_fence) {
    646     AutoLock lock(immediate_incoming_queue_lock_);
    647     if (!immediate_incoming_queue().empty() &&
    648         immediate_incoming_queue().front().enqueue_order() > previous_fence &&
    649         immediate_incoming_queue().front().enqueue_order() < current_fence) {
    650       task_unblocked = true;
    651     }
    652   }
    653 
    654   if (IsQueueEnabled() && task_unblocked) {
    655     main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
    656   }
    657 }
    658 
    659 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
    660   // Task queue can have only one fence, delayed or not.
    661   RemoveFence();
    662   main_thread_only().delayed_fence = time;
    663 }
    664 
    665 void TaskQueueImpl::RemoveFence() {
    666   if (!main_thread_only().sequence_manager)
    667     return;
    668 
    669   EnqueueOrder previous_fence = main_thread_only().current_fence;
    670   main_thread_only().current_fence = EnqueueOrder::none();
    671   main_thread_only().delayed_fence = nullopt;
    672 
    673   bool task_unblocked = main_thread_only().immediate_work_queue->RemoveFence();
    674   task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
    675 
    676   if (!task_unblocked && previous_fence) {
    677     AutoLock lock(immediate_incoming_queue_lock_);
    678     if (!immediate_incoming_queue().empty() &&
    679         immediate_incoming_queue().front().enqueue_order() > previous_fence) {
    680       task_unblocked = true;
    681     }
    682   }
    683 
    684   if (IsQueueEnabled() && task_unblocked) {
    685     main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
    686   }
    687 }
    688 
    689 bool TaskQueueImpl::BlockedByFence() const {
    690   if (!main_thread_only().current_fence)
    691     return false;
    692 
    693   if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
    694       !main_thread_only().delayed_work_queue->BlockedByFence()) {
    695     return false;
    696   }
    697 
    698   AutoLock lock(immediate_incoming_queue_lock_);
    699   if (immediate_incoming_queue().empty())
    700     return true;
    701 
    702   return immediate_incoming_queue().front().enqueue_order() >
    703          main_thread_only().current_fence;
    704 }
    705 
    706 bool TaskQueueImpl::HasActiveFence() {
    707   if (main_thread_only().delayed_fence &&
    708       main_thread_only().time_domain->Now() >
    709           main_thread_only().delayed_fence.value()) {
    710     return true;
    711   }
    712   return !!main_thread_only().current_fence;
    713 }
    714 
    715 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
    716   if (!IsQueueEnabled())
    717     return false;
    718 
    719   if (!main_thread_only().current_fence)
    720     return true;
    721 
    722   return enqueue_order < main_thread_only().current_fence;
    723 }
    724 
    725 // static
    726 void TaskQueueImpl::QueueAsValueInto(const TaskDeque& queue,
    727                                      TimeTicks now,
    728                                      trace_event::TracedValue* state) {
    729   for (const Task& task : queue) {
    730     TaskAsValueInto(task, now, state);
    731   }
    732 }
    733 
    734 // static
    735 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue,
    736                                      TimeTicks now,
    737                                      trace_event::TracedValue* state) {
    738   // Remove const to search |queue| in the destructive manner. Restore the
    739   // content from |visited| later.
    740   std::priority_queue<Task>* mutable_queue =
    741       const_cast<std::priority_queue<Task>*>(&queue);
    742   std::priority_queue<Task> visited;
    743   while (!mutable_queue->empty()) {
    744     TaskAsValueInto(mutable_queue->top(), now, state);
    745     visited.push(std::move(const_cast<Task&>(mutable_queue->top())));
    746     mutable_queue->pop();
    747   }
    748   *mutable_queue = std::move(visited);
    749 }
    750 
    751 // static
    752 void TaskQueueImpl::TaskAsValueInto(const Task& task,
    753                                     TimeTicks now,
    754                                     trace_event::TracedValue* state) {
    755   state->BeginDictionary();
    756   state->SetString("posted_from", task.posted_from.ToString());
    757   if (task.enqueue_order_set())
    758     state->SetInteger("enqueue_order", task.enqueue_order());
    759   state->SetInteger("sequence_num", task.sequence_num);
    760   state->SetBoolean("nestable", task.nestable == Nestable::kNestable);
    761   state->SetBoolean("is_high_res", task.is_high_res);
    762   state->SetBoolean("is_cancelled", task.task.IsCancelled());
    763   state->SetDouble("delayed_run_time",
    764                    (task.delayed_run_time - TimeTicks()).InMillisecondsF());
    765   state->SetDouble("delayed_run_time_milliseconds_from_now",
    766                    (task.delayed_run_time - now).InMillisecondsF());
    767   state->EndDictionary();
    768 }
    769 
    770 TaskQueueImpl::QueueEnabledVoterImpl::QueueEnabledVoterImpl(
    771     scoped_refptr<TaskQueue> task_queue)
    772     : task_queue_(task_queue), enabled_(true) {}
    773 
    774 TaskQueueImpl::QueueEnabledVoterImpl::~QueueEnabledVoterImpl() {
    775   if (task_queue_->GetTaskQueueImpl())
    776     task_queue_->GetTaskQueueImpl()->RemoveQueueEnabledVoter(this);
    777 }
    778 
    779 void TaskQueueImpl::QueueEnabledVoterImpl::SetQueueEnabled(bool enabled) {
    780   if (enabled_ == enabled)
    781     return;
    782 
    783   task_queue_->GetTaskQueueImpl()->OnQueueEnabledVoteChanged(enabled);
    784   enabled_ = enabled;
    785 }
    786 
    787 void TaskQueueImpl::RemoveQueueEnabledVoter(
    788     const QueueEnabledVoterImpl* voter) {
    789   // Bail out if we're being called from TaskQueueImpl::UnregisterTaskQueue.
    790   if (!main_thread_only().time_domain)
    791     return;
    792 
    793   bool was_enabled = IsQueueEnabled();
    794   if (voter->enabled_) {
    795     main_thread_only().is_enabled_refcount--;
    796     DCHECK_GE(main_thread_only().is_enabled_refcount, 0);
    797   }
    798 
    799   main_thread_only().voter_refcount--;
    800   DCHECK_GE(main_thread_only().voter_refcount, 0);
    801 
    802   bool is_enabled = IsQueueEnabled();
    803   if (was_enabled != is_enabled)
    804     EnableOrDisableWithSelector(is_enabled);
    805 }
    806 
    807 bool TaskQueueImpl::IsQueueEnabled() const {
    808   // By default is_enabled_refcount and voter_refcount both equal zero.
    809   return (main_thread_only().is_enabled_refcount ==
    810           main_thread_only().voter_refcount) &&
    811          main_thread_only().is_enabled_for_test;
    812 }
    813 
    814 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
    815   bool was_enabled = IsQueueEnabled();
    816   if (enabled) {
    817     main_thread_only().is_enabled_refcount++;
    818     DCHECK_LE(main_thread_only().is_enabled_refcount,
    819               main_thread_only().voter_refcount);
    820   } else {
    821     main_thread_only().is_enabled_refcount--;
    822     DCHECK_GE(main_thread_only().is_enabled_refcount, 0);
    823   }
    824 
    825   bool is_enabled = IsQueueEnabled();
    826   if (was_enabled != is_enabled)
    827     EnableOrDisableWithSelector(is_enabled);
    828 }
    829 
    830 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
    831   if (!main_thread_only().sequence_manager)
    832     return;
    833 
    834   LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
    835   UpdateDelayedWakeUp(&lazy_now);
    836 
    837   if (enable) {
    838     if (HasPendingImmediateWork() &&
    839         !main_thread_only().on_next_wake_up_changed_callback.is_null()) {
    840       // Delayed work notification will be issued via time domain.
    841       main_thread_only().on_next_wake_up_changed_callback.Run(TimeTicks());
    842     }
    843 
    844     // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
    845     // a DoWork if needed.
    846     main_thread_only()
    847         .sequence_manager->main_thread_only()
    848         .selector.EnableQueue(this);
    849   } else {
    850     main_thread_only()
    851         .sequence_manager->main_thread_only()
    852         .selector.DisableQueue(this);
    853   }
    854 }
    855 
    856 std::unique_ptr<TaskQueue::QueueEnabledVoter>
    857 TaskQueueImpl::CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue) {
    858   DCHECK_EQ(task_queue->GetTaskQueueImpl(), this);
    859   main_thread_only().voter_refcount++;
    860   main_thread_only().is_enabled_refcount++;
    861   return std::make_unique<QueueEnabledVoterImpl>(task_queue);
    862 }
    863 
    864 void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) {
    865   if (main_thread_only().delayed_incoming_queue.empty())
    866     return;
    867 
    868   // Remove canceled tasks.
    869   std::priority_queue<Task> remaining_tasks;
    870   while (!main_thread_only().delayed_incoming_queue.empty()) {
    871     if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) {
    872       remaining_tasks.push(std::move(
    873           const_cast<Task&>(main_thread_only().delayed_incoming_queue.top())));
    874     }
    875     main_thread_only().delayed_incoming_queue.pop();
    876   }
    877 
    878   main_thread_only().delayed_incoming_queue = std::move(remaining_tasks);
    879 
    880   LazyNow lazy_now(now);
    881   UpdateDelayedWakeUp(&lazy_now);
    882 }
    883 
    884 void TaskQueueImpl::PushImmediateIncomingTaskForTest(
    885     TaskQueueImpl::Task&& task) {
    886   AutoLock lock(immediate_incoming_queue_lock_);
    887   immediate_incoming_queue().push_back(std::move(task));
    888 }
    889 
    890 void TaskQueueImpl::RequeueDeferredNonNestableTask(
    891     DeferredNonNestableTask task) {
    892   DCHECK(task.task.nestable == Nestable::kNonNestable);
    893   // The re-queued tasks have to be pushed onto the front because we'd otherwise
    894   // violate the strict monotonically increasing enqueue order within the
    895   // WorkQueue.  We can't assign them a new enqueue order here because that will
    896   // not behave correctly with fences and things will break (e.g Idle TQ).
    897   if (task.work_queue_type == WorkQueueType::kDelayed) {
    898     main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
    899         std::move(task.task));
    900   } else {
    901     main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
    902         std::move(task.task));
    903   }
    904 }
    905 
    906 void TaskQueueImpl::SetOnNextWakeUpChangedCallback(
    907     TaskQueueImpl::OnNextWakeUpChangedCallback callback) {
    908 #if DCHECK_IS_ON()
    909   if (callback) {
    910     DCHECK(main_thread_only().on_next_wake_up_changed_callback.is_null())
    911         << "Can't assign two different observers to "
    912            "blink::scheduler::TaskQueue";
    913   }
    914 #endif
    915   AutoLock lock(any_thread_lock_);
    916   any_thread().on_next_wake_up_changed_callback = callback;
    917   main_thread_only().on_next_wake_up_changed_callback = callback;
    918 }
    919 
    920 void TaskQueueImpl::UpdateDelayedWakeUp(LazyNow* lazy_now) {
    921   return UpdateDelayedWakeUpImpl(lazy_now, GetNextScheduledWakeUpImpl());
    922 }
    923 
    924 void TaskQueueImpl::UpdateDelayedWakeUpImpl(
    925     LazyNow* lazy_now,
    926     Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
    927   if (main_thread_only().scheduled_wake_up == wake_up)
    928     return;
    929   main_thread_only().scheduled_wake_up = wake_up;
    930 
    931   if (wake_up &&
    932       !main_thread_only().on_next_wake_up_changed_callback.is_null() &&
    933       !HasPendingImmediateWork()) {
    934     main_thread_only().on_next_wake_up_changed_callback.Run(wake_up->time);
    935   }
    936 
    937   main_thread_only().time_domain->SetNextWakeUpForQueue(this, wake_up,
    938                                                         lazy_now);
    939 }
    940 
    941 void TaskQueueImpl::SetDelayedWakeUpForTesting(
    942     Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
    943   LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
    944   UpdateDelayedWakeUpImpl(&lazy_now, wake_up);
    945 }
    946 
    947 bool TaskQueueImpl::HasPendingImmediateWork() {
    948   // Any work queue tasks count as immediate work.
    949   if (!main_thread_only().delayed_work_queue->Empty() ||
    950       !main_thread_only().immediate_work_queue->Empty()) {
    951     return true;
    952   }
    953 
    954   // Finally tasks on |immediate_incoming_queue| count as immediate work.
    955   AutoLock lock(immediate_incoming_queue_lock_);
    956   return !immediate_incoming_queue().empty();
    957 }
    958 
    959 void TaskQueueImpl::SetOnTaskStartedHandler(
    960     TaskQueueImpl::OnTaskStartedHandler handler) {
    961   main_thread_only().on_task_started_handler = std::move(handler);
    962 }
    963 
    964 void TaskQueueImpl::OnTaskStarted(const TaskQueue::Task& task,
    965                                   const TaskQueue::TaskTiming& task_timing) {
    966   if (!main_thread_only().on_task_started_handler.is_null())
    967     main_thread_only().on_task_started_handler.Run(task, task_timing);
    968 }
    969 
    970 void TaskQueueImpl::SetOnTaskCompletedHandler(
    971     TaskQueueImpl::OnTaskCompletedHandler handler) {
    972   main_thread_only().on_task_completed_handler = std::move(handler);
    973 }
    974 
    975 void TaskQueueImpl::OnTaskCompleted(const TaskQueue::Task& task,
    976                                     const TaskQueue::TaskTiming& task_timing) {
    977   if (!main_thread_only().on_task_completed_handler.is_null())
    978     main_thread_only().on_task_completed_handler.Run(task, task_timing);
    979 }
    980 
    981 bool TaskQueueImpl::RequiresTaskTiming() const {
    982   return !main_thread_only().on_task_started_handler.is_null() ||
    983          !main_thread_only().on_task_completed_handler.is_null();
    984 }
    985 
    986 bool TaskQueueImpl::IsUnregistered() const {
    987   AutoLock lock(any_thread_lock_);
    988   return !any_thread().sequence_manager;
    989 }
    990 
    991 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
    992   return main_thread_only().sequence_manager->GetWeakPtr();
    993 }
    994 
    995 scoped_refptr<GracefulQueueShutdownHelper>
    996 TaskQueueImpl::GetGracefulQueueShutdownHelper() {
    997   return main_thread_only().sequence_manager->GetGracefulQueueShutdownHelper();
    998 }
    999 
   1000 void TaskQueueImpl::SetQueueEnabledForTest(bool enabled) {
   1001   main_thread_only().is_enabled_for_test = enabled;
   1002   EnableOrDisableWithSelector(IsQueueEnabled());
   1003 }
   1004 
   1005 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(TimeTicks now) {
   1006   if (!main_thread_only().delayed_fence)
   1007     return;
   1008   if (main_thread_only().delayed_fence.value() > now)
   1009     return;
   1010   InsertFence(TaskQueue::InsertFencePosition::kNow);
   1011   main_thread_only().delayed_fence = nullopt;
   1012 }
   1013 
   1014 }  // namespace internal
   1015 }  // namespace sequence_manager
   1016 }  // namespace base
   1017