Home | History | Annotate | Download | only in sequence_manager
      1 // Copyright 2018 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
      6 #define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
      7 
      8 #include <stddef.h>
      9 
     10 #include <memory>
     11 #include <set>
     12 
     13 #include "base/callback.h"
     14 #include "base/containers/circular_deque.h"
     15 #include "base/macros.h"
     16 #include "base/memory/weak_ptr.h"
     17 #include "base/message_loop/message_loop.h"
     18 #include "base/pending_task.h"
     19 #include "base/task/sequence_manager/enqueue_order.h"
     20 #include "base/task/sequence_manager/intrusive_heap.h"
     21 #include "base/task/sequence_manager/lazily_deallocated_deque.h"
     22 #include "base/task/sequence_manager/sequenced_task_source.h"
     23 #include "base/task/sequence_manager/task_queue.h"
     24 #include "base/threading/thread_checker.h"
     25 #include "base/trace_event/trace_event.h"
     26 #include "base/trace_event/trace_event_argument.h"
     27 
     28 namespace base {
     29 namespace sequence_manager {
     30 
     31 class LazyNow;
     32 class TimeDomain;
     33 
     34 namespace internal {
     35 
     36 class SequenceManagerImpl;
     37 class WorkQueue;
     38 class WorkQueueSets;
     39 
     40 struct IncomingImmediateWorkList {
     41   IncomingImmediateWorkList* next = nullptr;
     42   TaskQueueImpl* queue = nullptr;
     43   internal::EnqueueOrder order;
     44 };
     45 
     46 // TaskQueueImpl has four main queues:
     47 //
     48 // Immediate (non-delayed) tasks:
     49 //    |immediate_incoming_queue| - PostTask enqueues tasks here.
     50 //    |immediate_work_queue| - SequenceManager takes immediate tasks here.
     51 //
     52 // Delayed tasks
     53 //    |delayed_incoming_queue| - PostDelayedTask enqueues tasks here.
     54 //    |delayed_work_queue| - SequenceManager takes delayed tasks here.
     55 //
     56 // The |immediate_incoming_queue| can be accessed from any thread, the other
     57 // queues are main-thread only. To reduce the overhead of locking,
     58 // |immediate_work_queue| is swapped with |immediate_incoming_queue| when
     59 // |immediate_work_queue| becomes empty.
     60 //
     61 // Delayed tasks are initially posted to |delayed_incoming_queue| and a wake-up
     62 // is scheduled with the TimeDomain.  When the delay has elapsed, the TimeDomain
     63 // calls UpdateDelayedWorkQueue and ready delayed tasks are moved into the
     64 // |delayed_work_queue|. Note the EnqueueOrder (used for ordering) for a delayed
     65 // task is not set until it's moved into the |delayed_work_queue|.
     66 //
     67 // TaskQueueImpl uses the WorkQueueSets and the TaskQueueSelector to implement
     68 // prioritization. Task selection is done by the TaskQueueSelector and when a
     69 // queue is selected, it round-robins between the |immediate_work_queue| and
     70 // |delayed_work_queue|.  The reason for this is we want to make sure delayed
     71 // tasks (normally the most common type) don't starve out immediate work.
     72 class BASE_EXPORT TaskQueueImpl {
     73  public:
     74   TaskQueueImpl(SequenceManagerImpl* sequence_manager,
     75                 TimeDomain* time_domain,
     76                 const TaskQueue::Spec& spec);
     77 
     78   ~TaskQueueImpl();
     79 
     80   // Represents a time at which a task wants to run. Tasks scheduled for the
     81   // same point in time will be ordered by their sequence numbers.
     82   struct DelayedWakeUp {
     83     TimeTicks time;
     84     int sequence_num;
     85 
     86     bool operator!=(const DelayedWakeUp& other) const {
     87       return time != other.time || other.sequence_num != sequence_num;
     88     }
     89 
     90     bool operator==(const DelayedWakeUp& other) const {
     91       return !(*this != other);
     92     }
     93 
     94     bool operator<=(const DelayedWakeUp& other) const {
     95       if (time == other.time) {
     96         // Debug gcc builds can compare an element against itself.
     97         DCHECK(sequence_num != other.sequence_num || this == &other);
     98         // |PostedTask::sequence_num| is int and might wrap around to
     99         // a negative number when casted from EnqueueOrder.
    100         // This way of comparison handles that properly.
    101         return (sequence_num - other.sequence_num) <= 0;
    102       }
    103       return time < other.time;
    104     }
    105   };
    106 
    107   class BASE_EXPORT Task : public TaskQueue::Task {
    108    public:
    109     Task(TaskQueue::PostedTask task,
    110          TimeTicks desired_run_time,
    111          EnqueueOrder sequence_number);
    112 
    113     Task(TaskQueue::PostedTask task,
    114          TimeTicks desired_run_time,
    115          EnqueueOrder sequence_number,
    116          EnqueueOrder enqueue_order);
    117 
    118     DelayedWakeUp delayed_wake_up() const {
    119       // Since we use |sequence_num| in DelayedWakeUp for ordering purposes
    120       // and integer overflow handling is type-sensitive it's worth to protect
    121       // it from an unnoticed potential change in the PendingTask base class.
    122       static_assert(std::is_same<decltype(sequence_num), int>::value, "");
    123       return DelayedWakeUp{delayed_run_time, sequence_num};
    124     }
    125 
    126     EnqueueOrder enqueue_order() const {
    127       DCHECK(enqueue_order_);
    128       return enqueue_order_;
    129     }
    130 
    131     void set_enqueue_order(EnqueueOrder enqueue_order) {
    132       DCHECK(!enqueue_order_);
    133       enqueue_order_ = enqueue_order;
    134     }
    135 
    136     bool enqueue_order_set() const { return enqueue_order_; }
    137 
    138    private:
    139     // Similar to sequence number, but ultimately the |enqueue_order_| is what
    140     // the scheduler uses for task ordering. For immediate tasks |enqueue_order|
    141     // is set when posted, but for delayed tasks it's not defined until they are
    142     // enqueued on the |delayed_work_queue_|. This is because otherwise delayed
    143     // tasks could run before an immediate task posted after the delayed task.
    144     EnqueueOrder enqueue_order_;
    145   };
    146 
    147   // A result retuned by PostDelayedTask. When scheduler failed to post a task
    148   // due to being shutdown a task is returned to be destroyed outside the lock.
    149   struct PostTaskResult {
    150     PostTaskResult();
    151     PostTaskResult(bool success, TaskQueue::PostedTask task);
    152     PostTaskResult(PostTaskResult&& move_from);
    153     PostTaskResult(const PostTaskResult& copy_from) = delete;
    154     ~PostTaskResult();
    155 
    156     static PostTaskResult Success();
    157     static PostTaskResult Fail(TaskQueue::PostedTask task);
    158 
    159     bool success;
    160     TaskQueue::PostedTask task;
    161   };
    162 
    163   // Types of queues TaskQueueImpl is maintaining internally.
    164   enum class WorkQueueType { kImmediate, kDelayed };
    165 
    166   // Non-nestable tasks may get deferred but such queue is being maintained on
    167   // SequenceManager side, so we need to keep information how to requeue it.
    168   struct DeferredNonNestableTask {
    169     internal::TaskQueueImpl::Task task;
    170     internal::TaskQueueImpl* task_queue;
    171     WorkQueueType work_queue_type;
    172   };
    173 
    174   using OnNextWakeUpChangedCallback = RepeatingCallback<void(TimeTicks)>;
    175   using OnTaskStartedHandler =
    176       RepeatingCallback<void(const TaskQueue::Task&,
    177                              const TaskQueue::TaskTiming&)>;
    178   using OnTaskCompletedHandler =
    179       RepeatingCallback<void(const TaskQueue::Task&,
    180                              const TaskQueue::TaskTiming&)>;
    181 
    182   // TaskQueue implementation.
    183   const char* GetName() const;
    184   bool RunsTasksInCurrentSequence() const;
    185   PostTaskResult PostDelayedTask(TaskQueue::PostedTask task);
    186   // Require a reference to enclosing task queue for lifetime control.
    187   std::unique_ptr<TaskQueue::QueueEnabledVoter> CreateQueueEnabledVoter(
    188       scoped_refptr<TaskQueue> owning_task_queue);
    189   bool IsQueueEnabled() const;
    190   bool IsEmpty() const;
    191   size_t GetNumberOfPendingTasks() const;
    192   bool HasTaskToRunImmediately() const;
    193   Optional<TimeTicks> GetNextScheduledWakeUp();
    194   Optional<DelayedWakeUp> GetNextScheduledWakeUpImpl();
    195   void SetQueuePriority(TaskQueue::QueuePriority priority);
    196   TaskQueue::QueuePriority GetQueuePriority() const;
    197   void AddTaskObserver(MessageLoop::TaskObserver* task_observer);
    198   void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer);
    199   void SetTimeDomain(TimeDomain* time_domain);
    200   TimeDomain* GetTimeDomain() const;
    201   void SetBlameContext(trace_event::BlameContext* blame_context);
    202   void InsertFence(TaskQueue::InsertFencePosition position);
    203   void InsertFenceAt(TimeTicks time);
    204   void RemoveFence();
    205   bool HasActiveFence();
    206   bool BlockedByFence() const;
    207   // Implementation of TaskQueue::SetObserver.
    208   void SetOnNextWakeUpChangedCallback(OnNextWakeUpChangedCallback callback);
    209 
    210   void UnregisterTaskQueue();
    211 
    212   // Returns true if a (potentially hypothetical) task with the specified
    213   // |enqueue_order| could run on the queue. Must be called from the main
    214   // thread.
    215   bool CouldTaskRun(EnqueueOrder enqueue_order) const;
    216 
    217   // Must only be called from the thread this task queue was created on.
    218   void ReloadImmediateWorkQueueIfEmpty();
    219 
    220   void AsValueInto(TimeTicks now, trace_event::TracedValue* state) const;
    221 
    222   bool GetQuiescenceMonitored() const { return should_monitor_quiescence_; }
    223   bool GetShouldNotifyObservers() const { return should_notify_observers_; }
    224 
    225   void NotifyWillProcessTask(const PendingTask& pending_task);
    226   void NotifyDidProcessTask(const PendingTask& pending_task);
    227 
    228   // Check for available tasks in immediate work queues.
    229   // Used to check if we need to generate notifications about delayed work.
    230   bool HasPendingImmediateWork();
    231 
    232   WorkQueue* delayed_work_queue() {
    233     return main_thread_only().delayed_work_queue.get();
    234   }
    235 
    236   const WorkQueue* delayed_work_queue() const {
    237     return main_thread_only().delayed_work_queue.get();
    238   }
    239 
    240   WorkQueue* immediate_work_queue() {
    241     return main_thread_only().immediate_work_queue.get();
    242   }
    243 
    244   const WorkQueue* immediate_work_queue() const {
    245     return main_thread_only().immediate_work_queue.get();
    246   }
    247 
    248   // Protected by SequenceManagerImpl's AnyThread lock.
    249   IncomingImmediateWorkList* immediate_work_list_storage() {
    250     return &immediate_work_list_storage_;
    251   }
    252 
    253   // Enqueues any delayed tasks which should be run now on the
    254   // |delayed_work_queue|.
    255   // Must be called from the main thread.
    256   void WakeUpForDelayedWork(LazyNow* lazy_now);
    257 
    258   HeapHandle heap_handle() const { return main_thread_only().heap_handle; }
    259 
    260   void set_heap_handle(HeapHandle heap_handle) {
    261     main_thread_only().heap_handle = heap_handle;
    262   }
    263 
    264   // Pushes |task| onto the front of the specified work queue. Caution must be
    265   // taken with this API because you could easily starve out other work.
    266   // TODO(kraynov): Simplify non-nestable task logic https://crbug.com/845437.
    267   void RequeueDeferredNonNestableTask(DeferredNonNestableTask task);
    268 
    269   void PushImmediateIncomingTaskForTest(TaskQueueImpl::Task&& task);
    270 
    271   class QueueEnabledVoterImpl : public TaskQueue::QueueEnabledVoter {
    272    public:
    273     explicit QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue);
    274     ~QueueEnabledVoterImpl() override;
    275 
    276     // QueueEnabledVoter implementation.
    277     void SetQueueEnabled(bool enabled) override;
    278 
    279     TaskQueueImpl* GetTaskQueueForTest() const {
    280       return task_queue_->GetTaskQueueImpl();
    281     }
    282 
    283    private:
    284     friend class TaskQueueImpl;
    285 
    286     scoped_refptr<TaskQueue> task_queue_;
    287     bool enabled_;
    288   };
    289 
    290   // Iterates over |delayed_incoming_queue| removing canceled tasks.
    291   void SweepCanceledDelayedTasks(TimeTicks now);
    292 
    293   // Allows wrapping TaskQueue to set a handler to subscribe for notifications
    294   // about started and completed tasks.
    295   void SetOnTaskStartedHandler(OnTaskStartedHandler handler);
    296   void OnTaskStarted(const TaskQueue::Task& task,
    297                      const TaskQueue::TaskTiming& task_timing);
    298   void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler);
    299   void OnTaskCompleted(const TaskQueue::Task& task,
    300                        const TaskQueue::TaskTiming& task_timing);
    301   bool RequiresTaskTiming() const;
    302 
    303   WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr();
    304 
    305   scoped_refptr<GracefulQueueShutdownHelper> GetGracefulQueueShutdownHelper();
    306 
    307   // Returns true if this queue is unregistered or task queue manager is deleted
    308   // and this queue can be safely deleted on any thread.
    309   bool IsUnregistered() const;
    310 
    311   // Disables queue for testing purposes, when a QueueEnabledVoter can't be
    312   // constructed due to not having TaskQueue.
    313   void SetQueueEnabledForTest(bool enabled);
    314 
    315  protected:
    316   void SetDelayedWakeUpForTesting(Optional<DelayedWakeUp> wake_up);
    317 
    318  private:
    319   friend class WorkQueue;
    320   friend class WorkQueueTest;
    321 
    322   struct AnyThread {
    323     AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain);
    324     ~AnyThread();
    325 
    326     // SequenceManagerImpl, TimeDomain and Observer are maintained in two
    327     // copies: inside AnyThread and inside MainThreadOnly. They can be changed
    328     // only from main thread, so it should be locked before accessing from other
    329     // threads.
    330     SequenceManagerImpl* sequence_manager;
    331     TimeDomain* time_domain;
    332     // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
    333     OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
    334   };
    335 
    336   struct MainThreadOnly {
    337     MainThreadOnly(SequenceManagerImpl* sequence_manager,
    338                    TaskQueueImpl* task_queue,
    339                    TimeDomain* time_domain);
    340     ~MainThreadOnly();
    341 
    342     // Another copy of SequenceManagerImpl, TimeDomain and Observer
    343     // for lock-free access from the main thread.
    344     // See description inside struct AnyThread for details.
    345     SequenceManagerImpl* sequence_manager;
    346     TimeDomain* time_domain;
    347     // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
    348     OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
    349 
    350     std::unique_ptr<WorkQueue> delayed_work_queue;
    351     std::unique_ptr<WorkQueue> immediate_work_queue;
    352     std::priority_queue<TaskQueueImpl::Task> delayed_incoming_queue;
    353     ObserverList<MessageLoop::TaskObserver> task_observers;
    354     size_t set_index;
    355     HeapHandle heap_handle;
    356     int is_enabled_refcount;
    357     int voter_refcount;
    358     trace_event::BlameContext* blame_context;  // Not owned.
    359     EnqueueOrder current_fence;
    360     Optional<TimeTicks> delayed_fence;
    361     OnTaskStartedHandler on_task_started_handler;
    362     OnTaskCompletedHandler on_task_completed_handler;
    363     // Last reported wake up, used only in UpdateWakeUp to avoid
    364     // excessive calls.
    365     Optional<DelayedWakeUp> scheduled_wake_up;
    366     // If false, queue will be disabled. Used only for tests.
    367     bool is_enabled_for_test;
    368   };
    369 
    370   PostTaskResult PostImmediateTaskImpl(TaskQueue::PostedTask task);
    371   PostTaskResult PostDelayedTaskImpl(TaskQueue::PostedTask task);
    372 
    373   // Push the task onto the |delayed_incoming_queue|. Lock-free main thread
    374   // only fast path.
    375   void PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,
    376                                                   TimeTicks now);
    377 
    378   // Push the task onto the |delayed_incoming_queue|.  Slow path from other
    379   // threads.
    380   void PushOntoDelayedIncomingQueueLocked(Task pending_task);
    381 
    382   void ScheduleDelayedWorkTask(Task pending_task);
    383 
    384   void MoveReadyImmediateTasksToImmediateWorkQueueLocked();
    385 
    386   // Push the task onto the |immediate_incoming_queue| and for auto pumped
    387   // queues it calls MaybePostDoWorkOnMainRunner if the Incoming queue was
    388   // empty.
    389   void PushOntoImmediateIncomingQueueLocked(Task task);
    390 
    391   using TaskDeque = circular_deque<Task>;
    392 
    393   // Extracts all the tasks from the immediate incoming queue and swaps it with
    394   // |queue| which must be empty.
    395   // Can be called from any thread.
    396   void ReloadEmptyImmediateQueue(TaskDeque* queue);
    397 
    398   void TraceQueueSize() const;
    399   static void QueueAsValueInto(const TaskDeque& queue,
    400                                TimeTicks now,
    401                                trace_event::TracedValue* state);
    402   static void QueueAsValueInto(const std::priority_queue<Task>& queue,
    403                                TimeTicks now,
    404                                trace_event::TracedValue* state);
    405   static void TaskAsValueInto(const Task& task,
    406                               TimeTicks now,
    407                               trace_event::TracedValue* state);
    408 
    409   void RemoveQueueEnabledVoter(const QueueEnabledVoterImpl* voter);
    410   void OnQueueEnabledVoteChanged(bool enabled);
    411   void EnableOrDisableWithSelector(bool enable);
    412 
    413   // Schedules delayed work on time domain and calls the observer.
    414   void UpdateDelayedWakeUp(LazyNow* lazy_now);
    415   void UpdateDelayedWakeUpImpl(LazyNow* lazy_now,
    416                                Optional<DelayedWakeUp> wake_up);
    417 
    418   // Activate a delayed fence if a time has come.
    419   void ActivateDelayedFenceIfNeeded(TimeTicks now);
    420 
    421   const char* name_;
    422 
    423   const PlatformThreadId thread_id_;
    424 
    425   mutable Lock any_thread_lock_;
    426   AnyThread any_thread_;
    427   struct AnyThread& any_thread() {
    428     any_thread_lock_.AssertAcquired();
    429     return any_thread_;
    430   }
    431   const struct AnyThread& any_thread() const {
    432     any_thread_lock_.AssertAcquired();
    433     return any_thread_;
    434   }
    435 
    436   ThreadChecker main_thread_checker_;
    437   MainThreadOnly main_thread_only_;
    438   MainThreadOnly& main_thread_only() {
    439     DCHECK(main_thread_checker_.CalledOnValidThread());
    440     return main_thread_only_;
    441   }
    442   const MainThreadOnly& main_thread_only() const {
    443     DCHECK(main_thread_checker_.CalledOnValidThread());
    444     return main_thread_only_;
    445   }
    446 
    447   mutable Lock immediate_incoming_queue_lock_;
    448   TaskDeque immediate_incoming_queue_;
    449   TaskDeque& immediate_incoming_queue() {
    450     immediate_incoming_queue_lock_.AssertAcquired();
    451     return immediate_incoming_queue_;
    452   }
    453   const TaskDeque& immediate_incoming_queue() const {
    454     immediate_incoming_queue_lock_.AssertAcquired();
    455     return immediate_incoming_queue_;
    456   }
    457 
    458   // Protected by SequenceManagerImpl's AnyThread lock.
    459   IncomingImmediateWorkList immediate_work_list_storage_;
    460 
    461   const bool should_monitor_quiescence_;
    462   const bool should_notify_observers_;
    463 
    464   DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl);
    465 };
    466 
    467 }  // namespace internal
    468 }  // namespace sequence_manager
    469 }  // namespace base
    470 
    471 #endif  // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
    472