1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/task/sequence_manager/task_queue_impl.h" 6 7 #include <memory> 8 #include <utility> 9 10 #include "base/strings/stringprintf.h" 11 #include "base/task/sequence_manager/sequence_manager_impl.h" 12 #include "base/task/sequence_manager/time_domain.h" 13 #include "base/task/sequence_manager/work_queue.h" 14 #include "base/time/time.h" 15 #include "base/trace_event/blame_context.h" 16 17 namespace base { 18 namespace sequence_manager { 19 20 // static 21 const char* TaskQueue::PriorityToString(TaskQueue::QueuePriority priority) { 22 switch (priority) { 23 case kControlPriority: 24 return "control"; 25 case kHighestPriority: 26 return "highest"; 27 case kHighPriority: 28 return "high"; 29 case kNormalPriority: 30 return "normal"; 31 case kLowPriority: 32 return "low"; 33 case kBestEffortPriority: 34 return "best_effort"; 35 default: 36 NOTREACHED(); 37 return nullptr; 38 } 39 } 40 41 namespace internal { 42 43 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager, 44 TimeDomain* time_domain, 45 const TaskQueue::Spec& spec) 46 : name_(spec.name), 47 thread_id_(PlatformThread::CurrentId()), 48 any_thread_(sequence_manager, time_domain), 49 main_thread_only_(sequence_manager, this, time_domain), 50 should_monitor_quiescence_(spec.should_monitor_quiescence), 51 should_notify_observers_(spec.should_notify_observers) { 52 DCHECK(time_domain); 53 } 54 55 TaskQueueImpl::~TaskQueueImpl() { 56 #if DCHECK_IS_ON() 57 AutoLock lock(any_thread_lock_); 58 // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_| 59 // contains a strong reference to this TaskQueueImpl and the 60 // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task 61 // queues. 62 DCHECK(!any_thread().sequence_manager) 63 << "UnregisterTaskQueue must be called first!"; 64 #endif 65 } 66 67 TaskQueueImpl::PostTaskResult::PostTaskResult() 68 : success(false), task(OnceClosure(), Location()) {} 69 70 TaskQueueImpl::PostTaskResult::PostTaskResult(bool success, 71 TaskQueue::PostedTask task) 72 : success(success), task(std::move(task)) {} 73 74 TaskQueueImpl::PostTaskResult::PostTaskResult(PostTaskResult&& move_from) 75 : success(move_from.success), task(std::move(move_from.task)) {} 76 77 TaskQueueImpl::PostTaskResult::~PostTaskResult() = default; 78 79 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Success() { 80 return PostTaskResult(true, TaskQueue::PostedTask(OnceClosure(), Location())); 81 } 82 83 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Fail( 84 TaskQueue::PostedTask task) { 85 return PostTaskResult(false, std::move(task)); 86 } 87 88 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task, 89 TimeTicks desired_run_time, 90 EnqueueOrder sequence_number) 91 : TaskQueue::Task(std::move(task), desired_run_time) { 92 // It might wrap around to a negative number but it's handled properly. 93 sequence_num = static_cast<int>(sequence_number); 94 } 95 96 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task, 97 TimeTicks desired_run_time, 98 EnqueueOrder sequence_number, 99 EnqueueOrder enqueue_order) 100 : TaskQueue::Task(std::move(task), desired_run_time), 101 enqueue_order_(enqueue_order) { 102 // It might wrap around to a negative number but it's handled properly. 103 sequence_num = static_cast<int>(sequence_number); 104 } 105 106 TaskQueueImpl::AnyThread::AnyThread(SequenceManagerImpl* sequence_manager, 107 TimeDomain* time_domain) 108 : sequence_manager(sequence_manager), time_domain(time_domain) {} 109 110 TaskQueueImpl::AnyThread::~AnyThread() = default; 111 112 TaskQueueImpl::MainThreadOnly::MainThreadOnly( 113 SequenceManagerImpl* sequence_manager, 114 TaskQueueImpl* task_queue, 115 TimeDomain* time_domain) 116 : sequence_manager(sequence_manager), 117 time_domain(time_domain), 118 delayed_work_queue( 119 new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)), 120 immediate_work_queue(new WorkQueue(task_queue, 121 "immediate", 122 WorkQueue::QueueType::kImmediate)), 123 set_index(0), 124 is_enabled_refcount(0), 125 voter_refcount(0), 126 blame_context(nullptr), 127 is_enabled_for_test(true) {} 128 129 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default; 130 131 void TaskQueueImpl::UnregisterTaskQueue() { 132 TaskDeque immediate_incoming_queue; 133 134 { 135 AutoLock lock(any_thread_lock_); 136 AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); 137 138 if (main_thread_only().time_domain) 139 main_thread_only().time_domain->UnregisterQueue(this); 140 141 if (!any_thread().sequence_manager) 142 return; 143 144 main_thread_only().on_task_completed_handler = OnTaskCompletedHandler(); 145 any_thread().time_domain = nullptr; 146 main_thread_only().time_domain = nullptr; 147 148 any_thread().sequence_manager = nullptr; 149 main_thread_only().sequence_manager = nullptr; 150 any_thread().on_next_wake_up_changed_callback = 151 OnNextWakeUpChangedCallback(); 152 main_thread_only().on_next_wake_up_changed_callback = 153 OnNextWakeUpChangedCallback(); 154 immediate_incoming_queue.swap(immediate_incoming_queue_); 155 } 156 157 // It is possible for a task to hold a scoped_refptr to this, which 158 // will lead to TaskQueueImpl destructor being called when deleting a task. 159 // To avoid use-after-free, we need to clear all fields of a task queue 160 // before starting to delete the tasks. 161 // All work queues and priority queues containing tasks should be moved to 162 // local variables on stack (std::move for unique_ptrs and swap for queues) 163 // before clearing them and deleting tasks. 164 165 // Flush the queues outside of the lock because TSAN complains about a lock 166 // order inversion for tasks that are posted from within a lock, with a 167 // destructor that acquires the same lock. 168 169 std::priority_queue<Task> delayed_incoming_queue; 170 delayed_incoming_queue.swap(main_thread_only().delayed_incoming_queue); 171 172 std::unique_ptr<WorkQueue> immediate_work_queue = 173 std::move(main_thread_only().immediate_work_queue); 174 std::unique_ptr<WorkQueue> delayed_work_queue = 175 std::move(main_thread_only().delayed_work_queue); 176 } 177 178 const char* TaskQueueImpl::GetName() const { 179 return name_; 180 } 181 182 bool TaskQueueImpl::RunsTasksInCurrentSequence() const { 183 return PlatformThread::CurrentId() == thread_id_; 184 } 185 186 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTask( 187 TaskQueue::PostedTask task) { 188 if (task.delay.is_zero()) 189 return PostImmediateTaskImpl(std::move(task)); 190 191 return PostDelayedTaskImpl(std::move(task)); 192 } 193 194 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostImmediateTaskImpl( 195 TaskQueue::PostedTask task) { 196 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 197 // for details. 198 CHECK(task.callback); 199 AutoLock lock(any_thread_lock_); 200 if (!any_thread().sequence_manager) 201 return PostTaskResult::Fail(std::move(task)); 202 203 EnqueueOrder sequence_number = 204 any_thread().sequence_manager->GetNextSequenceNumber(); 205 206 PushOntoImmediateIncomingQueueLocked(Task(std::move(task), 207 any_thread().time_domain->Now(), 208 sequence_number, sequence_number)); 209 return PostTaskResult::Success(); 210 } 211 212 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTaskImpl( 213 TaskQueue::PostedTask task) { 214 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 215 // for details. 216 CHECK(task.callback); 217 DCHECK_GT(task.delay, TimeDelta()); 218 if (PlatformThread::CurrentId() == thread_id_) { 219 // Lock-free fast path for delayed tasks posted from the main thread. 220 if (!main_thread_only().sequence_manager) 221 return PostTaskResult::Fail(std::move(task)); 222 223 EnqueueOrder sequence_number = 224 main_thread_only().sequence_manager->GetNextSequenceNumber(); 225 226 TimeTicks time_domain_now = main_thread_only().time_domain->Now(); 227 TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay; 228 PushOntoDelayedIncomingQueueFromMainThread( 229 Task(std::move(task), time_domain_delayed_run_time, sequence_number), 230 time_domain_now); 231 } else { 232 // NOTE posting a delayed task from a different thread is not expected to 233 // be common. This pathway is less optimal than perhaps it could be 234 // because it causes two main thread tasks to be run. Should this 235 // assumption prove to be false in future, we may need to revisit this. 236 AutoLock lock(any_thread_lock_); 237 if (!any_thread().sequence_manager) 238 return PostTaskResult::Fail(std::move(task)); 239 240 EnqueueOrder sequence_number = 241 any_thread().sequence_manager->GetNextSequenceNumber(); 242 243 TimeTicks time_domain_now = any_thread().time_domain->Now(); 244 TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay; 245 PushOntoDelayedIncomingQueueLocked( 246 Task(std::move(task), time_domain_delayed_run_time, sequence_number)); 247 } 248 return PostTaskResult::Success(); 249 } 250 251 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread( 252 Task pending_task, 253 TimeTicks now) { 254 main_thread_only().sequence_manager->WillQueueTask(&pending_task); 255 main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); 256 257 LazyNow lazy_now(now); 258 UpdateDelayedWakeUp(&lazy_now); 259 260 TraceQueueSize(); 261 } 262 263 void TaskQueueImpl::PushOntoDelayedIncomingQueueLocked(Task pending_task) { 264 any_thread().sequence_manager->WillQueueTask(&pending_task); 265 266 EnqueueOrder thread_hop_task_sequence_number = 267 any_thread().sequence_manager->GetNextSequenceNumber(); 268 // TODO(altimin): Add a copy method to Task to capture metadata here. 269 PushOntoImmediateIncomingQueueLocked(Task( 270 TaskQueue::PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask, 271 Unretained(this), std::move(pending_task)), 272 FROM_HERE, TimeDelta(), Nestable::kNonNestable, 273 pending_task.task_type()), 274 TimeTicks(), thread_hop_task_sequence_number, 275 thread_hop_task_sequence_number)); 276 } 277 278 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) { 279 DCHECK(main_thread_checker_.CalledOnValidThread()); 280 TimeTicks delayed_run_time = pending_task.delayed_run_time; 281 TimeTicks time_domain_now = main_thread_only().time_domain->Now(); 282 if (delayed_run_time <= time_domain_now) { 283 // If |delayed_run_time| is in the past then push it onto the work queue 284 // immediately. To ensure the right task ordering we need to temporarily 285 // push it onto the |delayed_incoming_queue|. 286 delayed_run_time = time_domain_now; 287 pending_task.delayed_run_time = time_domain_now; 288 main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); 289 LazyNow lazy_now(time_domain_now); 290 WakeUpForDelayedWork(&lazy_now); 291 } else { 292 // If |delayed_run_time| is in the future we can queue it as normal. 293 PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task), 294 time_domain_now); 295 } 296 TraceQueueSize(); 297 } 298 299 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) { 300 // If the |immediate_incoming_queue| is empty we need a DoWork posted to make 301 // it run. 302 bool was_immediate_incoming_queue_empty; 303 304 EnqueueOrder sequence_number = task.enqueue_order(); 305 TimeTicks desired_run_time = task.delayed_run_time; 306 307 { 308 AutoLock lock(immediate_incoming_queue_lock_); 309 was_immediate_incoming_queue_empty = immediate_incoming_queue().empty(); 310 any_thread().sequence_manager->WillQueueTask(&task); 311 immediate_incoming_queue().push_back(std::move(task)); 312 } 313 314 if (was_immediate_incoming_queue_empty) { 315 // However there's no point posting a DoWork for a blocked queue. NB we can 316 // only tell if it's disabled from the main thread. 317 bool queue_is_blocked = 318 RunsTasksInCurrentSequence() && 319 (!IsQueueEnabled() || main_thread_only().current_fence); 320 any_thread().sequence_manager->OnQueueHasIncomingImmediateWork( 321 this, sequence_number, queue_is_blocked); 322 if (!any_thread().on_next_wake_up_changed_callback.is_null()) 323 any_thread().on_next_wake_up_changed_callback.Run(desired_run_time); 324 } 325 326 TraceQueueSize(); 327 } 328 329 void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() { 330 if (!main_thread_only().immediate_work_queue->Empty()) 331 return; 332 333 main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue(); 334 } 335 336 void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) { 337 DCHECK(queue->empty()); 338 339 AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); 340 queue->swap(immediate_incoming_queue()); 341 342 // Activate delayed fence if necessary. This is ideologically similar to 343 // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted 344 // from any thread we can't generate an enqueue order for the fence there, 345 // so we have to check all immediate tasks and use their enqueue order for 346 // a fence. 347 if (main_thread_only().delayed_fence) { 348 for (const Task& task : *queue) { 349 if (task.delayed_run_time >= main_thread_only().delayed_fence.value()) { 350 main_thread_only().delayed_fence = nullopt; 351 DCHECK(!main_thread_only().current_fence); 352 main_thread_only().current_fence = task.enqueue_order(); 353 // Do not trigger WorkQueueSets notification when taking incoming 354 // immediate queue. 355 main_thread_only().immediate_work_queue->InsertFenceSilently( 356 main_thread_only().current_fence); 357 main_thread_only().delayed_work_queue->InsertFenceSilently( 358 main_thread_only().current_fence); 359 break; 360 } 361 } 362 } 363 } 364 365 bool TaskQueueImpl::IsEmpty() const { 366 if (!main_thread_only().delayed_work_queue->Empty() || 367 !main_thread_only().delayed_incoming_queue.empty() || 368 !main_thread_only().immediate_work_queue->Empty()) { 369 return false; 370 } 371 372 AutoLock lock(immediate_incoming_queue_lock_); 373 return immediate_incoming_queue().empty(); 374 } 375 376 size_t TaskQueueImpl::GetNumberOfPendingTasks() const { 377 size_t task_count = 0; 378 task_count += main_thread_only().delayed_work_queue->Size(); 379 task_count += main_thread_only().delayed_incoming_queue.size(); 380 task_count += main_thread_only().immediate_work_queue->Size(); 381 382 AutoLock lock(immediate_incoming_queue_lock_); 383 task_count += immediate_incoming_queue().size(); 384 return task_count; 385 } 386 387 bool TaskQueueImpl::HasTaskToRunImmediately() const { 388 // Any work queue tasks count as immediate work. 389 if (!main_thread_only().delayed_work_queue->Empty() || 390 !main_thread_only().immediate_work_queue->Empty()) { 391 return true; 392 } 393 394 // Tasks on |delayed_incoming_queue| that could run now, count as 395 // immediate work. 396 if (!main_thread_only().delayed_incoming_queue.empty() && 397 main_thread_only().delayed_incoming_queue.top().delayed_run_time <= 398 main_thread_only().time_domain->CreateLazyNow().Now()) { 399 return true; 400 } 401 402 // Finally tasks on |immediate_incoming_queue| count as immediate work. 403 AutoLock lock(immediate_incoming_queue_lock_); 404 return !immediate_incoming_queue().empty(); 405 } 406 407 Optional<TaskQueueImpl::DelayedWakeUp> 408 TaskQueueImpl::GetNextScheduledWakeUpImpl() { 409 // Note we don't scheduled a wake-up for disabled queues. 410 if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled()) 411 return nullopt; 412 413 return main_thread_only().delayed_incoming_queue.top().delayed_wake_up(); 414 } 415 416 Optional<TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() { 417 Optional<DelayedWakeUp> wake_up = GetNextScheduledWakeUpImpl(); 418 if (!wake_up) 419 return nullopt; 420 return wake_up->time; 421 } 422 423 void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) { 424 // Enqueue all delayed tasks that should be running now, skipping any that 425 // have been canceled. 426 while (!main_thread_only().delayed_incoming_queue.empty()) { 427 Task& task = 428 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()); 429 if (!task.task || task.task.IsCancelled()) { 430 main_thread_only().delayed_incoming_queue.pop(); 431 continue; 432 } 433 if (task.delayed_run_time > lazy_now->Now()) 434 break; 435 ActivateDelayedFenceIfNeeded(task.delayed_run_time); 436 task.set_enqueue_order( 437 main_thread_only().sequence_manager->GetNextSequenceNumber()); 438 main_thread_only().delayed_work_queue->Push(std::move(task)); 439 main_thread_only().delayed_incoming_queue.pop(); 440 441 // Normally WakeUpForDelayedWork is called inside DoWork, but it also 442 // can be called elsewhere (e.g. tests and fast-path for posting 443 // delayed tasks). Ensure that there is a DoWork posting. No-op inside 444 // existing DoWork due to DoWork deduplication. 445 if (IsQueueEnabled() || !main_thread_only().current_fence) { 446 main_thread_only().sequence_manager->MaybeScheduleImmediateWork( 447 FROM_HERE); 448 } 449 } 450 451 UpdateDelayedWakeUp(lazy_now); 452 } 453 454 void TaskQueueImpl::TraceQueueSize() const { 455 bool is_tracing; 456 TRACE_EVENT_CATEGORY_GROUP_ENABLED( 457 TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing); 458 if (!is_tracing) 459 return; 460 461 // It's only safe to access the work queues from the main thread. 462 // TODO(alexclarke): We should find another way of tracing this 463 if (PlatformThread::CurrentId() != thread_id_) 464 return; 465 466 AutoLock lock(immediate_incoming_queue_lock_); 467 TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(), 468 immediate_incoming_queue().size() + 469 main_thread_only().immediate_work_queue->Size() + 470 main_thread_only().delayed_work_queue->Size() + 471 main_thread_only().delayed_incoming_queue.size()); 472 } 473 474 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) { 475 if (!main_thread_only().sequence_manager || priority == GetQueuePriority()) 476 return; 477 main_thread_only() 478 .sequence_manager->main_thread_only() 479 .selector.SetQueuePriority(this, priority); 480 } 481 482 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const { 483 size_t set_index = immediate_work_queue()->work_queue_set_index(); 484 DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index()); 485 return static_cast<TaskQueue::QueuePriority>(set_index); 486 } 487 488 void TaskQueueImpl::AsValueInto(TimeTicks now, 489 trace_event::TracedValue* state) const { 490 AutoLock lock(any_thread_lock_); 491 AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); 492 state->BeginDictionary(); 493 state->SetString("name", GetName()); 494 if (!main_thread_only().sequence_manager) { 495 state->SetBoolean("unregistered", true); 496 state->EndDictionary(); 497 return; 498 } 499 DCHECK(main_thread_only().time_domain); 500 DCHECK(main_thread_only().delayed_work_queue); 501 DCHECK(main_thread_only().immediate_work_queue); 502 503 state->SetString( 504 "task_queue_id", 505 StringPrintf("0x%" PRIx64, 506 static_cast<uint64_t>(reinterpret_cast<uintptr_t>(this)))); 507 state->SetBoolean("enabled", IsQueueEnabled()); 508 state->SetString("time_domain_name", 509 main_thread_only().time_domain->GetName()); 510 state->SetInteger("immediate_incoming_queue_size", 511 immediate_incoming_queue().size()); 512 state->SetInteger("delayed_incoming_queue_size", 513 main_thread_only().delayed_incoming_queue.size()); 514 state->SetInteger("immediate_work_queue_size", 515 main_thread_only().immediate_work_queue->Size()); 516 state->SetInteger("delayed_work_queue_size", 517 main_thread_only().delayed_work_queue->Size()); 518 519 if (!main_thread_only().delayed_incoming_queue.empty()) { 520 TimeDelta delay_to_next_task = 521 (main_thread_only().delayed_incoming_queue.top().delayed_run_time - 522 main_thread_only().time_domain->CreateLazyNow().Now()); 523 state->SetDouble("delay_to_next_task_ms", 524 delay_to_next_task.InMillisecondsF()); 525 } 526 if (main_thread_only().current_fence) 527 state->SetInteger("current_fence", main_thread_only().current_fence); 528 if (main_thread_only().delayed_fence) { 529 state->SetDouble( 530 "delayed_fence_seconds_from_now", 531 (main_thread_only().delayed_fence.value() - now).InSecondsF()); 532 } 533 534 bool verbose = false; 535 TRACE_EVENT_CATEGORY_GROUP_ENABLED( 536 TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"), 537 &verbose); 538 539 if (verbose) { 540 state->BeginArray("immediate_incoming_queue"); 541 QueueAsValueInto(immediate_incoming_queue(), now, state); 542 state->EndArray(); 543 state->BeginArray("delayed_work_queue"); 544 main_thread_only().delayed_work_queue->AsValueInto(now, state); 545 state->EndArray(); 546 state->BeginArray("immediate_work_queue"); 547 main_thread_only().immediate_work_queue->AsValueInto(now, state); 548 state->EndArray(); 549 state->BeginArray("delayed_incoming_queue"); 550 QueueAsValueInto(main_thread_only().delayed_incoming_queue, now, state); 551 state->EndArray(); 552 } 553 state->SetString("priority", TaskQueue::PriorityToString(GetQueuePriority())); 554 state->EndDictionary(); 555 } 556 557 void TaskQueueImpl::AddTaskObserver(MessageLoop::TaskObserver* task_observer) { 558 main_thread_only().task_observers.AddObserver(task_observer); 559 } 560 561 void TaskQueueImpl::RemoveTaskObserver( 562 MessageLoop::TaskObserver* task_observer) { 563 main_thread_only().task_observers.RemoveObserver(task_observer); 564 } 565 566 void TaskQueueImpl::NotifyWillProcessTask(const PendingTask& pending_task) { 567 DCHECK(should_notify_observers_); 568 if (main_thread_only().blame_context) 569 main_thread_only().blame_context->Enter(); 570 for (auto& observer : main_thread_only().task_observers) 571 observer.WillProcessTask(pending_task); 572 } 573 574 void TaskQueueImpl::NotifyDidProcessTask(const PendingTask& pending_task) { 575 DCHECK(should_notify_observers_); 576 for (auto& observer : main_thread_only().task_observers) 577 observer.DidProcessTask(pending_task); 578 if (main_thread_only().blame_context) 579 main_thread_only().blame_context->Leave(); 580 } 581 582 void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) { 583 { 584 AutoLock lock(any_thread_lock_); 585 DCHECK(time_domain); 586 // NOTE this is similar to checking |any_thread().sequence_manager| but 587 // the TaskQueueSelectorTests constructs TaskQueueImpl directly with a null 588 // sequence_manager. Instead we check |any_thread().time_domain| which is 589 // another way of asserting that UnregisterTaskQueue has not been called. 590 DCHECK(any_thread().time_domain); 591 if (!any_thread().time_domain) 592 return; 593 DCHECK(main_thread_checker_.CalledOnValidThread()); 594 if (time_domain == main_thread_only().time_domain) 595 return; 596 597 any_thread().time_domain = time_domain; 598 } 599 600 main_thread_only().time_domain->UnregisterQueue(this); 601 main_thread_only().time_domain = time_domain; 602 603 LazyNow lazy_now = time_domain->CreateLazyNow(); 604 // Clear scheduled wake up to ensure that new notifications are issued 605 // correctly. 606 // TODO(altimin): Remove this when we won't have to support changing time 607 // domains. 608 main_thread_only().scheduled_wake_up = nullopt; 609 UpdateDelayedWakeUp(&lazy_now); 610 } 611 612 TimeDomain* TaskQueueImpl::GetTimeDomain() const { 613 if (PlatformThread::CurrentId() == thread_id_) 614 return main_thread_only().time_domain; 615 616 AutoLock lock(any_thread_lock_); 617 return any_thread().time_domain; 618 } 619 620 void TaskQueueImpl::SetBlameContext(trace_event::BlameContext* blame_context) { 621 main_thread_only().blame_context = blame_context; 622 } 623 624 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) { 625 if (!main_thread_only().sequence_manager) 626 return; 627 628 // Only one fence may be present at a time. 629 main_thread_only().delayed_fence = nullopt; 630 631 EnqueueOrder previous_fence = main_thread_only().current_fence; 632 EnqueueOrder current_fence = 633 position == TaskQueue::InsertFencePosition::kNow 634 ? main_thread_only().sequence_manager->GetNextSequenceNumber() 635 : EnqueueOrder::blocking_fence(); 636 637 // Tasks posted after this point will have a strictly higher enqueue order 638 // and will be blocked from running. 639 main_thread_only().current_fence = current_fence; 640 bool task_unblocked = 641 main_thread_only().immediate_work_queue->InsertFence(current_fence); 642 task_unblocked |= 643 main_thread_only().delayed_work_queue->InsertFence(current_fence); 644 645 if (!task_unblocked && previous_fence && previous_fence < current_fence) { 646 AutoLock lock(immediate_incoming_queue_lock_); 647 if (!immediate_incoming_queue().empty() && 648 immediate_incoming_queue().front().enqueue_order() > previous_fence && 649 immediate_incoming_queue().front().enqueue_order() < current_fence) { 650 task_unblocked = true; 651 } 652 } 653 654 if (IsQueueEnabled() && task_unblocked) { 655 main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE); 656 } 657 } 658 659 void TaskQueueImpl::InsertFenceAt(TimeTicks time) { 660 // Task queue can have only one fence, delayed or not. 661 RemoveFence(); 662 main_thread_only().delayed_fence = time; 663 } 664 665 void TaskQueueImpl::RemoveFence() { 666 if (!main_thread_only().sequence_manager) 667 return; 668 669 EnqueueOrder previous_fence = main_thread_only().current_fence; 670 main_thread_only().current_fence = EnqueueOrder::none(); 671 main_thread_only().delayed_fence = nullopt; 672 673 bool task_unblocked = main_thread_only().immediate_work_queue->RemoveFence(); 674 task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence(); 675 676 if (!task_unblocked && previous_fence) { 677 AutoLock lock(immediate_incoming_queue_lock_); 678 if (!immediate_incoming_queue().empty() && 679 immediate_incoming_queue().front().enqueue_order() > previous_fence) { 680 task_unblocked = true; 681 } 682 } 683 684 if (IsQueueEnabled() && task_unblocked) { 685 main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE); 686 } 687 } 688 689 bool TaskQueueImpl::BlockedByFence() const { 690 if (!main_thread_only().current_fence) 691 return false; 692 693 if (!main_thread_only().immediate_work_queue->BlockedByFence() || 694 !main_thread_only().delayed_work_queue->BlockedByFence()) { 695 return false; 696 } 697 698 AutoLock lock(immediate_incoming_queue_lock_); 699 if (immediate_incoming_queue().empty()) 700 return true; 701 702 return immediate_incoming_queue().front().enqueue_order() > 703 main_thread_only().current_fence; 704 } 705 706 bool TaskQueueImpl::HasActiveFence() { 707 if (main_thread_only().delayed_fence && 708 main_thread_only().time_domain->Now() > 709 main_thread_only().delayed_fence.value()) { 710 return true; 711 } 712 return !!main_thread_only().current_fence; 713 } 714 715 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const { 716 if (!IsQueueEnabled()) 717 return false; 718 719 if (!main_thread_only().current_fence) 720 return true; 721 722 return enqueue_order < main_thread_only().current_fence; 723 } 724 725 // static 726 void TaskQueueImpl::QueueAsValueInto(const TaskDeque& queue, 727 TimeTicks now, 728 trace_event::TracedValue* state) { 729 for (const Task& task : queue) { 730 TaskAsValueInto(task, now, state); 731 } 732 } 733 734 // static 735 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue, 736 TimeTicks now, 737 trace_event::TracedValue* state) { 738 // Remove const to search |queue| in the destructive manner. Restore the 739 // content from |visited| later. 740 std::priority_queue<Task>* mutable_queue = 741 const_cast<std::priority_queue<Task>*>(&queue); 742 std::priority_queue<Task> visited; 743 while (!mutable_queue->empty()) { 744 TaskAsValueInto(mutable_queue->top(), now, state); 745 visited.push(std::move(const_cast<Task&>(mutable_queue->top()))); 746 mutable_queue->pop(); 747 } 748 *mutable_queue = std::move(visited); 749 } 750 751 // static 752 void TaskQueueImpl::TaskAsValueInto(const Task& task, 753 TimeTicks now, 754 trace_event::TracedValue* state) { 755 state->BeginDictionary(); 756 state->SetString("posted_from", task.posted_from.ToString()); 757 if (task.enqueue_order_set()) 758 state->SetInteger("enqueue_order", task.enqueue_order()); 759 state->SetInteger("sequence_num", task.sequence_num); 760 state->SetBoolean("nestable", task.nestable == Nestable::kNestable); 761 state->SetBoolean("is_high_res", task.is_high_res); 762 state->SetBoolean("is_cancelled", task.task.IsCancelled()); 763 state->SetDouble("delayed_run_time", 764 (task.delayed_run_time - TimeTicks()).InMillisecondsF()); 765 state->SetDouble("delayed_run_time_milliseconds_from_now", 766 (task.delayed_run_time - now).InMillisecondsF()); 767 state->EndDictionary(); 768 } 769 770 TaskQueueImpl::QueueEnabledVoterImpl::QueueEnabledVoterImpl( 771 scoped_refptr<TaskQueue> task_queue) 772 : task_queue_(task_queue), enabled_(true) {} 773 774 TaskQueueImpl::QueueEnabledVoterImpl::~QueueEnabledVoterImpl() { 775 if (task_queue_->GetTaskQueueImpl()) 776 task_queue_->GetTaskQueueImpl()->RemoveQueueEnabledVoter(this); 777 } 778 779 void TaskQueueImpl::QueueEnabledVoterImpl::SetQueueEnabled(bool enabled) { 780 if (enabled_ == enabled) 781 return; 782 783 task_queue_->GetTaskQueueImpl()->OnQueueEnabledVoteChanged(enabled); 784 enabled_ = enabled; 785 } 786 787 void TaskQueueImpl::RemoveQueueEnabledVoter( 788 const QueueEnabledVoterImpl* voter) { 789 // Bail out if we're being called from TaskQueueImpl::UnregisterTaskQueue. 790 if (!main_thread_only().time_domain) 791 return; 792 793 bool was_enabled = IsQueueEnabled(); 794 if (voter->enabled_) { 795 main_thread_only().is_enabled_refcount--; 796 DCHECK_GE(main_thread_only().is_enabled_refcount, 0); 797 } 798 799 main_thread_only().voter_refcount--; 800 DCHECK_GE(main_thread_only().voter_refcount, 0); 801 802 bool is_enabled = IsQueueEnabled(); 803 if (was_enabled != is_enabled) 804 EnableOrDisableWithSelector(is_enabled); 805 } 806 807 bool TaskQueueImpl::IsQueueEnabled() const { 808 // By default is_enabled_refcount and voter_refcount both equal zero. 809 return (main_thread_only().is_enabled_refcount == 810 main_thread_only().voter_refcount) && 811 main_thread_only().is_enabled_for_test; 812 } 813 814 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) { 815 bool was_enabled = IsQueueEnabled(); 816 if (enabled) { 817 main_thread_only().is_enabled_refcount++; 818 DCHECK_LE(main_thread_only().is_enabled_refcount, 819 main_thread_only().voter_refcount); 820 } else { 821 main_thread_only().is_enabled_refcount--; 822 DCHECK_GE(main_thread_only().is_enabled_refcount, 0); 823 } 824 825 bool is_enabled = IsQueueEnabled(); 826 if (was_enabled != is_enabled) 827 EnableOrDisableWithSelector(is_enabled); 828 } 829 830 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) { 831 if (!main_thread_only().sequence_manager) 832 return; 833 834 LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow(); 835 UpdateDelayedWakeUp(&lazy_now); 836 837 if (enable) { 838 if (HasPendingImmediateWork() && 839 !main_thread_only().on_next_wake_up_changed_callback.is_null()) { 840 // Delayed work notification will be issued via time domain. 841 main_thread_only().on_next_wake_up_changed_callback.Run(TimeTicks()); 842 } 843 844 // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts 845 // a DoWork if needed. 846 main_thread_only() 847 .sequence_manager->main_thread_only() 848 .selector.EnableQueue(this); 849 } else { 850 main_thread_only() 851 .sequence_manager->main_thread_only() 852 .selector.DisableQueue(this); 853 } 854 } 855 856 std::unique_ptr<TaskQueue::QueueEnabledVoter> 857 TaskQueueImpl::CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue) { 858 DCHECK_EQ(task_queue->GetTaskQueueImpl(), this); 859 main_thread_only().voter_refcount++; 860 main_thread_only().is_enabled_refcount++; 861 return std::make_unique<QueueEnabledVoterImpl>(task_queue); 862 } 863 864 void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) { 865 if (main_thread_only().delayed_incoming_queue.empty()) 866 return; 867 868 // Remove canceled tasks. 869 std::priority_queue<Task> remaining_tasks; 870 while (!main_thread_only().delayed_incoming_queue.empty()) { 871 if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) { 872 remaining_tasks.push(std::move( 873 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top()))); 874 } 875 main_thread_only().delayed_incoming_queue.pop(); 876 } 877 878 main_thread_only().delayed_incoming_queue = std::move(remaining_tasks); 879 880 LazyNow lazy_now(now); 881 UpdateDelayedWakeUp(&lazy_now); 882 } 883 884 void TaskQueueImpl::PushImmediateIncomingTaskForTest( 885 TaskQueueImpl::Task&& task) { 886 AutoLock lock(immediate_incoming_queue_lock_); 887 immediate_incoming_queue().push_back(std::move(task)); 888 } 889 890 void TaskQueueImpl::RequeueDeferredNonNestableTask( 891 DeferredNonNestableTask task) { 892 DCHECK(task.task.nestable == Nestable::kNonNestable); 893 // The re-queued tasks have to be pushed onto the front because we'd otherwise 894 // violate the strict monotonically increasing enqueue order within the 895 // WorkQueue. We can't assign them a new enqueue order here because that will 896 // not behave correctly with fences and things will break (e.g Idle TQ). 897 if (task.work_queue_type == WorkQueueType::kDelayed) { 898 main_thread_only().delayed_work_queue->PushNonNestableTaskToFront( 899 std::move(task.task)); 900 } else { 901 main_thread_only().immediate_work_queue->PushNonNestableTaskToFront( 902 std::move(task.task)); 903 } 904 } 905 906 void TaskQueueImpl::SetOnNextWakeUpChangedCallback( 907 TaskQueueImpl::OnNextWakeUpChangedCallback callback) { 908 #if DCHECK_IS_ON() 909 if (callback) { 910 DCHECK(main_thread_only().on_next_wake_up_changed_callback.is_null()) 911 << "Can't assign two different observers to " 912 "blink::scheduler::TaskQueue"; 913 } 914 #endif 915 AutoLock lock(any_thread_lock_); 916 any_thread().on_next_wake_up_changed_callback = callback; 917 main_thread_only().on_next_wake_up_changed_callback = callback; 918 } 919 920 void TaskQueueImpl::UpdateDelayedWakeUp(LazyNow* lazy_now) { 921 return UpdateDelayedWakeUpImpl(lazy_now, GetNextScheduledWakeUpImpl()); 922 } 923 924 void TaskQueueImpl::UpdateDelayedWakeUpImpl( 925 LazyNow* lazy_now, 926 Optional<TaskQueueImpl::DelayedWakeUp> wake_up) { 927 if (main_thread_only().scheduled_wake_up == wake_up) 928 return; 929 main_thread_only().scheduled_wake_up = wake_up; 930 931 if (wake_up && 932 !main_thread_only().on_next_wake_up_changed_callback.is_null() && 933 !HasPendingImmediateWork()) { 934 main_thread_only().on_next_wake_up_changed_callback.Run(wake_up->time); 935 } 936 937 main_thread_only().time_domain->SetNextWakeUpForQueue(this, wake_up, 938 lazy_now); 939 } 940 941 void TaskQueueImpl::SetDelayedWakeUpForTesting( 942 Optional<TaskQueueImpl::DelayedWakeUp> wake_up) { 943 LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow(); 944 UpdateDelayedWakeUpImpl(&lazy_now, wake_up); 945 } 946 947 bool TaskQueueImpl::HasPendingImmediateWork() { 948 // Any work queue tasks count as immediate work. 949 if (!main_thread_only().delayed_work_queue->Empty() || 950 !main_thread_only().immediate_work_queue->Empty()) { 951 return true; 952 } 953 954 // Finally tasks on |immediate_incoming_queue| count as immediate work. 955 AutoLock lock(immediate_incoming_queue_lock_); 956 return !immediate_incoming_queue().empty(); 957 } 958 959 void TaskQueueImpl::SetOnTaskStartedHandler( 960 TaskQueueImpl::OnTaskStartedHandler handler) { 961 main_thread_only().on_task_started_handler = std::move(handler); 962 } 963 964 void TaskQueueImpl::OnTaskStarted(const TaskQueue::Task& task, 965 const TaskQueue::TaskTiming& task_timing) { 966 if (!main_thread_only().on_task_started_handler.is_null()) 967 main_thread_only().on_task_started_handler.Run(task, task_timing); 968 } 969 970 void TaskQueueImpl::SetOnTaskCompletedHandler( 971 TaskQueueImpl::OnTaskCompletedHandler handler) { 972 main_thread_only().on_task_completed_handler = std::move(handler); 973 } 974 975 void TaskQueueImpl::OnTaskCompleted(const TaskQueue::Task& task, 976 const TaskQueue::TaskTiming& task_timing) { 977 if (!main_thread_only().on_task_completed_handler.is_null()) 978 main_thread_only().on_task_completed_handler.Run(task, task_timing); 979 } 980 981 bool TaskQueueImpl::RequiresTaskTiming() const { 982 return !main_thread_only().on_task_started_handler.is_null() || 983 !main_thread_only().on_task_completed_handler.is_null(); 984 } 985 986 bool TaskQueueImpl::IsUnregistered() const { 987 AutoLock lock(any_thread_lock_); 988 return !any_thread().sequence_manager; 989 } 990 991 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() { 992 return main_thread_only().sequence_manager->GetWeakPtr(); 993 } 994 995 scoped_refptr<GracefulQueueShutdownHelper> 996 TaskQueueImpl::GetGracefulQueueShutdownHelper() { 997 return main_thread_only().sequence_manager->GetGracefulQueueShutdownHelper(); 998 } 999 1000 void TaskQueueImpl::SetQueueEnabledForTest(bool enabled) { 1001 main_thread_only().is_enabled_for_test = enabled; 1002 EnableOrDisableWithSelector(IsQueueEnabled()); 1003 } 1004 1005 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(TimeTicks now) { 1006 if (!main_thread_only().delayed_fence) 1007 return; 1008 if (main_thread_only().delayed_fence.value() > now) 1009 return; 1010 InsertFence(TaskQueue::InsertFencePosition::kNow); 1011 main_thread_only().delayed_fence = nullopt; 1012 } 1013 1014 } // namespace internal 1015 } // namespace sequence_manager 1016 } // namespace base 1017