1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/sequenced_worker_pool.h" 6 7 #include <stdint.h> 8 9 #include <list> 10 #include <map> 11 #include <memory> 12 #include <set> 13 #include <unordered_map> 14 #include <utility> 15 #include <vector> 16 17 #include "base/atomic_sequence_num.h" 18 #include "base/callback.h" 19 #include "base/compiler_specific.h" 20 #include "base/critical_closure.h" 21 #include "base/debug/dump_without_crashing.h" 22 #include "base/lazy_instance.h" 23 #include "base/logging.h" 24 #include "base/macros.h" 25 #include "base/memory/ptr_util.h" 26 #include "base/stl_util.h" 27 #include "base/strings/stringprintf.h" 28 #include "base/synchronization/condition_variable.h" 29 #include "base/synchronization/lock.h" 30 // Don't enable the redirect to TaskScheduler on Arc++ to avoid pulling a bunch 31 // of dependencies. Some code also #ifdef'ed below. 32 #if 0 33 #include "base/task_scheduler/post_task.h" 34 #include "base/task_scheduler/task_scheduler.h" 35 #endif 36 #include "base/threading/platform_thread.h" 37 #include "base/threading/sequenced_task_runner_handle.h" 38 #include "base/threading/simple_thread.h" 39 #include "base/threading/thread_local.h" 40 #include "base/threading/thread_restrictions.h" 41 #include "base/time/time.h" 42 #include "base/trace_event/trace_event.h" 43 #include "base/tracked_objects.h" 44 #include "base/tracking_info.h" 45 #include "build/build_config.h" 46 47 #if defined(OS_MACOSX) 48 #include "base/mac/scoped_nsautorelease_pool.h" 49 #elif defined(OS_WIN) 50 #include "base/win/scoped_com_initializer.h" 51 #endif 52 53 #if !defined(OS_NACL) 54 #include "base/metrics/histogram_macros.h" 55 #endif 56 57 namespace base { 58 59 namespace { 60 61 // An enum representing the state of all pools. A non-test process should only 62 // ever transition from POST_TASK_DISABLED to one of the active states. A test 63 // process may transition from one of the active states to POST_TASK_DISABLED 64 // when DisableForProcessForTesting() is called. 65 // 66 // External memory synchronization is required to call a method that reads 67 // |g_all_pools_state| after calling a method that modifies it. 68 // 69 // TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool 70 // will be phased out completely otherwise). 71 enum class AllPoolsState { 72 POST_TASK_DISABLED, 73 USE_WORKER_POOL, 74 REDIRECTED_TO_TASK_SCHEDULER, 75 }; 76 77 // TODO(fdoray): Change the initial state to POST_TASK_DISABLED. It is initially 78 // USE_WORKER_POOL to avoid a revert of the CL that adds 79 // debug::DumpWithoutCrashing() in case of waterfall failures. 80 AllPoolsState g_all_pools_state = AllPoolsState::USE_WORKER_POOL; 81 82 TaskPriority g_max_task_priority = TaskPriority::HIGHEST; 83 84 struct SequencedTask : public TrackingInfo { 85 SequencedTask() 86 : sequence_token_id(0), 87 trace_id(0), 88 sequence_task_number(0), 89 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 90 91 explicit SequencedTask(const tracked_objects::Location& from_here) 92 : base::TrackingInfo(from_here, TimeTicks()), 93 sequence_token_id(0), 94 trace_id(0), 95 sequence_task_number(0), 96 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 97 98 ~SequencedTask() {} 99 100 SequencedTask(SequencedTask&&) = default; 101 SequencedTask& operator=(SequencedTask&&) = default; 102 103 int sequence_token_id; 104 int trace_id; 105 int64_t sequence_task_number; 106 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 107 tracked_objects::Location posted_from; 108 OnceClosure task; 109 110 // Non-delayed tasks and delayed tasks are managed together by time-to-run 111 // order. We calculate the time by adding the posted time and the given delay. 112 TimeTicks time_to_run; 113 }; 114 115 struct SequencedTaskLessThan { 116 public: 117 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 118 if (lhs.time_to_run < rhs.time_to_run) 119 return true; 120 121 if (lhs.time_to_run > rhs.time_to_run) 122 return false; 123 124 // If the time happen to match, then we use the sequence number to decide. 125 return lhs.sequence_task_number < rhs.sequence_task_number; 126 } 127 }; 128 129 // Create a process-wide unique ID to represent this task in trace events. This 130 // will be mangled with a Process ID hash to reduce the likelyhood of colliding 131 // with MessageLoop pointers on other processes. 132 uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) { 133 return (static_cast<uint64_t>(task.trace_id) << 32) | 134 static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool)); 135 } 136 137 // SequencedWorkerPoolTaskRunner --------------------------------------------- 138 // A TaskRunner which posts tasks to a SequencedWorkerPool with a 139 // fixed ShutdownBehavior. 140 // 141 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 142 class SequencedWorkerPoolTaskRunner : public TaskRunner { 143 public: 144 SequencedWorkerPoolTaskRunner( 145 scoped_refptr<SequencedWorkerPool> pool, 146 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 147 148 // TaskRunner implementation 149 bool PostDelayedTask(const tracked_objects::Location& from_here, 150 OnceClosure task, 151 TimeDelta delay) override; 152 bool RunsTasksOnCurrentThread() const override; 153 154 private: 155 ~SequencedWorkerPoolTaskRunner() override; 156 157 const scoped_refptr<SequencedWorkerPool> pool_; 158 159 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 160 161 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 162 }; 163 164 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 165 scoped_refptr<SequencedWorkerPool> pool, 166 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 167 : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {} 168 169 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 170 } 171 172 bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 173 const tracked_objects::Location& from_here, 174 OnceClosure task, 175 TimeDelta delay) { 176 if (delay.is_zero()) { 177 return pool_->PostWorkerTaskWithShutdownBehavior(from_here, std::move(task), 178 shutdown_behavior_); 179 } 180 return pool_->PostDelayedWorkerTask(from_here, std::move(task), delay); 181 } 182 183 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 184 return pool_->RunsTasksOnCurrentThread(); 185 } 186 187 } // namespace 188 189 // SequencedWorkerPool::PoolSequencedTaskRunner ------------------------------ 190 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 191 // fixed sequence token. 192 // 193 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 194 class SequencedWorkerPool::PoolSequencedTaskRunner 195 : public SequencedTaskRunner { 196 public: 197 PoolSequencedTaskRunner( 198 scoped_refptr<SequencedWorkerPool> pool, 199 SequencedWorkerPool::SequenceToken token, 200 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 201 202 // TaskRunner implementation 203 bool PostDelayedTask(const tracked_objects::Location& from_here, 204 OnceClosure task, 205 TimeDelta delay) override; 206 bool RunsTasksOnCurrentThread() const override; 207 208 // SequencedTaskRunner implementation 209 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, 210 OnceClosure task, 211 TimeDelta delay) override; 212 213 private: 214 ~PoolSequencedTaskRunner() override; 215 216 const scoped_refptr<SequencedWorkerPool> pool_; 217 218 const SequencedWorkerPool::SequenceToken token_; 219 220 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 221 222 DISALLOW_COPY_AND_ASSIGN(PoolSequencedTaskRunner); 223 }; 224 225 SequencedWorkerPool::PoolSequencedTaskRunner:: 226 PoolSequencedTaskRunner( 227 scoped_refptr<SequencedWorkerPool> pool, 228 SequencedWorkerPool::SequenceToken token, 229 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 230 : pool_(std::move(pool)), 231 token_(token), 232 shutdown_behavior_(shutdown_behavior) {} 233 234 SequencedWorkerPool::PoolSequencedTaskRunner:: 235 ~PoolSequencedTaskRunner() = default; 236 237 bool SequencedWorkerPool::PoolSequencedTaskRunner::PostDelayedTask( 238 const tracked_objects::Location& from_here, 239 OnceClosure task, 240 TimeDelta delay) { 241 if (delay.is_zero()) { 242 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 243 token_, from_here, std::move(task), shutdown_behavior_); 244 } 245 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, 246 std::move(task), delay); 247 } 248 249 bool SequencedWorkerPool::PoolSequencedTaskRunner:: 250 RunsTasksOnCurrentThread() const { 251 return pool_->IsRunningSequenceOnCurrentThread(token_); 252 } 253 254 bool SequencedWorkerPool::PoolSequencedTaskRunner::PostNonNestableDelayedTask( 255 const tracked_objects::Location& from_here, 256 OnceClosure task, 257 TimeDelta delay) { 258 // There's no way to run nested tasks, so simply forward to 259 // PostDelayedTask. 260 return PostDelayedTask(from_here, std::move(task), delay); 261 } 262 263 // Worker --------------------------------------------------------------------- 264 265 class SequencedWorkerPool::Worker : public SimpleThread { 266 public: 267 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 268 // around as long as we are running. 269 Worker(scoped_refptr<SequencedWorkerPool> worker_pool, 270 int thread_number, 271 const std::string& thread_name_prefix); 272 ~Worker() override; 273 274 // SimpleThread implementation. This actually runs the background thread. 275 void Run() override; 276 277 // Gets the worker for the current thread out of thread-local storage. 278 static Worker* GetForCurrentThread(); 279 280 // Indicates that a task is about to be run. The parameters provide 281 // additional metainformation about the task being run. 282 void set_running_task_info(SequenceToken token, 283 WorkerShutdown shutdown_behavior) { 284 is_processing_task_ = true; 285 task_sequence_token_ = token; 286 task_shutdown_behavior_ = shutdown_behavior; 287 288 // It is dangerous for tasks with CONTINUE_ON_SHUTDOWN to access a class 289 // that implements a non-leaky base::Singleton because they are generally 290 // destroyed before the process terminates via an AtExitManager 291 // registration. This will trigger a DCHECK to warn of such cases. See the 292 // comment about CONTINUE_ON_SHUTDOWN for more details. 293 ThreadRestrictions::SetSingletonAllowed(task_shutdown_behavior_ != 294 CONTINUE_ON_SHUTDOWN); 295 } 296 297 // Indicates that the task has finished running. 298 void reset_running_task_info() { is_processing_task_ = false; } 299 300 // Whether the worker is processing a task. 301 bool is_processing_task() { return is_processing_task_; } 302 303 SequenceToken task_sequence_token() const { 304 DCHECK(is_processing_task_); 305 return task_sequence_token_; 306 } 307 308 WorkerShutdown task_shutdown_behavior() const { 309 DCHECK(is_processing_task_); 310 return task_shutdown_behavior_; 311 } 312 313 scoped_refptr<SequencedWorkerPool> worker_pool() const { 314 return worker_pool_; 315 } 316 317 private: 318 static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky 319 lazy_tls_ptr_; 320 321 scoped_refptr<SequencedWorkerPool> worker_pool_; 322 // The sequence token of the task being processed. Only valid when 323 // is_processing_task_ is true. 324 SequenceToken task_sequence_token_; 325 // The shutdown behavior of the task being processed. Only valid when 326 // is_processing_task_ is true. 327 WorkerShutdown task_shutdown_behavior_; 328 // Whether the Worker is processing a task. 329 bool is_processing_task_; 330 331 DISALLOW_COPY_AND_ASSIGN(Worker); 332 }; 333 334 // Inner ---------------------------------------------------------------------- 335 336 class SequencedWorkerPool::Inner { 337 public: 338 // Take a raw pointer to |worker| to avoid cycles (since we're owned 339 // by it). 340 Inner(SequencedWorkerPool* worker_pool, 341 size_t max_threads, 342 const std::string& thread_name_prefix, 343 base::TaskPriority task_priority, 344 TestingObserver* observer); 345 346 ~Inner(); 347 348 static SequenceToken GetSequenceToken(); 349 350 SequenceToken GetNamedSequenceToken(const std::string& name); 351 352 // This function accepts a name and an ID. If the name is null, the 353 // token ID is used. This allows us to implement the optional name lookup 354 // from a single function without having to enter the lock a separate time. 355 bool PostTask(const std::string* optional_token_name, 356 SequenceToken sequence_token, 357 WorkerShutdown shutdown_behavior, 358 const tracked_objects::Location& from_here, 359 OnceClosure task, 360 TimeDelta delay); 361 362 bool RunsTasksOnCurrentThread() const; 363 364 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 365 366 void CleanupForTesting(); 367 368 void SignalHasWorkForTesting(); 369 370 int GetWorkSignalCountForTesting() const; 371 372 void Shutdown(int max_blocking_tasks_after_shutdown); 373 374 bool IsShutdownInProgress(); 375 376 // Runs the worker loop on the background thread. 377 void ThreadLoop(Worker* this_worker); 378 379 private: 380 enum GetWorkStatus { 381 GET_WORK_FOUND, 382 GET_WORK_NOT_FOUND, 383 GET_WORK_WAIT, 384 }; 385 386 enum CleanupState { 387 CLEANUP_REQUESTED, 388 CLEANUP_STARTING, 389 CLEANUP_RUNNING, 390 CLEANUP_FINISHING, 391 CLEANUP_DONE, 392 }; 393 394 // Clears ScheduledTasks in |tasks_to_delete| while ensuring that 395 // |this_worker| has the desired task info context during ~ScheduledTask() to 396 // allow sequence-checking. 397 void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, 398 Worker* this_worker); 399 400 // Helper used by PostTask() to complete the work when redirection is on. 401 // Returns true if the task may run at some point in the future and false if 402 // it will definitely not run. 403 // Coalesce upon resolution of http://crbug.com/622400. 404 bool PostTaskToTaskScheduler(SequencedTask sequenced, const TimeDelta& delay); 405 406 // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| 407 // and |traits|. 408 scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( 409 int sequence_token_id, 410 const TaskTraits& traits); 411 412 // Called from within the lock, this converts the given token name into a 413 // token ID, creating a new one if necessary. 414 int LockedGetNamedTokenID(const std::string& name); 415 416 // Called from within the lock, this returns the next sequence task number. 417 int64_t LockedGetNextSequenceTaskNumber(); 418 419 // Gets new task. There are 3 cases depending on the return value: 420 // 421 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 422 // be run immediately. 423 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 424 // and |task| is not filled in. In this case, the caller should wait until 425 // a task is posted. 426 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 427 // immediately, and |task| is not filled in. Likewise, |wait_time| is 428 // filled in the time to wait until the next task to run. In this case, the 429 // caller should wait the time. 430 // 431 // In any case, the calling code should clear the given 432 // delete_these_outside_lock vector the next time the lock is released. 433 // See the implementation for a more detailed description. 434 GetWorkStatus GetWork(SequencedTask* task, 435 TimeDelta* wait_time, 436 std::vector<SequencedTask>* delete_these_outside_lock); 437 438 void HandleCleanup(); 439 440 // Peforms init and cleanup around running the given task. WillRun... 441 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 442 // The calling code should call FinishStartingAdditionalThread once the 443 // lock is released if the return values is nonzero. 444 int WillRunWorkerTask(const SequencedTask& task); 445 void DidRunWorkerTask(const SequencedTask& task); 446 447 // Returns true if there are no threads currently running the given 448 // sequence token. 449 bool IsSequenceTokenRunnable(int sequence_token_id) const; 450 451 // Checks if all threads are busy and the addition of one more could run an 452 // additional task waiting in the queue. This must be called from within 453 // the lock. 454 // 455 // If another thread is helpful, this will mark the thread as being in the 456 // process of starting and returns the index of the new thread which will be 457 // 0 or more. The caller should then call FinishStartingAdditionalThread to 458 // complete initialization once the lock is released. 459 // 460 // If another thread is not necessary, return 0; 461 // 462 // See the implementedion for more. 463 int PrepareToStartAdditionalThreadIfHelpful(); 464 465 // The second part of thread creation after 466 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 467 // generated. This actually creates the thread and should be called outside 468 // the lock to avoid blocking important work starting a thread in the lock. 469 void FinishStartingAdditionalThread(int thread_number); 470 471 // Signal |has_work_| and increment |has_work_signal_count_|. 472 void SignalHasWork(); 473 474 // Checks whether there is work left that's blocking shutdown. Must be 475 // called inside the lock. 476 bool CanShutdown() const; 477 478 SequencedWorkerPool* const worker_pool_; 479 480 // The last sequence number used. Managed by GetSequenceToken, since this 481 // only does threadsafe increment operations, you do not need to hold the 482 // lock. This is class-static to make SequenceTokens issued by 483 // GetSequenceToken unique across SequencedWorkerPool instances. 484 static base::StaticAtomicSequenceNumber g_last_sequence_number_; 485 486 // This lock protects |everything in this class|. Do not read or modify 487 // anything without holding this lock. Do not block while holding this 488 // lock. 489 mutable Lock lock_; 490 491 // Condition variable that is waited on by worker threads until new 492 // tasks are posted or shutdown starts. 493 ConditionVariable has_work_cv_; 494 495 // Condition variable that is waited on by non-worker threads (in 496 // Shutdown()) until CanShutdown() goes to true. 497 ConditionVariable can_shutdown_cv_; 498 499 // The maximum number of worker threads we'll create. 500 const size_t max_threads_; 501 502 const std::string thread_name_prefix_; 503 504 // Associates all known sequence token names with their IDs. 505 std::map<std::string, int> named_sequence_tokens_; 506 507 // Owning pointers to all threads we've created so far, indexed by 508 // ID. Since we lazily create threads, this may be less than 509 // max_threads_ and will be initially empty. 510 using ThreadMap = std::map<PlatformThreadId, std::unique_ptr<Worker>>; 511 ThreadMap threads_; 512 513 // Set to true when we're in the process of creating another thread. 514 // See PrepareToStartAdditionalThreadIfHelpful for more. 515 bool thread_being_created_; 516 517 // Number of threads currently waiting for work. 518 size_t waiting_thread_count_; 519 520 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 521 // or SKIP_ON_SHUTDOWN flag set. 522 size_t blocking_shutdown_thread_count_; 523 524 // A set of all pending tasks in time-to-run order. These are tasks that are 525 // either waiting for a thread to run on, waiting for their time to run, 526 // or blocked on a previous task in their sequence. We have to iterate over 527 // the tasks by time-to-run order, so we use the set instead of the 528 // traditional priority_queue. 529 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; 530 PendingTaskSet pending_tasks_; 531 532 // The next sequence number for a new sequenced task. 533 int64_t next_sequence_task_number_; 534 535 // Number of tasks in the pending_tasks_ list that are marked as blocking 536 // shutdown. 537 size_t blocking_shutdown_pending_task_count_; 538 539 // Lists all sequence tokens currently executing. 540 std::set<int> current_sequences_; 541 542 // An ID for each posted task to distinguish the task from others in traces. 543 int trace_id_; 544 545 // Set when Shutdown is called and no further tasks should be 546 // allowed, though we may still be running existing tasks. 547 bool shutdown_called_; 548 549 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 550 // has been called. 551 int max_blocking_tasks_after_shutdown_; 552 553 // State used to cleanup for testing, all guarded by lock_. 554 CleanupState cleanup_state_; 555 size_t cleanup_idlers_; 556 ConditionVariable cleanup_cv_; 557 558 TestingObserver* const testing_observer_; 559 560 // Members below are used for the experimental redirection to TaskScheduler. 561 // TODO(gab): Remove these if http://crbug.com/622400 fails 562 // (SequencedWorkerPool will be phased out completely otherwise). 563 564 // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the 565 // TaskScheduler as an experiment (unused otherwise). 566 const base::TaskPriority task_priority_; 567 568 // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect 569 // sequenced tasks to the TaskScheduler. 570 std::unordered_map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; 571 572 // TaskScheduler TaskRunners to redirect unsequenced tasks to the 573 // TaskScheduler. Indexed by TaskShutdownBehavior. 574 scoped_refptr<TaskRunner> unsequenced_task_runners_[3]; 575 576 // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as 577 // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). 578 // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). 579 mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; 580 581 DISALLOW_COPY_AND_ASSIGN(Inner); 582 }; 583 584 // Worker definitions --------------------------------------------------------- 585 586 SequencedWorkerPool::Worker::Worker( 587 scoped_refptr<SequencedWorkerPool> worker_pool, 588 int thread_number, 589 const std::string& prefix) 590 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 591 worker_pool_(std::move(worker_pool)), 592 task_shutdown_behavior_(BLOCK_SHUTDOWN), 593 is_processing_task_(false) { 594 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 595 Start(); 596 } 597 598 SequencedWorkerPool::Worker::~Worker() { 599 } 600 601 void SequencedWorkerPool::Worker::Run() { 602 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 603 604 #if defined(OS_WIN) 605 win::ScopedCOMInitializer com_initializer; 606 #endif 607 608 // Store a pointer to this worker in thread local storage for static function 609 // access. 610 DCHECK(!lazy_tls_ptr_.Get().Get()); 611 lazy_tls_ptr_.Get().Set(this); 612 613 // Just jump back to the Inner object to run the thread, since it has all the 614 // tracking information and queues. It might be more natural to implement 615 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 616 // having these worker objects at all, but that method lacks the ability to 617 // send thread-specific information easily to the thread loop. 618 worker_pool_->inner_->ThreadLoop(this); 619 // Release our cyclic reference once we're done. 620 worker_pool_ = nullptr; 621 } 622 623 // static 624 SequencedWorkerPool::Worker* 625 SequencedWorkerPool::Worker::GetForCurrentThread() { 626 // Don't construct lazy instance on check. 627 if (lazy_tls_ptr_ == nullptr) 628 return nullptr; 629 630 return lazy_tls_ptr_.Get().Get(); 631 } 632 633 // static 634 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky 635 SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER; 636 637 // Inner definitions --------------------------------------------------------- 638 639 SequencedWorkerPool::Inner::Inner(SequencedWorkerPool* worker_pool, 640 size_t max_threads, 641 const std::string& thread_name_prefix, 642 base::TaskPriority task_priority, 643 TestingObserver* observer) 644 : worker_pool_(worker_pool), 645 lock_(), 646 has_work_cv_(&lock_), 647 can_shutdown_cv_(&lock_), 648 max_threads_(max_threads), 649 thread_name_prefix_(thread_name_prefix), 650 thread_being_created_(false), 651 waiting_thread_count_(0), 652 blocking_shutdown_thread_count_(0), 653 next_sequence_task_number_(0), 654 blocking_shutdown_pending_task_count_(0), 655 trace_id_(0), 656 shutdown_called_(false), 657 max_blocking_tasks_after_shutdown_(0), 658 cleanup_state_(CLEANUP_DONE), 659 cleanup_idlers_(0), 660 cleanup_cv_(&lock_), 661 testing_observer_(observer), 662 task_priority_(static_cast<int>(task_priority) <= 663 static_cast<int>(g_max_task_priority) 664 ? task_priority 665 : g_max_task_priority) { 666 DCHECK_GT(max_threads_, 1U); 667 } 668 669 SequencedWorkerPool::Inner::~Inner() { 670 // You must call Shutdown() before destroying the pool. 671 DCHECK(shutdown_called_); 672 673 // Need to explicitly join with the threads before they're destroyed or else 674 // they will be running when our object is half torn down. 675 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 676 it->second->Join(); 677 threads_.clear(); 678 679 if (testing_observer_) 680 testing_observer_->OnDestruct(); 681 } 682 683 // static 684 SequencedWorkerPool::SequenceToken 685 SequencedWorkerPool::Inner::GetSequenceToken() { 686 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 687 // is used as a sentinel value in SequenceTokens. 688 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 689 } 690 691 SequencedWorkerPool::SequenceToken 692 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 693 AutoLock lock(lock_); 694 return SequenceToken(LockedGetNamedTokenID(name)); 695 } 696 697 bool SequencedWorkerPool::Inner::PostTask( 698 const std::string* optional_token_name, 699 SequenceToken sequence_token, 700 WorkerShutdown shutdown_behavior, 701 const tracked_objects::Location& from_here, 702 OnceClosure task, 703 TimeDelta delay) { 704 DCHECK(task); 705 706 // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a 707 // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the 708 // waterfall. https://crbug.com/622400 709 // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); 710 if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) 711 debug::DumpWithoutCrashing(); 712 713 DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); 714 SequencedTask sequenced(from_here); 715 sequenced.sequence_token_id = sequence_token.id_; 716 sequenced.shutdown_behavior = shutdown_behavior; 717 sequenced.posted_from = from_here; 718 sequenced.task = shutdown_behavior == BLOCK_SHUTDOWN 719 ? base::MakeCriticalClosure(std::move(task)) 720 : std::move(task); 721 sequenced.time_to_run = TimeTicks::Now() + delay; 722 723 int create_thread_id = 0; 724 { 725 AutoLock lock(lock_); 726 727 if (shutdown_called_) { 728 // Don't allow a new task to be posted if it doesn't block shutdown. 729 if (shutdown_behavior != BLOCK_SHUTDOWN) 730 return false; 731 732 // If the current thread is running a task, and that task doesn't block 733 // shutdown, then it shouldn't be allowed to post any more tasks. 734 ThreadMap::const_iterator found = 735 threads_.find(PlatformThread::CurrentId()); 736 if (found != threads_.end() && found->second->is_processing_task() && 737 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { 738 return false; 739 } 740 741 if (max_blocking_tasks_after_shutdown_ <= 0) { 742 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 743 return false; 744 } 745 max_blocking_tasks_after_shutdown_ -= 1; 746 } 747 748 // The trace_id is used for identifying the task in about:tracing. 749 sequenced.trace_id = trace_id_++; 750 751 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 752 "SequencedWorkerPool::Inner::PostTask", 753 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))), 754 TRACE_EVENT_FLAG_FLOW_OUT); 755 756 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 757 758 // Now that we have the lock, apply the named token rules. 759 if (optional_token_name) 760 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 761 762 // See on top of the file why we don't compile this on Arc++. 763 #if 0 764 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 765 if (!PostTaskToTaskScheduler(std::move(sequenced), delay)) 766 return false; 767 } else { 768 #endif 769 SequencedWorkerPool::WorkerShutdown shutdown_behavior = 770 sequenced.shutdown_behavior; 771 pending_tasks_.insert(std::move(sequenced)); 772 773 if (shutdown_behavior == BLOCK_SHUTDOWN) 774 blocking_shutdown_pending_task_count_++; 775 776 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 777 #if 0 778 } 779 #endif 780 } 781 782 // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure 783 // correct behavior if a task is posted to a SequencedWorkerPool before 784 // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. 785 if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 786 // Actually start the additional thread or signal an existing one outside 787 // the lock. 788 if (create_thread_id) 789 FinishStartingAdditionalThread(create_thread_id); 790 else 791 SignalHasWork(); 792 } 793 794 #if DCHECK_IS_ON() 795 { 796 AutoLock lock_for_dcheck(lock_); 797 // Some variables are exposed in both modes for convenience but only really 798 // intended for one of them at runtime, confirm exclusive usage here. 799 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 800 DCHECK(pending_tasks_.empty()); 801 DCHECK_EQ(0, create_thread_id); 802 } else { 803 DCHECK(sequenced_task_runner_map_.empty()); 804 } 805 } 806 #endif // DCHECK_IS_ON() 807 808 return true; 809 } 810 811 bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( 812 SequencedTask sequenced, 813 const TimeDelta& delay) { 814 #if 1 815 NOTREACHED(); 816 return false; 817 #else 818 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 819 820 lock_.AssertAcquired(); 821 822 // Confirm that the TaskScheduler's shutdown behaviors use the same 823 // underlying values as SequencedWorkerPool. 824 static_assert( 825 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 826 static_cast<int>(CONTINUE_ON_SHUTDOWN), 827 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 828 "CONTINUE_ON_SHUTDOWN."); 829 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 830 static_cast<int>(SKIP_ON_SHUTDOWN), 831 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 832 "SKIP_ON_SHUTDOWN."); 833 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == 834 static_cast<int>(BLOCK_SHUTDOWN), 835 "TaskShutdownBehavior and WorkerShutdown enum mismatch for " 836 "BLOCK_SHUTDOWN."); 837 838 const TaskShutdownBehavior task_shutdown_behavior = 839 static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); 840 const TaskTraits traits = TaskTraits() 841 .MayBlock() 842 .WithBaseSyncPrimitives() 843 .WithPriority(task_priority_) 844 .WithShutdownBehavior(task_shutdown_behavior); 845 return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) 846 ->PostDelayedTask(sequenced.posted_from, std::move(sequenced.task), 847 delay); 848 #endif 849 } 850 851 scoped_refptr<TaskRunner> 852 SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( 853 int sequence_token_id, 854 const TaskTraits& traits) { 855 #if 1 856 NOTREACHED(); 857 return scoped_refptr<TaskRunner>(); 858 #else 859 DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 860 861 lock_.AssertAcquired(); 862 863 static_assert( 864 static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, 865 "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " 866 "used as an index in |unsequenced_task_runners_|."); 867 static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, 868 "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " 869 "be used as an index in |unsequenced_task_runners_|."); 870 static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == 2, 871 "TaskShutdownBehavior::BLOCK_SHUTDOWN must be equal to 2 to be " 872 "used as an index in |unsequenced_task_runners_|."); 873 static_assert(arraysize(unsequenced_task_runners_) == 3, 874 "The size of |unsequenced_task_runners_| doesn't match the " 875 "number of shutdown behaviors."); 876 877 scoped_refptr<TaskRunner>& task_runner = 878 sequence_token_id ? sequenced_task_runner_map_[sequence_token_id] 879 : unsequenced_task_runners_[static_cast<int>( 880 traits.shutdown_behavior())]; 881 882 // TODO(fdoray): DCHECK that all tasks posted to the same sequence have the 883 // same shutdown behavior. 884 885 if (!task_runner) { 886 task_runner = sequence_token_id 887 ? CreateSequencedTaskRunnerWithTraits(traits) 888 : CreateTaskRunnerWithTraits(traits); 889 } 890 891 return task_runner; 892 #endif 893 } 894 895 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 896 AutoLock lock(lock_); 897 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 898 #if 0 899 if (!runs_tasks_on_verifier_) { 900 runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( 901 TaskTraits().MayBlock().WithBaseSyncPrimitives().WithPriority( 902 task_priority_)); 903 } 904 #endif 905 return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); 906 } else { 907 return ContainsKey(threads_, PlatformThread::CurrentId()); 908 } 909 } 910 911 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 912 SequenceToken sequence_token) const { 913 DCHECK(sequence_token.IsValid()); 914 915 AutoLock lock(lock_); 916 917 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 918 const auto sequenced_task_runner_it = 919 sequenced_task_runner_map_.find(sequence_token.id_); 920 return sequenced_task_runner_it != sequenced_task_runner_map_.end() && 921 sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); 922 } else { 923 ThreadMap::const_iterator found = 924 threads_.find(PlatformThread::CurrentId()); 925 return found != threads_.end() && found->second->is_processing_task() && 926 sequence_token.Equals(found->second->task_sequence_token()); 927 } 928 } 929 930 // See https://code.google.com/p/chromium/issues/detail?id=168415 931 void SequencedWorkerPool::Inner::CleanupForTesting() { 932 DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); 933 AutoLock lock(lock_); 934 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 935 if (shutdown_called_) 936 return; 937 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 938 return; 939 cleanup_state_ = CLEANUP_REQUESTED; 940 cleanup_idlers_ = 0; 941 has_work_cv_.Signal(); 942 while (cleanup_state_ != CLEANUP_DONE) 943 cleanup_cv_.Wait(); 944 } 945 946 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 947 SignalHasWork(); 948 } 949 950 void SequencedWorkerPool::Inner::Shutdown( 951 int max_new_blocking_tasks_after_shutdown) { 952 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 953 { 954 AutoLock lock(lock_); 955 // Cleanup and Shutdown should not be called concurrently. 956 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 957 if (shutdown_called_) 958 return; 959 shutdown_called_ = true; 960 961 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 962 963 if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) 964 return; 965 966 // Tickle the threads. This will wake up a waiting one so it will know that 967 // it can exit, which in turn will wake up any other waiting ones. 968 SignalHasWork(); 969 970 // There are no pending or running tasks blocking shutdown, we're done. 971 if (CanShutdown()) 972 return; 973 } 974 975 // If we're here, then something is blocking shutdown. So wait for 976 // CanShutdown() to go to true. 977 978 if (testing_observer_) 979 testing_observer_->WillWaitForShutdown(); 980 981 #if !defined(OS_NACL) 982 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 983 #endif 984 985 { 986 base::ThreadRestrictions::ScopedAllowWait allow_wait; 987 AutoLock lock(lock_); 988 while (!CanShutdown()) 989 can_shutdown_cv_.Wait(); 990 } 991 #if !defined(OS_NACL) 992 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 993 TimeTicks::Now() - shutdown_wait_begin); 994 #endif 995 } 996 997 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 998 AutoLock lock(lock_); 999 return shutdown_called_; 1000 } 1001 1002 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 1003 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1004 { 1005 AutoLock lock(lock_); 1006 DCHECK(thread_being_created_); 1007 thread_being_created_ = false; 1008 auto result = threads_.insert( 1009 std::make_pair(this_worker->tid(), WrapUnique(this_worker))); 1010 DCHECK(result.second); 1011 1012 while (true) { 1013 #if defined(OS_MACOSX) 1014 base::mac::ScopedNSAutoreleasePool autorelease_pool; 1015 #endif 1016 1017 HandleCleanup(); 1018 1019 // See GetWork for what delete_these_outside_lock is doing. 1020 SequencedTask task; 1021 TimeDelta wait_time; 1022 std::vector<SequencedTask> delete_these_outside_lock; 1023 GetWorkStatus status = 1024 GetWork(&task, &wait_time, &delete_these_outside_lock); 1025 if (status == GET_WORK_FOUND) { 1026 TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); 1027 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 1028 "SequencedWorkerPool::Inner::PostTask", 1029 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), 1030 TRACE_EVENT_FLAG_FLOW_IN); 1031 int new_thread_id = WillRunWorkerTask(task); 1032 { 1033 AutoUnlock unlock(lock_); 1034 // There may be more work available, so wake up another 1035 // worker thread. (Technically not required, since we 1036 // already get a signal for each new task, but it doesn't 1037 // hurt.) 1038 SignalHasWork(); 1039 DeleteWithoutLock(&delete_these_outside_lock, this_worker); 1040 1041 // Complete thread creation outside the lock if necessary. 1042 if (new_thread_id) 1043 FinishStartingAdditionalThread(new_thread_id); 1044 1045 this_worker->set_running_task_info( 1046 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 1047 1048 tracked_objects::TaskStopwatch stopwatch; 1049 stopwatch.Start(); 1050 std::move(task.task).Run(); 1051 stopwatch.Stop(); 1052 1053 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( 1054 task, stopwatch); 1055 1056 // Make sure our task is erased outside the lock for the 1057 // same reason we do this with delete_these_oustide_lock. 1058 // Also, do it before calling reset_running_task_info() so 1059 // that sequence-checking from within the task's destructor 1060 // still works. 1061 DCHECK(!task.task); 1062 1063 this_worker->reset_running_task_info(); 1064 } 1065 DidRunWorkerTask(task); // Must be done inside the lock. 1066 } else if (cleanup_state_ == CLEANUP_RUNNING) { 1067 switch (status) { 1068 case GET_WORK_WAIT: { 1069 AutoUnlock unlock(lock_); 1070 DeleteWithoutLock(&delete_these_outside_lock, this_worker); 1071 } 1072 break; 1073 case GET_WORK_NOT_FOUND: 1074 CHECK(delete_these_outside_lock.empty()); 1075 cleanup_state_ = CLEANUP_FINISHING; 1076 cleanup_cv_.Broadcast(); 1077 break; 1078 default: 1079 NOTREACHED(); 1080 } 1081 } else { 1082 // When we're terminating and there's no more work, we can 1083 // shut down, other workers can complete any pending or new tasks. 1084 // We can get additional tasks posted after shutdown_called_ is set 1085 // but only worker threads are allowed to post tasks at that time, and 1086 // the workers responsible for posting those tasks will be available 1087 // to run them. Also, there may be some tasks stuck behind running 1088 // ones with the same sequence token, but additional threads won't 1089 // help this case. 1090 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { 1091 AutoUnlock unlock(lock_); 1092 DeleteWithoutLock(&delete_these_outside_lock, this_worker); 1093 break; 1094 } 1095 1096 // No work was found, but there are tasks that need deletion. The 1097 // deletion must happen outside of the lock. 1098 if (delete_these_outside_lock.size()) { 1099 AutoUnlock unlock(lock_); 1100 DeleteWithoutLock(&delete_these_outside_lock, this_worker); 1101 1102 // Since the lock has been released, |status| may no longer be 1103 // accurate. It might read GET_WORK_WAIT even if there are tasks 1104 // ready to perform work. Jump to the top of the loop to recalculate 1105 // |status|. 1106 continue; 1107 } 1108 1109 waiting_thread_count_++; 1110 1111 switch (status) { 1112 case GET_WORK_NOT_FOUND: 1113 has_work_cv_.Wait(); 1114 break; 1115 case GET_WORK_WAIT: 1116 has_work_cv_.TimedWait(wait_time); 1117 break; 1118 default: 1119 NOTREACHED(); 1120 } 1121 waiting_thread_count_--; 1122 } 1123 // |delete_these_outside_lock| should have been cleared via 1124 // DeleteWithoutLock() above already. 1125 DCHECK(delete_these_outside_lock.empty()); 1126 } 1127 } // Release lock_. 1128 1129 // We noticed we should exit. Wake up the next worker so it knows it should 1130 // exit as well (because the Shutdown() code only signals once). 1131 SignalHasWork(); 1132 1133 // Possibly unblock shutdown. 1134 can_shutdown_cv_.Signal(); 1135 } 1136 1137 void SequencedWorkerPool::Inner::DeleteWithoutLock( 1138 std::vector<SequencedTask>* tasks_to_delete, 1139 Worker* this_worker) { 1140 while (!tasks_to_delete->empty()) { 1141 const SequencedTask& deleted_task = tasks_to_delete->back(); 1142 this_worker->set_running_task_info( 1143 SequenceToken(deleted_task.sequence_token_id), 1144 deleted_task.shutdown_behavior); 1145 tasks_to_delete->pop_back(); 1146 } 1147 this_worker->reset_running_task_info(); 1148 } 1149 1150 void SequencedWorkerPool::Inner::HandleCleanup() { 1151 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1152 1153 lock_.AssertAcquired(); 1154 if (cleanup_state_ == CLEANUP_DONE) 1155 return; 1156 if (cleanup_state_ == CLEANUP_REQUESTED) { 1157 // We win, we get to do the cleanup as soon as the others wise up and idle. 1158 cleanup_state_ = CLEANUP_STARTING; 1159 while (thread_being_created_ || 1160 cleanup_idlers_ != threads_.size() - 1) { 1161 has_work_cv_.Signal(); 1162 cleanup_cv_.Wait(); 1163 } 1164 cleanup_state_ = CLEANUP_RUNNING; 1165 return; 1166 } 1167 if (cleanup_state_ == CLEANUP_STARTING) { 1168 // Another worker thread is cleaning up, we idle here until thats done. 1169 ++cleanup_idlers_; 1170 cleanup_cv_.Broadcast(); 1171 while (cleanup_state_ != CLEANUP_FINISHING) { 1172 cleanup_cv_.Wait(); 1173 } 1174 --cleanup_idlers_; 1175 cleanup_cv_.Broadcast(); 1176 return; 1177 } 1178 if (cleanup_state_ == CLEANUP_FINISHING) { 1179 // We wait for all idlers to wake up prior to being DONE. 1180 while (cleanup_idlers_ != 0) { 1181 cleanup_cv_.Broadcast(); 1182 cleanup_cv_.Wait(); 1183 } 1184 if (cleanup_state_ == CLEANUP_FINISHING) { 1185 cleanup_state_ = CLEANUP_DONE; 1186 cleanup_cv_.Signal(); 1187 } 1188 return; 1189 } 1190 } 1191 1192 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 1193 const std::string& name) { 1194 lock_.AssertAcquired(); 1195 DCHECK(!name.empty()); 1196 1197 std::map<std::string, int>::const_iterator found = 1198 named_sequence_tokens_.find(name); 1199 if (found != named_sequence_tokens_.end()) 1200 return found->second; // Got an existing one. 1201 1202 // Create a new one for this name. 1203 SequenceToken result = GetSequenceToken(); 1204 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 1205 return result.id_; 1206 } 1207 1208 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 1209 lock_.AssertAcquired(); 1210 // We assume that we never create enough tasks to wrap around. 1211 return next_sequence_task_number_++; 1212 } 1213 1214 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 1215 SequencedTask* task, 1216 TimeDelta* wait_time, 1217 std::vector<SequencedTask>* delete_these_outside_lock) { 1218 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1219 1220 lock_.AssertAcquired(); 1221 1222 // Find the next task with a sequence token that's not currently in use. 1223 // If the token is in use, that means another thread is running something 1224 // in that sequence, and we can't run it without going out-of-order. 1225 // 1226 // This algorithm is simple and fair, but inefficient in some cases. For 1227 // example, say somebody schedules 1000 slow tasks with the same sequence 1228 // number. We'll have to go through all those tasks each time we feel like 1229 // there might be work to schedule. If this proves to be a problem, we 1230 // should make this more efficient. 1231 // 1232 // One possible enhancement would be to keep a map from sequence ID to a 1233 // list of pending but currently blocked SequencedTasks for that ID. 1234 // When a worker finishes a task of one sequence token, it can pick up the 1235 // next one from that token right away. 1236 // 1237 // This may lead to starvation if there are sufficient numbers of sequences 1238 // in use. To alleviate this, we could add an incrementing priority counter 1239 // to each SequencedTask. Then maintain a priority_queue of all runnable 1240 // tasks, sorted by priority counter. When a sequenced task is completed 1241 // we would pop the head element off of that tasks pending list and add it 1242 // to the priority queue. Then we would run the first item in the priority 1243 // queue. 1244 1245 GetWorkStatus status = GET_WORK_NOT_FOUND; 1246 int unrunnable_tasks = 0; 1247 PendingTaskSet::iterator i = pending_tasks_.begin(); 1248 // We assume that the loop below doesn't take too long and so we can just do 1249 // a single call to TimeTicks::Now(). 1250 const TimeTicks current_time = TimeTicks::Now(); 1251 while (i != pending_tasks_.end()) { 1252 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 1253 unrunnable_tasks++; 1254 ++i; 1255 continue; 1256 } 1257 1258 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 1259 // We're shutting down and the task we just found isn't blocking 1260 // shutdown. Delete it and get more work. 1261 // 1262 // Note that we do not want to delete unrunnable tasks. Deleting a task 1263 // can have side effects (like freeing some objects) and deleting a task 1264 // that's supposed to run after one that's currently running could cause 1265 // an obscure crash. 1266 // 1267 // We really want to delete these tasks outside the lock in case the 1268 // closures are holding refs to objects that want to post work from their 1269 // destructors (which would deadlock). The closures are internally 1270 // refcounted, so we just need to keep a copy of them alive until the lock 1271 // is exited. The calling code can just clear() the vector they passed to 1272 // us once the lock is exited to make this happen. 1273 // 1274 // The const_cast here is safe since the object is erased from 1275 // |pending_tasks_| soon after the move. 1276 delete_these_outside_lock->push_back( 1277 std::move(const_cast<SequencedTask&>(*i))); 1278 pending_tasks_.erase(i++); 1279 continue; 1280 } 1281 1282 if (i->time_to_run > current_time) { 1283 // The time to run has not come yet. 1284 *wait_time = i->time_to_run - current_time; 1285 status = GET_WORK_WAIT; 1286 if (cleanup_state_ == CLEANUP_RUNNING) { 1287 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 1288 // The const_cast here is safe since the object is erased from 1289 // |pending_tasks_| soon after the move. 1290 delete_these_outside_lock->push_back( 1291 std::move(const_cast<SequencedTask&>(*i))); 1292 pending_tasks_.erase(i); 1293 } 1294 break; 1295 } 1296 1297 // Found a runnable task. The const_cast is safe here since the object is 1298 // erased from |pending_tasks_| soon after the move. 1299 *task = std::move(const_cast<SequencedTask&>(*i)); 1300 pending_tasks_.erase(i); 1301 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 1302 blocking_shutdown_pending_task_count_--; 1303 } 1304 1305 status = GET_WORK_FOUND; 1306 break; 1307 } 1308 1309 return status; 1310 } 1311 1312 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1313 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1314 1315 lock_.AssertAcquired(); 1316 1317 // Mark the task's sequence number as in use. 1318 if (task.sequence_token_id) 1319 current_sequences_.insert(task.sequence_token_id); 1320 1321 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1322 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1323 // completes. 1324 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1325 blocking_shutdown_thread_count_++; 1326 1327 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 1328 // creates a new thread if there is no free one, there is a race when posting 1329 // tasks that many tasks could have been posted before a thread started 1330 // running them, so only one thread would have been created. So we also check 1331 // whether we should create more threads after removing our task from the 1332 // queue, which also has the nice side effect of creating the workers from 1333 // background threads rather than the main thread of the app. 1334 // 1335 // If another thread wasn't created, we want to wake up an existing thread 1336 // if there is one waiting to pick up the next task. 1337 // 1338 // Note that we really need to do this *before* running the task, not 1339 // after. Otherwise, if more than one task is posted, the creation of the 1340 // second thread (since we only create one at a time) will be blocked by 1341 // the execution of the first task, which could be arbitrarily long. 1342 return PrepareToStartAdditionalThreadIfHelpful(); 1343 } 1344 1345 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1346 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1347 1348 lock_.AssertAcquired(); 1349 1350 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1351 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1352 blocking_shutdown_thread_count_--; 1353 } 1354 1355 if (task.sequence_token_id) 1356 current_sequences_.erase(task.sequence_token_id); 1357 } 1358 1359 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1360 int sequence_token_id) const { 1361 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1362 1363 lock_.AssertAcquired(); 1364 return !sequence_token_id || 1365 current_sequences_.find(sequence_token_id) == 1366 current_sequences_.end(); 1367 } 1368 1369 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1370 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1371 1372 lock_.AssertAcquired(); 1373 // How thread creation works: 1374 // 1375 // We'de like to avoid creating threads with the lock held. However, we 1376 // need to be sure that we have an accurate accounting of the threads for 1377 // proper Joining and deltion on shutdown. 1378 // 1379 // We need to figure out if we need another thread with the lock held, which 1380 // is what this function does. It then marks us as in the process of creating 1381 // a thread. When we do shutdown, we wait until the thread_being_created_ 1382 // flag is cleared, which ensures that the new thread is properly added to 1383 // all the data structures and we can't leak it. Once shutdown starts, we'll 1384 // refuse to create more threads or they would be leaked. 1385 // 1386 // Note that this creates a mostly benign race condition on shutdown that 1387 // will cause fewer workers to be created than one would expect. It isn't 1388 // much of an issue in real life, but affects some tests. Since we only spawn 1389 // one worker at a time, the following sequence of events can happen: 1390 // 1391 // 1. Main thread posts a bunch of unrelated tasks that would normally be 1392 // run on separate threads. 1393 // 2. The first task post causes us to start a worker. Other tasks do not 1394 // cause a worker to start since one is pending. 1395 // 3. Main thread initiates shutdown. 1396 // 4. No more threads are created since the shutdown_called_ flag is set. 1397 // 1398 // The result is that one may expect that max_threads_ workers to be created 1399 // given the workload, but in reality fewer may be created because the 1400 // sequence of thread creation on the background threads is racing with the 1401 // shutdown call. 1402 if (!shutdown_called_ && 1403 !thread_being_created_ && 1404 cleanup_state_ == CLEANUP_DONE && 1405 threads_.size() < max_threads_ && 1406 waiting_thread_count_ == 0) { 1407 // We could use an additional thread if there's work to be done. 1408 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1409 i != pending_tasks_.end(); ++i) { 1410 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1411 // Found a runnable task, mark the thread as being started. 1412 thread_being_created_ = true; 1413 return static_cast<int>(threads_.size() + 1); 1414 } 1415 } 1416 } 1417 return 0; 1418 } 1419 1420 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1421 int thread_number) { 1422 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1423 1424 // Called outside of the lock. 1425 DCHECK_GT(thread_number, 0); 1426 1427 // The worker is assigned to the list when the thread actually starts, which 1428 // will manage the memory of the pointer. 1429 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1430 } 1431 1432 void SequencedWorkerPool::Inner::SignalHasWork() { 1433 DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); 1434 1435 has_work_cv_.Signal(); 1436 if (testing_observer_) { 1437 testing_observer_->OnHasWork(); 1438 } 1439 } 1440 1441 bool SequencedWorkerPool::Inner::CanShutdown() const { 1442 DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); 1443 lock_.AssertAcquired(); 1444 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1445 return !thread_being_created_ && 1446 blocking_shutdown_thread_count_ == 0 && 1447 blocking_shutdown_pending_task_count_ == 0; 1448 } 1449 1450 base::StaticAtomicSequenceNumber 1451 SequencedWorkerPool::Inner::g_last_sequence_number_; 1452 1453 // SequencedWorkerPool -------------------------------------------------------- 1454 1455 std::string SequencedWorkerPool::SequenceToken::ToString() const { 1456 return base::StringPrintf("[%d]", id_); 1457 } 1458 1459 // static 1460 SequencedWorkerPool::SequenceToken 1461 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1462 Worker* worker = Worker::GetForCurrentThread(); 1463 if (!worker) 1464 return SequenceToken(); 1465 1466 return worker->task_sequence_token(); 1467 } 1468 1469 // static 1470 scoped_refptr<SequencedWorkerPool> 1471 SequencedWorkerPool::GetWorkerPoolForCurrentThread() { 1472 Worker* worker = Worker::GetForCurrentThread(); 1473 if (!worker) 1474 return nullptr; 1475 1476 return worker->worker_pool(); 1477 } 1478 1479 // static 1480 void SequencedWorkerPool::EnableForProcess() { 1481 // TODO(fdoray): Uncomment this line. It is initially commented to avoid a 1482 // revert of the CL that adds debug::DumpWithoutCrashing() in case of 1483 // waterfall failures. 1484 // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); 1485 g_all_pools_state = AllPoolsState::USE_WORKER_POOL; 1486 } 1487 1488 // static 1489 void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess( 1490 TaskPriority max_task_priority) { 1491 #if 1 1492 NOTREACHED(); 1493 #else 1494 // TODO(fdoray): Uncomment this line. It is initially commented to avoid a 1495 // revert of the CL that adds debug::DumpWithoutCrashing() in case of 1496 // waterfall failures. 1497 // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); 1498 DCHECK(TaskScheduler::GetInstance()); 1499 g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; 1500 g_max_task_priority = max_task_priority; 1501 #endif 1502 } 1503 1504 // static 1505 void SequencedWorkerPool::DisableForProcessForTesting() { 1506 g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; 1507 } 1508 1509 // static 1510 bool SequencedWorkerPool::IsEnabled() { 1511 return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED; 1512 } 1513 1514 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1515 const std::string& thread_name_prefix, 1516 base::TaskPriority task_priority) 1517 : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), 1518 inner_(new Inner(this, 1519 max_threads, 1520 thread_name_prefix, 1521 task_priority, 1522 NULL)) {} 1523 1524 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 1525 const std::string& thread_name_prefix, 1526 base::TaskPriority task_priority, 1527 TestingObserver* observer) 1528 : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), 1529 inner_(new Inner(this, 1530 max_threads, 1531 thread_name_prefix, 1532 task_priority, 1533 observer)) {} 1534 1535 SequencedWorkerPool::~SequencedWorkerPool() {} 1536 1537 void SequencedWorkerPool::OnDestruct() const { 1538 // Avoid deleting ourselves on a worker thread (which would deadlock). 1539 if (RunsTasksOnCurrentThread()) { 1540 constructor_task_runner_->DeleteSoon(FROM_HERE, this); 1541 } else { 1542 delete this; 1543 } 1544 } 1545 1546 // static 1547 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1548 return Inner::GetSequenceToken(); 1549 } 1550 1551 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1552 const std::string& name) { 1553 return inner_->GetNamedSequenceToken(name); 1554 } 1555 1556 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1557 SequenceToken token) { 1558 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1559 } 1560 1561 scoped_refptr<SequencedTaskRunner> 1562 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( 1563 SequenceToken token, WorkerShutdown shutdown_behavior) { 1564 return new PoolSequencedTaskRunner( 1565 this, token, shutdown_behavior); 1566 } 1567 1568 scoped_refptr<TaskRunner> 1569 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1570 WorkerShutdown shutdown_behavior) { 1571 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1572 } 1573 1574 bool SequencedWorkerPool::PostWorkerTask( 1575 const tracked_objects::Location& from_here, 1576 OnceClosure task) { 1577 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, from_here, 1578 std::move(task), TimeDelta()); 1579 } 1580 1581 bool SequencedWorkerPool::PostDelayedWorkerTask( 1582 const tracked_objects::Location& from_here, 1583 OnceClosure task, 1584 TimeDelta delay) { 1585 WorkerShutdown shutdown_behavior = 1586 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1587 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here, 1588 std::move(task), delay); 1589 } 1590 1591 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1592 const tracked_objects::Location& from_here, 1593 OnceClosure task, 1594 WorkerShutdown shutdown_behavior) { 1595 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, from_here, 1596 std::move(task), TimeDelta()); 1597 } 1598 1599 bool SequencedWorkerPool::PostSequencedWorkerTask( 1600 SequenceToken sequence_token, 1601 const tracked_objects::Location& from_here, 1602 OnceClosure task) { 1603 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, from_here, 1604 std::move(task), TimeDelta()); 1605 } 1606 1607 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1608 SequenceToken sequence_token, 1609 const tracked_objects::Location& from_here, 1610 OnceClosure task, 1611 TimeDelta delay) { 1612 WorkerShutdown shutdown_behavior = 1613 delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1614 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here, 1615 std::move(task), delay); 1616 } 1617 1618 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1619 const std::string& token_name, 1620 const tracked_objects::Location& from_here, 1621 OnceClosure task) { 1622 DCHECK(!token_name.empty()); 1623 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1624 from_here, std::move(task), TimeDelta()); 1625 } 1626 1627 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1628 SequenceToken sequence_token, 1629 const tracked_objects::Location& from_here, 1630 OnceClosure task, 1631 WorkerShutdown shutdown_behavior) { 1632 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here, 1633 std::move(task), TimeDelta()); 1634 } 1635 1636 bool SequencedWorkerPool::PostDelayedTask( 1637 const tracked_objects::Location& from_here, 1638 OnceClosure task, 1639 TimeDelta delay) { 1640 return PostDelayedWorkerTask(from_here, std::move(task), delay); 1641 } 1642 1643 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1644 return inner_->RunsTasksOnCurrentThread(); 1645 } 1646 1647 void SequencedWorkerPool::FlushForTesting() { 1648 DCHECK(!RunsTasksOnCurrentThread()); 1649 base::ThreadRestrictions::ScopedAllowWait allow_wait; 1650 if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { 1651 #if 1 1652 NOTREACHED(); 1653 #else 1654 // TODO(gab): Remove this if http://crbug.com/622400 fails. 1655 TaskScheduler::GetInstance()->FlushForTesting(); 1656 #endif 1657 } else { 1658 inner_->CleanupForTesting(); 1659 } 1660 } 1661 1662 void SequencedWorkerPool::SignalHasWorkForTesting() { 1663 inner_->SignalHasWorkForTesting(); 1664 } 1665 1666 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1667 DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread()); 1668 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1669 } 1670 1671 bool SequencedWorkerPool::IsShutdownInProgress() { 1672 return inner_->IsShutdownInProgress(); 1673 } 1674 1675 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1676 SequenceToken sequence_token) const { 1677 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1678 } 1679 1680 } // namespace base 1681