Home | History | Annotate | Download | only in message_loop
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_loop/incoming_task_queue.h"
      6 
      7 #include <limits>
      8 #include <utility>
      9 
     10 #include "base/bind.h"
     11 #include "base/callback_helpers.h"
     12 #include "base/location.h"
     13 #include "base/metrics/histogram_macros.h"
     14 #include "base/synchronization/waitable_event.h"
     15 #include "base/time/time.h"
     16 #include "build/build_config.h"
     17 
     18 namespace base {
     19 namespace internal {
     20 
     21 namespace {
     22 
     23 #if DCHECK_IS_ON()
     24 // Delays larger than this are often bogus, and a warning should be emitted in
     25 // debug builds to warn developers.  http://crbug.com/450045
     26 constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14);
     27 #endif
     28 
     29 TimeTicks CalculateDelayedRuntime(TimeDelta delay) {
     30   TimeTicks delayed_run_time;
     31   if (delay > TimeDelta())
     32     delayed_run_time = TimeTicks::Now() + delay;
     33   else
     34     DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
     35   return delayed_run_time;
     36 }
     37 
     38 }  // namespace
     39 
     40 IncomingTaskQueue::IncomingTaskQueue(
     41     std::unique_ptr<Observer> task_queue_observer)
     42     : task_queue_observer_(std::move(task_queue_observer)),
     43       triage_tasks_(this) {
     44   // The constructing sequence is not necessarily the running sequence, e.g. in
     45   // the case of a MessageLoop created unbound.
     46   DETACH_FROM_SEQUENCE(sequence_checker_);
     47 }
     48 
     49 IncomingTaskQueue::~IncomingTaskQueue() = default;
     50 
     51 bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
     52                                            OnceClosure task,
     53                                            TimeDelta delay,
     54                                            Nestable nestable) {
     55   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
     56   // for details.
     57   CHECK(task);
     58   DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold)
     59       << "Requesting super-long task delay period of " << delay.InSeconds()
     60       << " seconds from here: " << from_here.ToString();
     61 
     62   PendingTask pending_task(from_here, std::move(task),
     63                            CalculateDelayedRuntime(delay), nestable);
     64 #if defined(OS_WIN)
     65   // We consider the task needs a high resolution timer if the delay is
     66   // more than 0 and less than 32ms. This caps the relative error to
     67   // less than 50% : a 33ms wait can wake at 48ms since the default
     68   // resolution on Windows is between 10 and 15ms.
     69   if (delay > TimeDelta() &&
     70       delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) {
     71     pending_task.is_high_res = true;
     72   }
     73 #endif
     74 
     75   if (!delay.is_zero())
     76     UMA_HISTOGRAM_LONG_TIMES("MessageLoop.DelayedTaskQueue.PostedDelay", delay);
     77 
     78   return PostPendingTask(&pending_task);
     79 }
     80 
     81 void IncomingTaskQueue::Shutdown() {
     82   AutoLock auto_lock(incoming_queue_lock_);
     83   accept_new_tasks_ = false;
     84 }
     85 
     86 void IncomingTaskQueue::ReportMetricsOnIdle() const {
     87   UMA_HISTOGRAM_COUNTS_1M(
     88       "MessageLoop.DelayedTaskQueue.PendingTasksCountOnIdle",
     89       delayed_tasks_.Size());
     90 }
     91 
     92 IncomingTaskQueue::TriageQueue::TriageQueue(IncomingTaskQueue* outer)
     93     : outer_(outer) {}
     94 
     95 IncomingTaskQueue::TriageQueue::~TriageQueue() = default;
     96 
     97 const PendingTask& IncomingTaskQueue::TriageQueue::Peek() {
     98   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
     99   ReloadFromIncomingQueueIfEmpty();
    100   DCHECK(!queue_.empty());
    101   return queue_.front();
    102 }
    103 
    104 PendingTask IncomingTaskQueue::TriageQueue::Pop() {
    105   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
    106   ReloadFromIncomingQueueIfEmpty();
    107   DCHECK(!queue_.empty());
    108   PendingTask pending_task = std::move(queue_.front());
    109   queue_.pop();
    110   return pending_task;
    111 }
    112 
    113 bool IncomingTaskQueue::TriageQueue::HasTasks() {
    114   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
    115   ReloadFromIncomingQueueIfEmpty();
    116   return !queue_.empty();
    117 }
    118 
    119 void IncomingTaskQueue::TriageQueue::Clear() {
    120   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
    121 
    122   // Clear() should be invoked before WillDestroyCurrentMessageLoop().
    123   DCHECK(outer_->accept_new_tasks_);
    124 
    125   // Delete all currently pending tasks but not tasks potentially posted from
    126   // their destructors. See ~MessageLoop() for the full logic mitigating against
    127   // infite loops when clearing pending tasks. The ScopedClosureRunner below
    128   // will be bound to a task posted at the end of the queue. After it is posted,
    129   // tasks will be deleted one by one, when the bound ScopedClosureRunner is
    130   // deleted and sets |deleted_all_originally_pending|, we know we've deleted
    131   // all originally pending tasks.
    132   bool deleted_all_originally_pending = false;
    133   ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce(
    134       [](bool* deleted_all_originally_pending) {
    135         *deleted_all_originally_pending = true;
    136       },
    137       Unretained(&deleted_all_originally_pending)));
    138   outer_->AddToIncomingQueue(
    139       FROM_HERE,
    140       BindOnce([](ScopedClosureRunner) {},
    141                std::move(capture_deleted_all_originally_pending)),
    142       TimeDelta(), Nestable::kNestable);
    143 
    144   while (!deleted_all_originally_pending) {
    145     PendingTask pending_task = Pop();
    146 
    147     if (!pending_task.delayed_run_time.is_null()) {
    148       outer_->delayed_tasks().Push(std::move(pending_task));
    149     }
    150   }
    151 }
    152 
    153 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
    154   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
    155   if (queue_.empty()) {
    156     outer_->ReloadWorkQueue(&queue_);
    157   }
    158 }
    159 
    160 IncomingTaskQueue::DelayedQueue::DelayedQueue() {
    161   DETACH_FROM_SEQUENCE(sequence_checker_);
    162 }
    163 
    164 IncomingTaskQueue::DelayedQueue::~DelayedQueue() = default;
    165 
    166 void IncomingTaskQueue::DelayedQueue::Push(PendingTask pending_task) {
    167   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    168 
    169   if (pending_task.is_high_res)
    170     ++pending_high_res_tasks_;
    171 
    172   queue_.push(std::move(pending_task));
    173 }
    174 
    175 const PendingTask& IncomingTaskQueue::DelayedQueue::Peek() {
    176   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    177   DCHECK(!queue_.empty());
    178   return queue_.top();
    179 }
    180 
    181 PendingTask IncomingTaskQueue::DelayedQueue::Pop() {
    182   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    183   DCHECK(!queue_.empty());
    184   PendingTask delayed_task = std::move(const_cast<PendingTask&>(queue_.top()));
    185   queue_.pop();
    186 
    187   if (delayed_task.is_high_res)
    188     --pending_high_res_tasks_;
    189   DCHECK_GE(pending_high_res_tasks_, 0);
    190 
    191   return delayed_task;
    192 }
    193 
    194 bool IncomingTaskQueue::DelayedQueue::HasTasks() {
    195   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    196   // TODO(robliao): The other queues don't check for IsCancelled(). Should they?
    197   while (!queue_.empty() && Peek().task.IsCancelled())
    198     Pop();
    199 
    200   return !queue_.empty();
    201 }
    202 
    203 void IncomingTaskQueue::DelayedQueue::Clear() {
    204   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    205   while (!queue_.empty())
    206     Pop();
    207 }
    208 
    209 size_t IncomingTaskQueue::DelayedQueue::Size() const {
    210   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    211   return queue_.size();
    212 }
    213 
    214 IncomingTaskQueue::DeferredQueue::DeferredQueue() {
    215   DETACH_FROM_SEQUENCE(sequence_checker_);
    216 }
    217 
    218 IncomingTaskQueue::DeferredQueue::~DeferredQueue() = default;
    219 
    220 void IncomingTaskQueue::DeferredQueue::Push(PendingTask pending_task) {
    221   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    222   queue_.push(std::move(pending_task));
    223 }
    224 
    225 const PendingTask& IncomingTaskQueue::DeferredQueue::Peek() {
    226   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    227   DCHECK(!queue_.empty());
    228   return queue_.front();
    229 }
    230 
    231 PendingTask IncomingTaskQueue::DeferredQueue::Pop() {
    232   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    233   DCHECK(!queue_.empty());
    234   PendingTask deferred_task = std::move(queue_.front());
    235   queue_.pop();
    236   return deferred_task;
    237 }
    238 
    239 bool IncomingTaskQueue::DeferredQueue::HasTasks() {
    240   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    241   return !queue_.empty();
    242 }
    243 
    244 void IncomingTaskQueue::DeferredQueue::Clear() {
    245   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    246   while (!queue_.empty())
    247     Pop();
    248 }
    249 
    250 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
    251   // Warning: Don't try to short-circuit, and handle this thread's tasks more
    252   // directly, as it could starve handling of foreign threads.  Put every task
    253   // into this queue.
    254   bool accept_new_tasks;
    255   bool was_empty = false;
    256   {
    257     AutoLock auto_lock(incoming_queue_lock_);
    258     accept_new_tasks = accept_new_tasks_;
    259     if (accept_new_tasks) {
    260       was_empty =
    261           PostPendingTaskLockRequired(pending_task) && triage_queue_empty_;
    262     }
    263   }
    264 
    265   if (!accept_new_tasks) {
    266     // Clear the pending task outside of |incoming_queue_lock_| to prevent any
    267     // chance of self-deadlock if destroying a task also posts a task to this
    268     // queue.
    269     pending_task->task.Reset();
    270     return false;
    271   }
    272 
    273   // Let |task_queue_observer_| know of the queued task. This is done outside
    274   // |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also
    275   // use a lock).
    276   task_queue_observer_->DidQueueTask(was_empty);
    277 
    278   return true;
    279 }
    280 
    281 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
    282   incoming_queue_lock_.AssertAcquired();
    283 
    284   // Initialize the sequence number. The sequence number is used for delayed
    285   // tasks (to facilitate FIFO sorting when two tasks have the same
    286   // delayed_run_time value) and for identifying the task in about:tracing.
    287   pending_task->sequence_num = next_sequence_num_++;
    288 
    289   task_queue_observer_->WillQueueTask(pending_task);
    290 
    291   bool was_empty = incoming_queue_.empty();
    292   incoming_queue_.push(std::move(*pending_task));
    293   return was_empty;
    294 }
    295 
    296 void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
    297   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    298 
    299   // Make sure no tasks are lost.
    300   DCHECK(work_queue->empty());
    301 
    302   // Acquire all we can from the inter-thread queue with one lock acquisition.
    303   AutoLock lock(incoming_queue_lock_);
    304   incoming_queue_.swap(*work_queue);
    305   triage_queue_empty_ = work_queue->empty();
    306 }
    307 
    308 }  // namespace internal
    309 }  // namespace base
    310