1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/message_loop/incoming_task_queue.h" 6 7 #include <limits> 8 #include <utility> 9 10 #include "base/bind.h" 11 #include "base/callback_helpers.h" 12 #include "base/location.h" 13 #include "base/metrics/histogram_macros.h" 14 #include "base/synchronization/waitable_event.h" 15 #include "base/time/time.h" 16 #include "build/build_config.h" 17 18 namespace base { 19 namespace internal { 20 21 namespace { 22 23 #if DCHECK_IS_ON() 24 // Delays larger than this are often bogus, and a warning should be emitted in 25 // debug builds to warn developers. http://crbug.com/450045 26 constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14); 27 #endif 28 29 TimeTicks CalculateDelayedRuntime(TimeDelta delay) { 30 TimeTicks delayed_run_time; 31 if (delay > TimeDelta()) 32 delayed_run_time = TimeTicks::Now() + delay; 33 else 34 DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; 35 return delayed_run_time; 36 } 37 38 } // namespace 39 40 IncomingTaskQueue::IncomingTaskQueue( 41 std::unique_ptr<Observer> task_queue_observer) 42 : task_queue_observer_(std::move(task_queue_observer)), 43 triage_tasks_(this) { 44 // The constructing sequence is not necessarily the running sequence, e.g. in 45 // the case of a MessageLoop created unbound. 46 DETACH_FROM_SEQUENCE(sequence_checker_); 47 } 48 49 IncomingTaskQueue::~IncomingTaskQueue() = default; 50 51 bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here, 52 OnceClosure task, 53 TimeDelta delay, 54 Nestable nestable) { 55 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 56 // for details. 57 CHECK(task); 58 DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold) 59 << "Requesting super-long task delay period of " << delay.InSeconds() 60 << " seconds from here: " << from_here.ToString(); 61 62 PendingTask pending_task(from_here, std::move(task), 63 CalculateDelayedRuntime(delay), nestable); 64 #if defined(OS_WIN) 65 // We consider the task needs a high resolution timer if the delay is 66 // more than 0 and less than 32ms. This caps the relative error to 67 // less than 50% : a 33ms wait can wake at 48ms since the default 68 // resolution on Windows is between 10 and 15ms. 69 if (delay > TimeDelta() && 70 delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) { 71 pending_task.is_high_res = true; 72 } 73 #endif 74 75 if (!delay.is_zero()) 76 UMA_HISTOGRAM_LONG_TIMES("MessageLoop.DelayedTaskQueue.PostedDelay", delay); 77 78 return PostPendingTask(&pending_task); 79 } 80 81 void IncomingTaskQueue::Shutdown() { 82 AutoLock auto_lock(incoming_queue_lock_); 83 accept_new_tasks_ = false; 84 } 85 86 void IncomingTaskQueue::ReportMetricsOnIdle() const { 87 UMA_HISTOGRAM_COUNTS_1M( 88 "MessageLoop.DelayedTaskQueue.PendingTasksCountOnIdle", 89 delayed_tasks_.Size()); 90 } 91 92 IncomingTaskQueue::TriageQueue::TriageQueue(IncomingTaskQueue* outer) 93 : outer_(outer) {} 94 95 IncomingTaskQueue::TriageQueue::~TriageQueue() = default; 96 97 const PendingTask& IncomingTaskQueue::TriageQueue::Peek() { 98 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); 99 ReloadFromIncomingQueueIfEmpty(); 100 DCHECK(!queue_.empty()); 101 return queue_.front(); 102 } 103 104 PendingTask IncomingTaskQueue::TriageQueue::Pop() { 105 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); 106 ReloadFromIncomingQueueIfEmpty(); 107 DCHECK(!queue_.empty()); 108 PendingTask pending_task = std::move(queue_.front()); 109 queue_.pop(); 110 return pending_task; 111 } 112 113 bool IncomingTaskQueue::TriageQueue::HasTasks() { 114 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); 115 ReloadFromIncomingQueueIfEmpty(); 116 return !queue_.empty(); 117 } 118 119 void IncomingTaskQueue::TriageQueue::Clear() { 120 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); 121 122 // Clear() should be invoked before WillDestroyCurrentMessageLoop(). 123 DCHECK(outer_->accept_new_tasks_); 124 125 // Delete all currently pending tasks but not tasks potentially posted from 126 // their destructors. See ~MessageLoop() for the full logic mitigating against 127 // infite loops when clearing pending tasks. The ScopedClosureRunner below 128 // will be bound to a task posted at the end of the queue. After it is posted, 129 // tasks will be deleted one by one, when the bound ScopedClosureRunner is 130 // deleted and sets |deleted_all_originally_pending|, we know we've deleted 131 // all originally pending tasks. 132 bool deleted_all_originally_pending = false; 133 ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce( 134 [](bool* deleted_all_originally_pending) { 135 *deleted_all_originally_pending = true; 136 }, 137 Unretained(&deleted_all_originally_pending))); 138 outer_->AddToIncomingQueue( 139 FROM_HERE, 140 BindOnce([](ScopedClosureRunner) {}, 141 std::move(capture_deleted_all_originally_pending)), 142 TimeDelta(), Nestable::kNestable); 143 144 while (!deleted_all_originally_pending) { 145 PendingTask pending_task = Pop(); 146 147 if (!pending_task.delayed_run_time.is_null()) { 148 outer_->delayed_tasks().Push(std::move(pending_task)); 149 } 150 } 151 } 152 153 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() { 154 DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); 155 if (queue_.empty()) { 156 outer_->ReloadWorkQueue(&queue_); 157 } 158 } 159 160 IncomingTaskQueue::DelayedQueue::DelayedQueue() { 161 DETACH_FROM_SEQUENCE(sequence_checker_); 162 } 163 164 IncomingTaskQueue::DelayedQueue::~DelayedQueue() = default; 165 166 void IncomingTaskQueue::DelayedQueue::Push(PendingTask pending_task) { 167 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 168 169 if (pending_task.is_high_res) 170 ++pending_high_res_tasks_; 171 172 queue_.push(std::move(pending_task)); 173 } 174 175 const PendingTask& IncomingTaskQueue::DelayedQueue::Peek() { 176 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 177 DCHECK(!queue_.empty()); 178 return queue_.top(); 179 } 180 181 PendingTask IncomingTaskQueue::DelayedQueue::Pop() { 182 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 183 DCHECK(!queue_.empty()); 184 PendingTask delayed_task = std::move(const_cast<PendingTask&>(queue_.top())); 185 queue_.pop(); 186 187 if (delayed_task.is_high_res) 188 --pending_high_res_tasks_; 189 DCHECK_GE(pending_high_res_tasks_, 0); 190 191 return delayed_task; 192 } 193 194 bool IncomingTaskQueue::DelayedQueue::HasTasks() { 195 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 196 // TODO(robliao): The other queues don't check for IsCancelled(). Should they? 197 while (!queue_.empty() && Peek().task.IsCancelled()) 198 Pop(); 199 200 return !queue_.empty(); 201 } 202 203 void IncomingTaskQueue::DelayedQueue::Clear() { 204 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 205 while (!queue_.empty()) 206 Pop(); 207 } 208 209 size_t IncomingTaskQueue::DelayedQueue::Size() const { 210 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 211 return queue_.size(); 212 } 213 214 IncomingTaskQueue::DeferredQueue::DeferredQueue() { 215 DETACH_FROM_SEQUENCE(sequence_checker_); 216 } 217 218 IncomingTaskQueue::DeferredQueue::~DeferredQueue() = default; 219 220 void IncomingTaskQueue::DeferredQueue::Push(PendingTask pending_task) { 221 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 222 queue_.push(std::move(pending_task)); 223 } 224 225 const PendingTask& IncomingTaskQueue::DeferredQueue::Peek() { 226 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 227 DCHECK(!queue_.empty()); 228 return queue_.front(); 229 } 230 231 PendingTask IncomingTaskQueue::DeferredQueue::Pop() { 232 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 233 DCHECK(!queue_.empty()); 234 PendingTask deferred_task = std::move(queue_.front()); 235 queue_.pop(); 236 return deferred_task; 237 } 238 239 bool IncomingTaskQueue::DeferredQueue::HasTasks() { 240 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 241 return !queue_.empty(); 242 } 243 244 void IncomingTaskQueue::DeferredQueue::Clear() { 245 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 246 while (!queue_.empty()) 247 Pop(); 248 } 249 250 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) { 251 // Warning: Don't try to short-circuit, and handle this thread's tasks more 252 // directly, as it could starve handling of foreign threads. Put every task 253 // into this queue. 254 bool accept_new_tasks; 255 bool was_empty = false; 256 { 257 AutoLock auto_lock(incoming_queue_lock_); 258 accept_new_tasks = accept_new_tasks_; 259 if (accept_new_tasks) { 260 was_empty = 261 PostPendingTaskLockRequired(pending_task) && triage_queue_empty_; 262 } 263 } 264 265 if (!accept_new_tasks) { 266 // Clear the pending task outside of |incoming_queue_lock_| to prevent any 267 // chance of self-deadlock if destroying a task also posts a task to this 268 // queue. 269 pending_task->task.Reset(); 270 return false; 271 } 272 273 // Let |task_queue_observer_| know of the queued task. This is done outside 274 // |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also 275 // use a lock). 276 task_queue_observer_->DidQueueTask(was_empty); 277 278 return true; 279 } 280 281 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) { 282 incoming_queue_lock_.AssertAcquired(); 283 284 // Initialize the sequence number. The sequence number is used for delayed 285 // tasks (to facilitate FIFO sorting when two tasks have the same 286 // delayed_run_time value) and for identifying the task in about:tracing. 287 pending_task->sequence_num = next_sequence_num_++; 288 289 task_queue_observer_->WillQueueTask(pending_task); 290 291 bool was_empty = incoming_queue_.empty(); 292 incoming_queue_.push(std::move(*pending_task)); 293 return was_empty; 294 } 295 296 void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) { 297 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 298 299 // Make sure no tasks are lost. 300 DCHECK(work_queue->empty()); 301 302 // Acquire all we can from the inter-thread queue with one lock acquisition. 303 AutoLock lock(incoming_queue_lock_); 304 incoming_queue_.swap(*work_queue); 305 triage_queue_empty_ = work_queue->empty(); 306 } 307 308 } // namespace internal 309 } // namespace base 310