Home | History | Annotate | Download | only in message_loop
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
      6 #define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
      7 
      8 #include "base/base_export.h"
      9 #include "base/callback.h"
     10 #include "base/macros.h"
     11 #include "base/memory/ref_counted.h"
     12 #include "base/pending_task.h"
     13 #include "base/sequence_checker.h"
     14 #include "base/synchronization/lock.h"
     15 #include "base/time/time.h"
     16 
     17 namespace base {
     18 
     19 class BasicPostTaskPerfTest;
     20 
     21 namespace internal {
     22 
     23 // Implements a queue of tasks posted to the message loop running on the current
     24 // thread. This class takes care of synchronizing posting tasks from different
     25 // threads and together with MessageLoop ensures clean shutdown.
     26 class BASE_EXPORT IncomingTaskQueue
     27     : public RefCountedThreadSafe<IncomingTaskQueue> {
     28  public:
     29   // TODO(gab): Move this to SequencedTaskSource::Observer in
     30   // https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
          //
          // Interface through which an IncomingTaskQueue reports that tasks are being
          // queued. The Observer is handed to the IncomingTaskQueue constructor as a
          // std::unique_ptr, so its lifetime is bound to that of the queue.
     31   class Observer {
     32    public:
     33     virtual ~Observer() = default;
     34 
     35     // Invoked right before |task| is enqueued. The Observer may alter |task| in
     36     // response (e.g. add metadata to the PendingTask struct). May be called
     37     // while a lock is held, so it must not perform logic requiring
     38     // synchronization (override DidQueueTask() for that).
     39     virtual void WillQueueTask(PendingTask* task) = 0;
     40 
     41     // Invoked once a task has been queued in the IncomingTaskQueue being
     42     // observed. |was_empty| is true if the task source was empty (i.e.
     43     // |!HasTasks()|) before this task was posted. Can be invoked from any
     44     // thread.
     45     virtual void DidQueueTask(bool was_empty) = 0;
     46   };
     47 
     48   // Provides a read and remove only view into a task queue: tasks can be
          // inspected, popped and cleared through this interface, but not added.
     49   class ReadAndRemoveOnlyQueue {
     50    public:
     51     ReadAndRemoveOnlyQueue() = default;
     52     virtual ~ReadAndRemoveOnlyQueue() = default;
     53 
     54     // Returns a reference to the next task. Precondition: HasTasks() is true.
     55     virtual const PendingTask& Peek() = 0;
     56 
     57     // Removes the next task and returns it. Precondition: HasTasks() is true.
     58     virtual PendingTask Pop() = 0;
     59 
     60     // Whether this queue has tasks. Non-const, presumably because (per their
            // comments) the implementations declared in this file may reload or sweep
            // their underlying queue when queried.
     61     virtual bool HasTasks() = 0;
     62 
     63     // Removes all tasks.
     64     virtual void Clear() = 0;
     65 
     66    private:
     67     DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue);
     68   };
     69 
     70   // Provides a read-write task queue: everything ReadAndRemoveOnlyQueue offers,
          // plus the ability to add tasks via Push().
     71   class Queue : public ReadAndRemoveOnlyQueue {
     72    public:
     73     Queue() = default;
     74     ~Queue() override = default;
     75 
     76     // Adds |pending_task| (taken by value) to the end of the queue.
     77     virtual void Push(PendingTask pending_task) = 0;
     78 
     79    private:
     80     DISALLOW_COPY_AND_ASSIGN(Queue);
     81   };
     82 
     83   // Constructs an IncomingTaskQueue which will invoke |task_queue_observer|
     84   // when tasks are queued. |task_queue_observer| will be bound to this
     85   // IncomingTaskQueue's lifetime. Ownership is required (as opposed to a raw
     86   // pointer) since IncomingTaskQueue is ref-counted. For the same reason,
     87   // |task_queue_observer| needs to support being invoked racily during
     88   // shutdown.
     89   explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer);
     90 
     91   // Appends a task to the incoming queue. Posting of all tasks is routed
     92   // through AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that
     93   // posting tasks is properly synchronized between different threads.
          // NOTE(review): TryAddToIncomingQueue() is not declared in this header; the
          // reference looks stale -- confirm.
     94   //
     95   // Returns true if the task was successfully added to the queue, otherwise
     96   // returns false. In all cases, the ownership of |task| is transferred to the
     97   // called method.
     98   bool AddToIncomingQueue(const Location& from_here,
     99                           OnceClosure task,
    100                           TimeDelta delay,
    101                           Nestable nestable);
    102 
    103   // Instructs this IncomingTaskQueue to stop accepting tasks; this cannot be
    104   // undone. Note that the registered IncomingTaskQueue::Observer may still
    105   // racily receive a few DidQueueTask() calls while the Shutdown() signal
    106   // propagates to other threads, and it needs to support that.
    107   void Shutdown();
    108 
          // Accessors for the three processing queues. triage_tasks() is exposed only
          // through the ReadAndRemoveOnlyQueue interface (no Push()), whereas
          // delayed_tasks() and deferred_tasks() are exposed as read-write Queues.
    109   ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; }
    110 
    111   Queue& delayed_tasks() { return delayed_tasks_; }
    112 
    113   Queue& deferred_tasks() { return deferred_tasks_; }
    114 
          // Returns true if |delayed_tasks_| reports pending high resolution tasks.
          // DCHECKs that the call is made on the |sequence_checker_| sequence.
    115   bool HasPendingHighResolutionTasks() const {
    116     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    117     return delayed_tasks_.HasPendingHighResolutionTasks();
    118   }
    119 
    120   // Reports UMA metrics about this object's queues. Meant to be called before
    121   // the MessageLoop goes to sleep because it is idle.
    122   void ReportMetricsOnIdle() const;
    123 
    124  private:
    125   friend class base::BasicPostTaskPerfTest;
    126   friend class RefCountedThreadSafe<IncomingTaskQueue>;
    127 
    128   // These queues below support the previous MessageLoop behavior of
    129   // maintaining three queues to process tasks:
    130   //
    131   // TriageQueue
    132   // The first queue to receive all tasks for the processing sequence (when
    133   // reloading from the thread-safe |incoming_queue_|). Tasks are generally
    134   // either dispatched immediately or sent to the queues below.
    135   //
    136   // DelayedQueue
    137   // The queue for holding tasks that should be run later and sorted by expected
    138   // run time.
    139   //
    140   // DeferredQueue
    141   // The queue for holding tasks that couldn't be run while the MessageLoop was
    142   // nested. These are generally processed during the idle stage.
    143   //
    144   // Many of these do not share implementations even though they look like they
    145   // could because of small quirks (reloading semantics) or differing underlying
    146   // data structure (TaskQueue vs DelayedTaskQueue).
    147 
    148   // The starting point for all tasks on the sequence processing the tasks.
          // Exposed to consumers only as a ReadAndRemoveOnlyQueue (see triage_tasks()).
    149   class TriageQueue : public ReadAndRemoveOnlyQueue {
    150    public:
            // |outer| is presumably the enclosing IncomingTaskQueue whose incoming
            // queue this triage queue reloads from (the definition is not in this
            // header). Marked explicit so that an IncomingTaskQueue* can never be
            // implicitly converted to a TriageQueue.
    151     explicit TriageQueue(IncomingTaskQueue* outer);
    152     ~TriageQueue() override;
    153 
    154     // ReadAndRemoveOnlyQueue:
    155     // The methods below will attempt to reload from the incoming queue if the
    156     // queue itself is empty (Clear() has special logic to reload only once
    157     // should destructors post more tasks).
    158     const PendingTask& Peek() override;
    159     PendingTask Pop() override;
    160     // Whether this queue has tasks after reloading from the incoming queue.
    161     bool HasTasks() override;
    162     void Clear() override;
    163 
    164    private:
            // Helper behind the reload-if-empty behavior described above.
    165     void ReloadFromIncomingQueueIfEmpty();
    166 
            // The IncomingTaskQueue passed at construction; the pointer itself is const.
    167     IncomingTaskQueue* const outer_;
            // Local storage for the tasks awaiting triage.
    168     TaskQueue queue_;
    169 
    170     DISALLOW_COPY_AND_ASSIGN(TriageQueue);
    171   };
    172 
          // The queue holding tasks that should be run later, sorted by expected run
          // time. Exposed to consumers as a read-write Queue (see delayed_tasks()).
    173   class DelayedQueue : public Queue {
    174    public:
    175     DelayedQueue();
    176     ~DelayedQueue() override;
    177 
    178     // Queue:
    179     const PendingTask& Peek() override;
    180     PendingTask Pop() override;
    181     // Whether this queue has tasks after sweeping the cancelled ones in front.
    182     bool HasTasks() override;
    183     void Clear() override;
    184     void Push(PendingTask pending_task) override;
    185 
            // Presumably the number of tasks in |queue_|; the definition is not in this
            // header -- confirm in the .cc.
    186     size_t Size() const;
            // True when |pending_high_res_tasks_| is greater than zero.
    187     bool HasPendingHighResolutionTasks() const {
    188       return pending_high_res_tasks_ > 0;
    189     }
    190 
    191    private:
            // Underlying storage for the delayed tasks.
    192     DelayedTaskQueue queue_;
    193 
    194     // Number of high resolution tasks in |queue_|.
    195     int pending_high_res_tasks_ = 0;
    196 
    197     SEQUENCE_CHECKER(sequence_checker_);
    198 
    199     DISALLOW_COPY_AND_ASSIGN(DelayedQueue);
    200   };
    201 
          // The queue holding tasks that couldn't be run while the MessageLoop was
          // nested. Exposed to consumers as a read-write Queue (see deferred_tasks()).
    202   class DeferredQueue : public Queue {
    203    public:
    204     DeferredQueue();
    205     ~DeferredQueue() override;
    206 
    207     // Queue:
    208     const PendingTask& Peek() override;
    209     PendingTask Pop() override;
    210     bool HasTasks() override;
    211     void Clear() override;
    212     void Push(PendingTask pending_task) override;
    213 
    214    private:
            // Underlying storage for the deferred tasks.
    215     TaskQueue queue_;
    216 
    217     SEQUENCE_CHECKER(sequence_checker_);
    218 
    219     DISALLOW_COPY_AND_ASSIGN(DeferredQueue);
    220   };
    221 
          // Private destructor: RefCountedThreadSafe<IncomingTaskQueue> is befriended
          // above, presumably so that destruction only happens through the ref count.
    222   virtual ~IncomingTaskQueue();
    223 
    224   // Adds a task to |incoming_queue_|. The caller retains ownership of
    225   // |pending_task|, but this function will reset the value of
    226   // |pending_task->task|. This is needed to ensure that the posting call stack
    227   // does not retain |pending_task->task| beyond this function call.
          // The bool result is presumably the same "task was added" status that
          // AddToIncomingQueue() returns -- confirm in the .cc.
    228   bool PostPendingTask(PendingTask* pending_task);
    229 
    230   // Does the real work of posting a pending task. Returns true if
    231   // |incoming_queue_| was empty before |pending_task| was posted.
          // As the name suggests, presumably must be called with
          // |incoming_queue_lock_| held -- confirm in the .cc.
    232   bool PostPendingTaskLockRequired(PendingTask* pending_task);
    233 
    234   // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
    235   // from the sequence processing the tasks.
    236   void ReloadWorkQueue(TaskQueue* work_queue);
    237 
    238   // Checks that sequence-affine calls are made only on the MessageLoop thread.
    239   SEQUENCE_CHECKER(sequence_checker_);
    240 
          // The Observer received at construction; owned for this object's lifetime.
    241   const std::unique_ptr<Observer> task_queue_observer_;
    242 
    243   // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
    244   TriageQueue triage_tasks_;
    245 
    246   // Queue for delayed tasks on the |sequence_checker_| sequence.
    247   DelayedQueue delayed_tasks_;
    248 
    249   // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
    250   DeferredQueue deferred_tasks_;
    251 
    252   // Synchronizes access to all members below this line. Keep members guarded
          // by this lock declared after it so that this statement stays true.
    253   base::Lock incoming_queue_lock_;
    254 
    255   // An incoming queue of tasks that are acquired under a mutex for processing
    256   // on this instance's thread. These tasks have not yet been pushed to
    257   // |triage_tasks_|.
    258   TaskQueue incoming_queue_;
    259 
    260   // True if new tasks should be accepted.
    261   bool accept_new_tasks_ = true;
    262 
    263   // The next sequence number to use for delayed tasks.
    264   int next_sequence_num_ = 0;
    265 
    266   // True if the outgoing queue (|triage_tasks_|) is empty. Toggled under
    267   // |incoming_queue_lock_| in ReloadWorkQueue() so that
    268   // PostPendingTaskLockRequired() can tell, without accessing the thread unsafe
    269   // |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a
    270   // PostTask() (and needs to inform its Observer).
    271   bool triage_queue_empty_ = true;
    272 
    273   DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
    274 };
    275 
    276 }  // namespace internal
    277 }  // namespace base
    278 
    279 #endif  // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
    280