Home | History | Annotate | Download | only in threading
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/threading/sequenced_worker_pool.h"
      6 
      7 #include <stdint.h>
      8 
      9 #include <list>
     10 #include <map>
     11 #include <memory>
     12 #include <set>
     13 #include <utility>
     14 #include <vector>
     15 
     16 #include "base/atomic_sequence_num.h"
     17 #include "base/callback.h"
     18 #include "base/compiler_specific.h"
     19 #include "base/critical_closure.h"
     20 #include "base/lazy_instance.h"
     21 #include "base/logging.h"
     22 #include "base/macros.h"
     23 #include "base/memory/ptr_util.h"
     24 #include "base/stl_util.h"
     25 #include "base/strings/stringprintf.h"
     26 #include "base/synchronization/condition_variable.h"
     27 #include "base/synchronization/lock.h"
     28 #include "base/threading/platform_thread.h"
     29 #include "base/threading/simple_thread.h"
     30 #include "base/threading/thread_local.h"
     31 #include "base/threading/thread_restrictions.h"
     32 #include "base/threading/thread_task_runner_handle.h"
     33 #include "base/time/time.h"
     34 #include "base/trace_event/heap_profiler.h"
     35 #include "base/trace_event/trace_event.h"
     36 #include "base/tracked_objects.h"
     37 #include "build/build_config.h"
     38 
     39 #if defined(OS_MACOSX)
     40 #include "base/mac/scoped_nsautorelease_pool.h"
     41 #elif defined(OS_WIN)
     42 #include "base/win/scoped_com_initializer.h"
     43 #endif
     44 
     45 #if !defined(OS_NACL)
     46 #include "base/metrics/histogram.h"
     47 #endif
     48 
     49 namespace base {
     50 
     51 namespace {
     52 
     53 struct SequencedTask : public TrackingInfo  {
          // Record for one task queued in the pool: the closure to run plus the
          // sequence token, ordering number, shutdown behavior and run time that
          // Inner::PostTask() attaches to it.

          // Zeroes the numeric fields and defaults to BLOCK_SHUTDOWN.
     54   SequencedTask()
     55       : sequence_token_id(0),
     56         trace_id(0),
     57         sequence_task_number(0),
     58         shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
     59 
          // Same field defaults as the default constructor, but initializes the
          // TrackingInfo base with |from_here| and a default-constructed TimeTicks.
     60   explicit SequencedTask(const tracked_objects::Location& from_here)
     61       : base::TrackingInfo(from_here, TimeTicks()),
     62         sequence_token_id(0),
     63         trace_id(0),
     64         sequence_task_number(0),
     65         shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
     66 
     67   ~SequencedTask() {}
     68 
          // ID of the sequence token this task was posted with.
     69   int sequence_token_id;
          // Value of the pool's |trace_id_| counter at post time; feeds
          // GetTaskTraceID().
     70   int trace_id;
          // Number obtained from LockedGetNextSequenceTaskNumber() at post time;
          // breaks ties between equal |time_to_run| values in SequencedTaskLessThan.
     71   int64_t sequence_task_number;
     72   SequencedWorkerPool::WorkerShutdown shutdown_behavior;
          // Location passed to PostTask() as |from_here|.
     73   tracked_objects::Location posted_from;
          // The closure to run. PostTask() wraps it with MakeCriticalClosure() when
          // the shutdown behavior is BLOCK_SHUTDOWN.
     74   Closure task;
     75 
     76   // Non-delayed tasks and delayed tasks are managed together by time-to-run
     77   // order. We calculate the time by adding the posted time and the given delay.
     78   TimeTicks time_to_run;
     79 };
     80 
     81 struct SequencedTaskLessThan {
          // Comparator used by PendingTaskSet: orders tasks by |time_to_run|, with
          // |sequence_task_number| as the tie-breaker.
     82  public:
     83   bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
     84     if (lhs.time_to_run < rhs.time_to_run)
     85       return true;
     86 
     87     if (lhs.time_to_run > rhs.time_to_run)
     88       return false;
     89 
     90     // If the times happen to match, then we use the sequence number to decide.
     91     return lhs.sequence_task_number < rhs.sequence_task_number;
     92   }
     93 };
     94 
     95 // SequencedWorkerPoolTaskRunner ---------------------------------------------
     96 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
     97 // fixed ShutdownBehavior.
     98 //
     99 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
    100 class SequencedWorkerPoolTaskRunner : public TaskRunner {
    101  public:
          // |pool| receives every task posted through this runner;
          // |shutdown_behavior| is applied to tasks posted with a zero delay.
    102   SequencedWorkerPoolTaskRunner(
    103       scoped_refptr<SequencedWorkerPool> pool,
    104       SequencedWorkerPool::WorkerShutdown shutdown_behavior);
    105 
    106   // TaskRunner implementation
    107   bool PostDelayedTask(const tracked_objects::Location& from_here,
    108                        const Closure& task,
    109                        TimeDelta delay) override;
    110   bool RunsTasksOnCurrentThread() const override;
    111 
    112  private:
          // Private destructor: the object is ref-counted (see the class comment).
    113   ~SequencedWorkerPoolTaskRunner() override;
    114 
          // Pool that all posted tasks are forwarded to.
    115   const scoped_refptr<SequencedWorkerPool> pool_;
    116 
          // Shutdown behavior passed along with zero-delay tasks.
    117   const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
    118 
    119   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
    120 };
    121 
    122 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    123     scoped_refptr<SequencedWorkerPool> pool,
    124     SequencedWorkerPool::WorkerShutdown shutdown_behavior)
            // |pool| is taken by value and moved into |pool_|.
    125     : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {}
    126 
    127 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
          // Intentionally empty: no explicit teardown is performed here.
    128 }
    129 
    130 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
    131     const tracked_objects::Location& from_here,
    132     const Closure& task,
    133     TimeDelta delay) {
          // Forwards |task| to the pool and returns the pool's result. Only the
          // zero-delay path passes |shutdown_behavior_| along.
    134   if (delay.is_zero()) {
    135     return pool_->PostWorkerTaskWithShutdownBehavior(
    136         from_here, task, shutdown_behavior_);
    137   }
          // PostDelayedWorkerTask() takes no shutdown-behavior argument.
    138   return pool_->PostDelayedWorkerTask(from_here, task, delay);
    139 }
    140 
    141 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
          // Delegates the answer to the underlying pool.
    142   return pool_->RunsTasksOnCurrentThread();
    143 }
    144 
    145 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
    146 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
    147 // fixed sequence token.
    148 //
    149 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
    150 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
    151  public:
          // |pool| receives every task posted through this runner, always with
          // |token|; |shutdown_behavior| is applied to tasks posted with a zero
          // delay.
    152   SequencedWorkerPoolSequencedTaskRunner(
    153       scoped_refptr<SequencedWorkerPool> pool,
    154       SequencedWorkerPool::SequenceToken token,
    155       SequencedWorkerPool::WorkerShutdown shutdown_behavior);
    156 
    157   // TaskRunner implementation
    158   bool PostDelayedTask(const tracked_objects::Location& from_here,
    159                        const Closure& task,
    160                        TimeDelta delay) override;
    161   bool RunsTasksOnCurrentThread() const override;
    162 
    163   // SequencedTaskRunner implementation
    164   bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
    165                                   const Closure& task,
    166                                   TimeDelta delay) override;
    167 
    168  private:
          // Private destructor: the object is ref-counted (see the class comment).
    169   ~SequencedWorkerPoolSequencedTaskRunner() override;
    170 
          // Pool that all posted tasks are forwarded to.
    171   const scoped_refptr<SequencedWorkerPool> pool_;
    172 
          // Sequence token attached to every task posted through this runner.
    173   const SequencedWorkerPool::SequenceToken token_;
    174 
          // Shutdown behavior passed along with zero-delay tasks.
    175   const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
    176 
    177   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
    178 };
    179 
    180 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    181     scoped_refptr<SequencedWorkerPool> pool,
    182     SequencedWorkerPool::SequenceToken token,
    183     SequencedWorkerPool::WorkerShutdown shutdown_behavior)
            // |pool| is taken by value and moved into |pool_|; the other arguments
            // are copied.
    184     : pool_(std::move(pool)),
    185       token_(token),
    186       shutdown_behavior_(shutdown_behavior) {}
    187 
    188 SequencedWorkerPoolSequencedTaskRunner::
    189 ~SequencedWorkerPoolSequencedTaskRunner() {
          // Intentionally empty: no explicit teardown is performed here.
    190 }
    191 
    192 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
    193     const tracked_objects::Location& from_here,
    194     const Closure& task,
    195     TimeDelta delay) {
          // Forwards |task| to the pool under |token_| and returns the pool's
          // result. Only the zero-delay path passes |shutdown_behavior_| along.
    196   if (delay.is_zero()) {
    197     return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
    198         token_, from_here, task, shutdown_behavior_);
    199   }
          // PostDelayedSequencedWorkerTask() takes no shutdown-behavior argument.
    200   return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
    201 }
    202 
    203 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
          // Asks the pool whether the calling thread is currently running a task of
          // this runner's sequence (|token_|), not merely whether it is a pool
          // thread.
    204   return pool_->IsRunningSequenceOnCurrentThread(token_);
    205 }
    206 
    207 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
    208     const tracked_objects::Location& from_here,
    209     const Closure& task,
    210     TimeDelta delay) {
    211   // There's no way to run nested tasks, so simply forward to
    212   // PostDelayedTask. The return value is PostDelayedTask's result.
    213   return PostDelayedTask(from_here, task, delay);
    214 }
    215 
    216 // Create a process-wide unique ID to represent this task in trace events. This
    217 // will be mangled with a Process ID hash to reduce the likelihood of colliding
    218 // with MessageLoop pointers on other processes.
        //
        // The result is |task.trace_id| shifted into the upper 32 bits, OR'ed with
        // the integer value of |pool|. The pointer is converted through uintptr_t so
        // that it is zero-extended: with a signed intptr_t, a 32-bit pointer whose top
        // bit is set would be sign-extended and overwrite the bits holding
        // |trace_id|. Where pointers are wider than 32 bits, the pointer's upper bits
        // are OR'ed into the same bits as |trace_id|.
    219 uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
    220   return (static_cast<uint64_t>(task.trace_id) << 32) |
    221          static_cast<uint64_t>(reinterpret_cast<uintptr_t>(pool));
    222 }
    223 
    224 }  // namespace
    225 
    226 // Worker ---------------------------------------------------------------------
    227 
        // One pool thread. Besides running Inner::ThreadLoop(), it records which task
        // (sequence token and shutdown behavior) the thread is currently processing.
    228 class SequencedWorkerPool::Worker : public SimpleThread {
    229  public:
    230   // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
    231   // around as long as we are running.
    232   Worker(scoped_refptr<SequencedWorkerPool> worker_pool,
    233          int thread_number,
    234          const std::string& thread_name_prefix);
    235   ~Worker() override;
    236 
    237   // SimpleThread implementation. This actually runs the background thread.
    238   void Run() override;
    239 
    240   // Gets the worker for the current thread out of thread-local storage.
    241   static Worker* GetForCurrentThread();
    242 
    243   // Indicates that a task is about to be run. The parameters provide
    244   // additional metainformation about the task being run.
    245   void set_running_task_info(SequenceToken token,
    246                              WorkerShutdown shutdown_behavior) {
    247     is_processing_task_ = true;
    248     task_sequence_token_ = token;
    249     task_shutdown_behavior_ = shutdown_behavior;
    250   }
    251 
    252   // Indicates that the task has finished running.
    253   void reset_running_task_info() { is_processing_task_ = false; }
    254 
    255   // Whether the worker is processing a task. Const, like the other accessors
    256   // below, since it only reads state.
    257   bool is_processing_task() const { return is_processing_task_; }
    257 
          // Sequence token of the task being processed. Only meaningful while a task
          // is being processed (DCHECKed).
    258   SequenceToken task_sequence_token() const {
    259     DCHECK(is_processing_task_);
    260     return task_sequence_token_;
    261   }
    262 
          // Shutdown behavior of the task being processed. Only meaningful while a
          // task is being processed (DCHECKed).
    263   WorkerShutdown task_shutdown_behavior() const {
    264     DCHECK(is_processing_task_);
    265     return task_shutdown_behavior_;
    266   }
    267 
          // Returns a new reference to the owning pool. Run() resets |worker_pool_|
          // to null once the thread loop returns.
    268   scoped_refptr<SequencedWorkerPool> worker_pool() const {
    269     return worker_pool_;
    270   }
    271 
    272  private:
          // Thread-local pointer to the Worker running on the current thread; set in
          // Run().
    273   static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
    274       lazy_tls_ptr_;
    275 
    276   scoped_refptr<SequencedWorkerPool> worker_pool_;
    277   // The sequence token of the task being processed. Only valid when
    278   // is_processing_task_ is true.
    279   SequenceToken task_sequence_token_;
    280   // The shutdown behavior of the task being processed. Only valid when
    281   // is_processing_task_ is true.
    282   WorkerShutdown task_shutdown_behavior_;
    283   // Whether the Worker is processing a task.
    284   bool is_processing_task_;
    285 
    286   DISALLOW_COPY_AND_ASSIGN(Worker);
    287 };
    288 
    289 // Inner ----------------------------------------------------------------------
    290 
        // Holds the pool's state (pending task set, worker threads, shutdown and
        // cleanup flags) together with the lock and condition variables guarding it.
    291 class SequencedWorkerPool::Inner {
    292  public:
    293   // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
    294   // by it).
    295   Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
    296         const std::string& thread_name_prefix,
    297         TestingObserver* observer);
    298 
    299   ~Inner();
    300 
    301   static SequenceToken GetSequenceToken();
    302 
    303   SequenceToken GetNamedSequenceToken(const std::string& name);
    304 
    305   // This function accepts a name and an ID. If the name is null, the
    306   // token ID is used. This allows us to implement the optional name lookup
    307   // from a single function without having to enter the lock a separate time.
    308   bool PostTask(const std::string* optional_token_name,
    309                 SequenceToken sequence_token,
    310                 WorkerShutdown shutdown_behavior,
    311                 const tracked_objects::Location& from_here,
    312                 const Closure& task,
    313                 TimeDelta delay);
    314 
    315   bool RunsTasksOnCurrentThread() const;
    316 
    317   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
    318 
    319   bool IsRunningSequence(SequenceToken sequence_token) const;
    320 
    321   void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
    322                                           WorkerShutdown shutdown_behavior);
    323 
    324   void CleanupForTesting();
    325 
    326   void SignalHasWorkForTesting();
    327 
    328   int GetWorkSignalCountForTesting() const;
    329 
    330   void Shutdown(int max_blocking_tasks_after_shutdown);
    331 
    332   bool IsShutdownInProgress();
    333 
    334   // Runs the worker loop on the background thread.
    335   void ThreadLoop(Worker* this_worker);
    336 
    337  private:
    338   enum GetWorkStatus {
    339     GET_WORK_FOUND,
    340     GET_WORK_NOT_FOUND,
    341     GET_WORK_WAIT,
    342   };
    343 
    344   enum CleanupState {
    345     CLEANUP_REQUESTED,
    346     CLEANUP_STARTING,
    347     CLEANUP_RUNNING,
    348     CLEANUP_FINISHING,
    349     CLEANUP_DONE,
    350   };
    351 
    352   // Called from within the lock, this converts the given token name into a
    353   // token ID, creating a new one if necessary.
    354   int LockedGetNamedTokenID(const std::string& name);
    355 
    356   // Called from within the lock, this returns the next sequence task number.
    357   int64_t LockedGetNextSequenceTaskNumber();
    358 
    359   // Gets new task. There are 3 cases depending on the return value:
    360   //
    361   // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
    362   //    be run immediately.
    363   // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
    364   //    and |task| is not filled in. In this case, the caller should wait until
    365   //    a task is posted.
    366   // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
    367   //    immediately, and |task| is not filled in. Likewise, |wait_time| is
    368   //    filled in the time to wait until the next task to run. In this case, the
    369   //    caller should wait the time.
    370   //
    371   // In any case, the calling code should clear the given
    372   // delete_these_outside_lock vector the next time the lock is released.
    373   // See the implementation for a more detailed description.
    374   GetWorkStatus GetWork(SequencedTask* task,
    375                         TimeDelta* wait_time,
    376                         std::vector<Closure>* delete_these_outside_lock);
    377 
    378   void HandleCleanup();
    379 
    380   // Performs init and cleanup around running the given task. WillRun...
    381   // returns the value from PrepareToStartAdditionalThreadIfHelpful.
    382   // The calling code should call FinishStartingAdditionalThread once the
    383   // lock is released if the return value is nonzero.
    384   int WillRunWorkerTask(const SequencedTask& task);
    385   void DidRunWorkerTask(const SequencedTask& task);
    386 
    387   // Returns true if there are no threads currently running the given
    388   // sequence token.
    389   bool IsSequenceTokenRunnable(int sequence_token_id) const;
    390 
    391   // Checks if all threads are busy and the addition of one more could run an
    392   // additional task waiting in the queue. This must be called from within
    393   // the lock.
    394   //
    395   // If another thread is helpful, this will mark the thread as being in the
    396   // process of starting and returns the index of the new thread which will be
    397   // 0 or more. The caller should then call FinishStartingAdditionalThread to
    398   // complete initialization once the lock is released.
    399   //
    400   // If another thread is not necessary, returns 0.
    401   //
    402   // See the implementation for more.
          //
          // NOTE(review): PostTask() treats a return value of 0 as "no new thread",
          // so the index of a new thread is presumably 1 or more rather than "0 or
          // more" - confirm against the implementation.
    403   int PrepareToStartAdditionalThreadIfHelpful();
    404 
    405   // The second part of thread creation after
    406   // PrepareToStartAdditionalThreadIfHelpful with the thread number it
    407   // generated. This actually creates the thread and should be called outside
    408   // the lock to avoid blocking important work starting a thread in the lock.
    409   void FinishStartingAdditionalThread(int thread_number);
    410 
    411   // Signal |has_work_| and increment |has_work_signal_count_|.
          // NOTE(review): this class declares |has_work_cv_| but no |has_work_| or
          // |has_work_signal_count_| member, so the comment above looks stale -
          // confirm against the implementation.
    412   void SignalHasWork();
    413 
    414   // Checks whether there is work left that's blocking shutdown. Must be
    415   // called inside the lock.
    416   bool CanShutdown() const;
    417 
    418   SequencedWorkerPool* const worker_pool_;
    419 
    420   // The last sequence number used. Managed by GetSequenceToken, since this
    421   // only does threadsafe increment operations, you do not need to hold the
    422   // lock. This is class-static to make SequenceTokens issued by
    423   // GetSequenceToken unique across SequencedWorkerPool instances.
    424   static base::StaticAtomicSequenceNumber g_last_sequence_number_;
    425 
    426   // This lock protects |everything in this class|. Do not read or modify
    427   // anything without holding this lock. Do not block while holding this
    428   // lock.
    429   mutable Lock lock_;
    430 
    431   // Condition variable that is waited on by worker threads until new
    432   // tasks are posted or shutdown starts.
    433   ConditionVariable has_work_cv_;
    434 
    435   // Condition variable that is waited on by non-worker threads (in
    436   // Shutdown()) until CanShutdown() goes to true.
    437   ConditionVariable can_shutdown_cv_;
    438 
    439   // The maximum number of worker threads we'll create.
    440   const size_t max_threads_;
    441 
    442   const std::string thread_name_prefix_;
    443 
    444   // Associates all known sequence token names with their IDs.
    445   std::map<std::string, int> named_sequence_tokens_;
    446 
    447   // Owning pointers to all threads we've created so far, indexed by
    448   // ID. Since we lazily create threads, this may be less than
    449   // max_threads_ and will be initially empty.
    450   using ThreadMap = std::map<PlatformThreadId, std::unique_ptr<Worker>>;
    451   ThreadMap threads_;
    452 
    453   // Set to true when we're in the process of creating another thread.
    454   // See PrepareToStartAdditionalThreadIfHelpful for more.
    455   bool thread_being_created_;
    456 
    457   // Number of threads currently waiting for work.
    458   size_t waiting_thread_count_;
    459 
    460   // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
    461   // or SKIP_ON_SHUTDOWN flag set.
    462   size_t blocking_shutdown_thread_count_;
    463 
    464   // A set of all pending tasks in time-to-run order. These are tasks that are
    465   // either waiting for a thread to run on, waiting for their time to run,
    466   // or blocked on a previous task in their sequence. We have to iterate over
    467   // the tasks by time-to-run order, so we use the set instead of the
    468   // traditional priority_queue.
    469   typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
    470   PendingTaskSet pending_tasks_;
    471 
    472   // The next sequence number for a new sequenced task.
    473   int64_t next_sequence_task_number_;
    474 
    475   // Number of tasks in the pending_tasks_ list that are marked as blocking
    476   // shutdown.
    477   size_t blocking_shutdown_pending_task_count_;
    478 
    479   // Lists all sequence tokens currently executing.
    480   std::set<int> current_sequences_;
    481 
    482   // An ID for each posted task to distinguish the task from others in traces.
    483   int trace_id_;
    484 
    485   // Set when Shutdown is called and no further tasks should be
    486   // allowed, though we may still be running existing tasks.
    487   bool shutdown_called_;
    488 
    489   // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
    490   // has been called.
    491   int max_blocking_tasks_after_shutdown_;
    492 
    493   // State used to cleanup for testing, all guarded by lock_.
    494   CleanupState cleanup_state_;
    495   size_t cleanup_idlers_;
    496   ConditionVariable cleanup_cv_;
    497 
          // Optional observer for tests; null-checked before each use.
    498   TestingObserver* const testing_observer_;
    499 
    500   DISALLOW_COPY_AND_ASSIGN(Inner);
    501 };
    502 
    503 // Worker definitions ---------------------------------------------------------
    504 
    505 SequencedWorkerPool::Worker::Worker(
    506     scoped_refptr<SequencedWorkerPool> worker_pool,
    507     int thread_number,
    508     const std::string& prefix)
            // The thread name is |prefix| followed by "Worker<thread_number>".
    509     : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
    510       worker_pool_(std::move(worker_pool)),
    511       task_shutdown_behavior_(BLOCK_SHUTDOWN),
    512       is_processing_task_(false) {
          // Start() is called from the constructor itself, so constructing a Worker
          // is all a caller needs to do to launch it.
    513   Start();
    514 }
    515 
    516 SequencedWorkerPool::Worker::~Worker() {
          // Intentionally empty: no explicit teardown is performed here.
    517 }
    518 
    519 void SequencedWorkerPool::Worker::Run() {
          // Thread entry point: registers this Worker in thread-local storage, runs
          // the pool's thread loop, then drops the reference to the pool.
    520 #if defined(OS_WIN)
          // Scoped COM initializer for the lifetime of Run() (Windows only).
    521   win::ScopedCOMInitializer com_initializer;
    522 #endif
    523 
    524   // Store a pointer to this worker in thread local storage for static function
    525   // access.
    526   DCHECK(!lazy_tls_ptr_.Get().Get());
    527   lazy_tls_ptr_.Get().Set(this);
    528 
    529   // Just jump back to the Inner object to run the thread, since it has all the
    530   // tracking information and queues. It might be more natural to implement
    531   // using DelegateSimpleThread and have Inner implement the Delegate to avoid
    532   // having these worker objects at all, but that method lacks the ability to
    533   // send thread-specific information easily to the thread loop.
    534   worker_pool_->inner_->ThreadLoop(this);
    535   // Release our cyclic reference once we're done.
    536   worker_pool_ = nullptr;
    537 }
    538 
    539 // static
        // Returns the Worker stored in thread-local storage for the calling thread,
        // or null when the lazy instance compares equal to nullptr (see below).
    540 SequencedWorkerPool::Worker*
    541 SequencedWorkerPool::Worker::GetForCurrentThread() {
    542   // Don't construct lazy instance on check.
    543   if (lazy_tls_ptr_ == nullptr)
    544     return nullptr;
    545 
    546   return lazy_tls_ptr_.Get().Get();
    547 }
    548 
    549 // static
        // Out-of-class definition of the lazily created, leaky thread-local slot that
        // Run() fills in and GetForCurrentThread() reads.
    550 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
    551     SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
    552 
    553 // Inner definitions ---------------------------------------------------------
    554 
    555 SequencedWorkerPool::Inner::Inner(
    556     SequencedWorkerPool* worker_pool,
    557     size_t max_threads,
    558     const std::string& thread_name_prefix,
    559     TestingObserver* observer)
            // Starts with all counters at zero, shutdown not called and
            // cleanup_state_ at CLEANUP_DONE. All three condition variables are
            // bound to |lock_|. No threads are created here.
    560     : worker_pool_(worker_pool),
    561       lock_(),
    562       has_work_cv_(&lock_),
    563       can_shutdown_cv_(&lock_),
    564       max_threads_(max_threads),
    565       thread_name_prefix_(thread_name_prefix),
    566       thread_being_created_(false),
    567       waiting_thread_count_(0),
    568       blocking_shutdown_thread_count_(0),
    569       next_sequence_task_number_(0),
    570       blocking_shutdown_pending_task_count_(0),
    571       trace_id_(0),
    572       shutdown_called_(false),
    573       max_blocking_tasks_after_shutdown_(0),
    574       cleanup_state_(CLEANUP_DONE),
    575       cleanup_idlers_(0),
    576       cleanup_cv_(&lock_),
    577       testing_observer_(observer) {}
    578 
    579 SequencedWorkerPool::Inner::~Inner() {
    580   // You must call Shutdown() before destroying the pool.
    581   DCHECK(shutdown_called_);
    582 
    583   // Need to explicitly join with the threads before they're destroyed or else
    584   // they will be running when our object is half torn down.
    585   for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
    586     it->second->Join();
          // Destroys the owned Worker objects now that every thread has been joined.
    587   threads_.clear();
    588 
          // Tell the test observer, if any, that destruction has happened.
    589   if (testing_observer_)
    590     testing_observer_->OnDestruct();
    591 }
    592 
    593 // static
        // Returns a fresh SequenceToken built from the class-static atomic counter.
        // Does not take |lock_|.
    594 SequencedWorkerPool::SequenceToken
    595 SequencedWorkerPool::Inner::GetSequenceToken() {
    596   // Need to add one because StaticAtomicSequenceNumber starts at zero, which
    597   // is used as a sentinel value in SequenceTokens.
    598   return SequenceToken(g_last_sequence_number_.GetNext() + 1);
    599 }
    600 
    601 SequencedWorkerPool::SequenceToken
    602 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
          // Takes |lock_| and wraps the ID returned by LockedGetNamedTokenID() in a
          // SequenceToken.
    603   AutoLock lock(lock_);
    604   return SequenceToken(LockedGetNamedTokenID(name));
    605 }
    606 
    607 bool SequencedWorkerPool::Inner::PostTask(
    608     const std::string* optional_token_name,
    609     SequenceToken sequence_token,
    610     WorkerShutdown shutdown_behavior,
    611     const tracked_objects::Location& from_here,
    612     const Closure& task,
    613     TimeDelta delay) {
          // Builds a SequencedTask from the arguments, inserts it into
          // |pending_tasks_| under |lock_|, then either starts an additional worker
          // thread or signals an existing one. Returns false, without queueing, when
          // the task is rejected because Shutdown() has already been called.
          //
          // A non-zero |delay| is only expected together with SKIP_ON_SHUTDOWN.
    614   DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
    615   SequencedTask sequenced(from_here);
    616   sequenced.sequence_token_id = sequence_token.id_;
    617   sequenced.shutdown_behavior = shutdown_behavior;
    618   sequenced.posted_from = from_here;
          // Only BLOCK_SHUTDOWN tasks are wrapped with MakeCriticalClosure().
    619   sequenced.task =
    620       shutdown_behavior == BLOCK_SHUTDOWN ?
    621       base::MakeCriticalClosure(task) : task;
    622   sequenced.time_to_run = TimeTicks::Now() + delay;
    623 
          // 0 means "no new thread needed"; see the check after the lock is
          // released.
    624   int create_thread_id = 0;
    625   {
    626     AutoLock lock(lock_);
    627     if (shutdown_called_) {
    628       // Don't allow a new task to be posted if it doesn't block shutdown.
    629       if (shutdown_behavior != BLOCK_SHUTDOWN)
    630         return false;
    631 
    632       // If the current thread is running a task, and that task doesn't block
    633       // shutdown, then it shouldn't be allowed to post any more tasks.
    634       ThreadMap::const_iterator found =
    635           threads_.find(PlatformThread::CurrentId());
    636       if (found != threads_.end() && found->second->is_processing_task() &&
    637           found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
    638         return false;
    639       }
    640 
            // Each BLOCK_SHUTDOWN task accepted after Shutdown() uses up one unit
            // of the budget that was passed to Shutdown().
    641       if (max_blocking_tasks_after_shutdown_ <= 0) {
    642         DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
    643         return false;
    644       }
    645       max_blocking_tasks_after_shutdown_ -= 1;
    646     }
    647 
    648     // The trace_id is used for identifying the task in about:tracing.
    649     sequenced.trace_id = trace_id_++;
    650 
            // The Inner object's address is passed as the |pool| argument of
            // GetTaskTraceID().
    651     TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
    652         "SequencedWorkerPool::Inner::PostTask",
    653         TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
    654         TRACE_EVENT_FLAG_FLOW_OUT);
    655 
    656     sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
    657 
    658     // Now that we have the lock, apply the named token rules.
    659     if (optional_token_name)
    660       sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
    661 
    662     pending_tasks_.insert(sequenced);
    663     if (shutdown_behavior == BLOCK_SHUTDOWN)
    664       blocking_shutdown_pending_task_count_++;
    665 
    666     create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
    667   }
    668 
    669   // Actually start the additional thread or signal an existing one now that
    670   // we're outside the lock.
    671   if (create_thread_id)
    672     FinishStartingAdditionalThread(create_thread_id);
    673   else
    674     SignalHasWork();
    675 
    676   return true;
    677 }
    678 
    679 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
          // True when the calling thread's ID is a key in |threads_|, i.e. the
          // caller is one of this pool's registered worker threads.
    680   AutoLock lock(lock_);
    681   return ContainsKey(threads_, PlatformThread::CurrentId());
    682 }
    683 
    684 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
    685     SequenceToken sequence_token) const {
          // True only when the calling thread is one of this pool's workers, that
          // worker is processing a task, and the task's sequence token equals
          // |sequence_token|.
    686   AutoLock lock(lock_);
    687   ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
    688   if (found == threads_.end())
    689     return false;
          // The && short-circuit matters: task_sequence_token() DCHECKs that a task
          // is being processed.
    690   return found->second->is_processing_task() &&
    691          sequence_token.Equals(found->second->task_sequence_token());
    692 }
    693 
    694 bool SequencedWorkerPool::Inner::IsRunningSequence(
    695     SequenceToken sequence_token) const {
          // Returns the negation of IsSequenceTokenRunnable() for the token's ID,
          // evaluated under |lock_|. |sequence_token| must be valid (DCHECKed).
    696   DCHECK(sequence_token.IsValid());
    697   AutoLock lock(lock_);
    698   return !IsSequenceTokenRunnable(sequence_token.id_);
    699 }
    700 
    701 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
    702     SequenceToken sequence_token,
    703     WorkerShutdown shutdown_behavior) {
          // Attaches |sequence_token| and |shutdown_behavior| to the task the calling
          // worker thread is already running. The DCHECKs require that the caller is
          // a pool worker, is processing a task, and that the task has no valid
          // sequence token yet.
    704   AutoLock lock(lock_);
    705   ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
    706   DCHECK(found != threads_.end());
    707   DCHECK(found->second->is_processing_task());
    708   DCHECK(!found->second->task_sequence_token().IsValid());
    709   found->second->set_running_task_info(sequence_token, shutdown_behavior);
    710 
    711   // Mark the sequence token as in use.
          // The insert is expected to add a new element, i.e. the token must not
          // already be in |current_sequences_|.
    712   bool success = current_sequences_.insert(sequence_token.id_).second;
    713   DCHECK(success);
    714 }
    715 
    716 // See https://code.google.com/p/chromium/issues/detail?id=168415
        // Requests a cleanup pass and blocks until |cleanup_state_| returns to
        // CLEANUP_DONE. Must not be called from one of the pool's own threads
        // (DCHECKed).
    717 void SequencedWorkerPool::Inner::CleanupForTesting() {
    718   DCHECK(!RunsTasksOnCurrentThread());
    719   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    720   AutoLock lock(lock_);
          // A cleanup must not already be in progress.
    721   CHECK_EQ(CLEANUP_DONE, cleanup_state_);
          // Nothing is done once Shutdown() has been called.
    722   if (shutdown_called_)
    723     return;
          // Nothing to do either when no tasks are pending and every thread is
          // waiting for work.
    724   if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
    725     return;
    726   cleanup_state_ = CLEANUP_REQUESTED;
    727   cleanup_idlers_ = 0;
    728   has_work_cv_.Signal();
          // Wait on |cleanup_cv_| until the state is set back to CLEANUP_DONE; that
          // transition is not made in this function.
    729   while (cleanup_state_ != CLEANUP_DONE)
    730     cleanup_cv_.Wait();
    731 }
    732 
    733 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
          // Public test-only entry point for the private SignalHasWork().
    734   SignalHasWork();
    735 }
    736 
    737 void SequencedWorkerPool::Inner::Shutdown(
    738     int max_new_blocking_tasks_after_shutdown) {
          // Marks the pool as shut down and, if anything still blocks shutdown,
          // waits until CanShutdown() becomes true. Calls after the first one return
          // immediately. |max_new_blocking_tasks_after_shutdown| must be >= 0
          // (DCHECKed) and is the number of BLOCK_SHUTDOWN tasks PostTask() will
          // still accept from now on.
    739   DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
    740   {
    741     AutoLock lock(lock_);
    742     // Cleanup and Shutdown should not be called concurrently.
    743     CHECK_EQ(CLEANUP_DONE, cleanup_state_);
    744     if (shutdown_called_)
    745       return;
    746     shutdown_called_ = true;
    747     max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
    748 
    749     // Tickle the threads. This will wake up a waiting one so it will know that
    750     // it can exit, which in turn will wake up any other waiting ones.
    751     SignalHasWork();
    752 
    753     // There are no pending or running tasks blocking shutdown, we're done.
    754     if (CanShutdown())
    755       return;
    756   }
    757 
    758   // If we're here, then something is blocking shutdown.  So wait for
    759   // CanShutdown() to go to true.
    760 
    761   if (testing_observer_)
    762     testing_observer_->WillWaitForShutdown();
    763 
    764 #if !defined(OS_NACL)
          // Start time for the shutdown-delay histogram recorded below.
    765   TimeTicks shutdown_wait_begin = TimeTicks::Now();
    766 #endif
    767 
    768   {
    769     base::ThreadRestrictions::ScopedAllowWait allow_wait;
    770     AutoLock lock(lock_);
            // Re-check the predicate after every wake-up.
    771     while (!CanShutdown())
    772       can_shutdown_cv_.Wait();
    773   }
    774 #if !defined(OS_NACL)
    775   UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
    776                       TimeTicks::Now() - shutdown_wait_begin);
    777 #endif
    778 }
    779 
    780 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
    781     AutoLock lock(lock_);
    782     return shutdown_called_;
    783 }
    784 
// Main loop of a worker thread.
//
// Registers |this_worker| in |threads_| (wrapped with WrapUnique, so the map
// owns it from then on) and clears |thread_being_created_|. It then loops:
// run HandleCleanup(), ask GetWork() for a task, and either run the task,
// take part in a cleanup pass, exit, or wait on |has_work_cv_|.
//
// The loop exits when no task was found, no cleanup is running, shutdown has
// been called and |blocking_shutdown_pending_task_count_| is zero.
//
// |lock_| is held for the whole loop except inside the AutoUnlock scopes,
// which are used to run the task and to destroy closures.
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    auto result = threads_.insert(
        std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::Inner::ThreadLoop",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
            TRACE_EVENT_FLAG_FLOW_IN,
            "src_file", task.posted_from.file_name(),
            "src_func", task.posted_from.function_name());
        TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event(
            task.posted_from.file_name());
        // Bookkeeping under the lock; a non-zero result means a new worker
        // thread should be created once the lock has been dropped.
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TaskStopwatch stopwatch;
          stopwatch.Start();
          task.task.Run();
          stopwatch.Stop();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
              task, stopwatch);

          // Update the sequence token in case it has been set from within the
          // task, so it can be removed from the set of currently running
          // sequences in DidRunWorkerTask() below.
          task.sequence_token_id = this_worker->task_sequence_token().id_;

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling reset_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->reset_running_task_info();
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        // This thread is the one performing the cleanup pass.
        switch (status) {
          case GET_WORK_WAIT: {
              // GetWork() removed a deferred task; destroy its closure without
              // holding the lock, then loop again.
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            // Nothing left to clean up: advance the cleanup state and wake
            // everything waiting on |cleanup_cv_|.
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can
        // shut down, other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();
          break;
        }

        // No work was found, but there are tasks that need deletion. The
        // deletion must happen outside of the lock.
        if (delete_these_outside_lock.size()) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();

          // Since the lock has been released, |status| may no longer be
          // accurate. It might read GET_WORK_WAIT even if there are tasks
          // ready to perform work. Jump to the top of the loop to recalculate
          // |status|.
          continue;
        }

        // Count this thread as waiting for the duration of the wait below.
        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            // A task exists but is not due yet; wait at most |wait_time|.
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}
    923 
    924 void SequencedWorkerPool::Inner::HandleCleanup() {
    925   lock_.AssertAcquired();
    926   if (cleanup_state_ == CLEANUP_DONE)
    927     return;
    928   if (cleanup_state_ == CLEANUP_REQUESTED) {
    929     // We win, we get to do the cleanup as soon as the others wise up and idle.
    930     cleanup_state_ = CLEANUP_STARTING;
    931     while (thread_being_created_ ||
    932            cleanup_idlers_ != threads_.size() - 1) {
    933       has_work_cv_.Signal();
    934       cleanup_cv_.Wait();
    935     }
    936     cleanup_state_ = CLEANUP_RUNNING;
    937     return;
    938   }
    939   if (cleanup_state_ == CLEANUP_STARTING) {
    940     // Another worker thread is cleaning up, we idle here until thats done.
    941     ++cleanup_idlers_;
    942     cleanup_cv_.Broadcast();
    943     while (cleanup_state_ != CLEANUP_FINISHING) {
    944       cleanup_cv_.Wait();
    945     }
    946     --cleanup_idlers_;
    947     cleanup_cv_.Broadcast();
    948     return;
    949   }
    950   if (cleanup_state_ == CLEANUP_FINISHING) {
    951     // We wait for all idlers to wake up prior to being DONE.
    952     while (cleanup_idlers_ != 0) {
    953       cleanup_cv_.Broadcast();
    954       cleanup_cv_.Wait();
    955     }
    956     if (cleanup_state_ == CLEANUP_FINISHING) {
    957       cleanup_state_ = CLEANUP_DONE;
    958       cleanup_cv_.Signal();
    959     }
    960     return;
    961   }
    962 }
    963 
    964 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
    965     const std::string& name) {
    966   lock_.AssertAcquired();
    967   DCHECK(!name.empty());
    968 
    969   std::map<std::string, int>::const_iterator found =
    970       named_sequence_tokens_.find(name);
    971   if (found != named_sequence_tokens_.end())
    972     return found->second;  // Got an existing one.
    973 
    974   // Create a new one for this name.
    975   SequenceToken result = GetSequenceToken();
    976   named_sequence_tokens_.insert(std::make_pair(name, result.id_));
    977   return result.id_;
    978 }
    979 
    980 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
    981   lock_.AssertAcquired();
    982   // We assume that we never create enough tasks to wrap around.
    983   return next_sequence_task_number_++;
    984 }
    985 
    986 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    987     SequencedTask* task,
    988     TimeDelta* wait_time,
    989     std::vector<Closure>* delete_these_outside_lock) {
    990   lock_.AssertAcquired();
    991 
    992   // Find the next task with a sequence token that's not currently in use.
    993   // If the token is in use, that means another thread is running something
    994   // in that sequence, and we can't run it without going out-of-order.
    995   //
    996   // This algorithm is simple and fair, but inefficient in some cases. For
    997   // example, say somebody schedules 1000 slow tasks with the same sequence
    998   // number. We'll have to go through all those tasks each time we feel like
    999   // there might be work to schedule. If this proves to be a problem, we
   1000   // should make this more efficient.
   1001   //
   1002   // One possible enhancement would be to keep a map from sequence ID to a
   1003   // list of pending but currently blocked SequencedTasks for that ID.
   1004   // When a worker finishes a task of one sequence token, it can pick up the
   1005   // next one from that token right away.
   1006   //
   1007   // This may lead to starvation if there are sufficient numbers of sequences
   1008   // in use. To alleviate this, we could add an incrementing priority counter
   1009   // to each SequencedTask. Then maintain a priority_queue of all runnable
   1010   // tasks, sorted by priority counter. When a sequenced task is completed
   1011   // we would pop the head element off of that tasks pending list and add it
   1012   // to the priority queue. Then we would run the first item in the priority
   1013   // queue.
   1014 
   1015   GetWorkStatus status = GET_WORK_NOT_FOUND;
   1016   int unrunnable_tasks = 0;
   1017   PendingTaskSet::iterator i = pending_tasks_.begin();
   1018   // We assume that the loop below doesn't take too long and so we can just do
   1019   // a single call to TimeTicks::Now().
   1020   const TimeTicks current_time = TimeTicks::Now();
   1021   while (i != pending_tasks_.end()) {
   1022     if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
   1023       unrunnable_tasks++;
   1024       ++i;
   1025       continue;
   1026     }
   1027 
   1028     if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
   1029       // We're shutting down and the task we just found isn't blocking
   1030       // shutdown. Delete it and get more work.
   1031       //
   1032       // Note that we do not want to delete unrunnable tasks. Deleting a task
   1033       // can have side effects (like freeing some objects) and deleting a
   1034       // task that's supposed to run after one that's currently running could
   1035       // cause an obscure crash.
   1036       //
   1037       // We really want to delete these tasks outside the lock in case the
   1038       // closures are holding refs to objects that want to post work from
   1039       // their destructorss (which would deadlock). The closures are
   1040       // internally refcounted, so we just need to keep a copy of them alive
   1041       // until the lock is exited. The calling code can just clear() the
   1042       // vector they passed to us once the lock is exited to make this
   1043       // happen.
   1044       delete_these_outside_lock->push_back(i->task);
   1045       pending_tasks_.erase(i++);
   1046       continue;
   1047     }
   1048 
   1049     if (i->time_to_run > current_time) {
   1050       // The time to run has not come yet.
   1051       *wait_time = i->time_to_run - current_time;
   1052       status = GET_WORK_WAIT;
   1053       if (cleanup_state_ == CLEANUP_RUNNING) {
   1054         // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
   1055         delete_these_outside_lock->push_back(i->task);
   1056         pending_tasks_.erase(i);
   1057       }
   1058       break;
   1059     }
   1060 
   1061     // Found a runnable task.
   1062     *task = *i;
   1063     pending_tasks_.erase(i);
   1064     if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
   1065       blocking_shutdown_pending_task_count_--;
   1066     }
   1067 
   1068     status = GET_WORK_FOUND;
   1069     break;
   1070   }
   1071 
   1072   return status;
   1073 }
   1074 
   1075 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
   1076   lock_.AssertAcquired();
   1077 
   1078   // Mark the task's sequence number as in use.
   1079   if (task.sequence_token_id)
   1080     current_sequences_.insert(task.sequence_token_id);
   1081 
   1082   // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
   1083   // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
   1084   // completes.
   1085   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
   1086     blocking_shutdown_thread_count_++;
   1087 
   1088   // We just picked up a task. Since StartAdditionalThreadIfHelpful only
   1089   // creates a new thread if there is no free one, there is a race when posting
   1090   // tasks that many tasks could have been posted before a thread started
   1091   // running them, so only one thread would have been created. So we also check
   1092   // whether we should create more threads after removing our task from the
   1093   // queue, which also has the nice side effect of creating the workers from
   1094   // background threads rather than the main thread of the app.
   1095   //
   1096   // If another thread wasn't created, we want to wake up an existing thread
   1097   // if there is one waiting to pick up the next task.
   1098   //
   1099   // Note that we really need to do this *before* running the task, not
   1100   // after. Otherwise, if more than one task is posted, the creation of the
   1101   // second thread (since we only create one at a time) will be blocked by
   1102   // the execution of the first task, which could be arbitrarily long.
   1103   return PrepareToStartAdditionalThreadIfHelpful();
   1104 }
   1105 
   1106 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
   1107   lock_.AssertAcquired();
   1108 
   1109   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
   1110     DCHECK_GT(blocking_shutdown_thread_count_, 0u);
   1111     blocking_shutdown_thread_count_--;
   1112   }
   1113 
   1114   if (task.sequence_token_id)
   1115     current_sequences_.erase(task.sequence_token_id);
   1116 }
   1117 
   1118 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
   1119     int sequence_token_id) const {
   1120   lock_.AssertAcquired();
   1121   return !sequence_token_id ||
   1122       current_sequences_.find(sequence_token_id) ==
   1123           current_sequences_.end();
   1124 }
   1125 
   1126 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
   1127   lock_.AssertAcquired();
   1128   // How thread creation works:
   1129   //
   1130   // We'de like to avoid creating threads with the lock held. However, we
   1131   // need to be sure that we have an accurate accounting of the threads for
   1132   // proper Joining and deltion on shutdown.
   1133   //
   1134   // We need to figure out if we need another thread with the lock held, which
   1135   // is what this function does. It then marks us as in the process of creating
   1136   // a thread. When we do shutdown, we wait until the thread_being_created_
   1137   // flag is cleared, which ensures that the new thread is properly added to
   1138   // all the data structures and we can't leak it. Once shutdown starts, we'll
   1139   // refuse to create more threads or they would be leaked.
   1140   //
   1141   // Note that this creates a mostly benign race condition on shutdown that
   1142   // will cause fewer workers to be created than one would expect. It isn't
   1143   // much of an issue in real life, but affects some tests. Since we only spawn
   1144   // one worker at a time, the following sequence of events can happen:
   1145   //
   1146   //  1. Main thread posts a bunch of unrelated tasks that would normally be
   1147   //     run on separate threads.
   1148   //  2. The first task post causes us to start a worker. Other tasks do not
   1149   //     cause a worker to start since one is pending.
   1150   //  3. Main thread initiates shutdown.
   1151   //  4. No more threads are created since the shutdown_called_ flag is set.
   1152   //
   1153   // The result is that one may expect that max_threads_ workers to be created
   1154   // given the workload, but in reality fewer may be created because the
   1155   // sequence of thread creation on the background threads is racing with the
   1156   // shutdown call.
   1157   if (!shutdown_called_ &&
   1158       !thread_being_created_ &&
   1159       cleanup_state_ == CLEANUP_DONE &&
   1160       threads_.size() < max_threads_ &&
   1161       waiting_thread_count_ == 0) {
   1162     // We could use an additional thread if there's work to be done.
   1163     for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
   1164          i != pending_tasks_.end(); ++i) {
   1165       if (IsSequenceTokenRunnable(i->sequence_token_id)) {
   1166         // Found a runnable task, mark the thread as being started.
   1167         thread_being_created_ = true;
   1168         return static_cast<int>(threads_.size() + 1);
   1169       }
   1170     }
   1171   }
   1172   return 0;
   1173 }
   1174 
   1175 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
   1176     int thread_number) {
   1177   // Called outside of the lock.
   1178   DCHECK_GT(thread_number, 0);
   1179 
   1180   // The worker is assigned to the list when the thread actually starts, which
   1181   // will manage the memory of the pointer.
   1182   new Worker(worker_pool_, thread_number, thread_name_prefix_);
   1183 }
   1184 
   1185 void SequencedWorkerPool::Inner::SignalHasWork() {
   1186   has_work_cv_.Signal();
   1187   if (testing_observer_) {
   1188     testing_observer_->OnHasWork();
   1189   }
   1190 }
   1191 
   1192 bool SequencedWorkerPool::Inner::CanShutdown() const {
   1193   lock_.AssertAcquired();
   1194   // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
   1195   return !thread_being_created_ &&
   1196          blocking_shutdown_thread_count_ == 0 &&
   1197          blocking_shutdown_pending_task_count_ == 0;
   1198 }
   1199 
// Out-of-line definition of Inner's static atomic sequence-number counter.
// NOTE(review): presumably the source of ids handed out by
// Inner::GetSequenceToken(), which is defined outside this section - confirm.
base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;
   1202 
   1203 // SequencedWorkerPool --------------------------------------------------------
   1204 
   1205 std::string SequencedWorkerPool::SequenceToken::ToString() const {
   1206   return base::StringPrintf("[%d]", id_);
   1207 }
   1208 
   1209 // static
   1210 SequencedWorkerPool::SequenceToken
   1211 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
   1212   Worker* worker = Worker::GetForCurrentThread();
   1213   if (!worker)
   1214     return SequenceToken();
   1215 
   1216   return worker->task_sequence_token();
   1217 }
   1218 
   1219 // static
   1220 scoped_refptr<SequencedWorkerPool>
   1221 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
   1222   Worker* worker = Worker::GetForCurrentThread();
   1223   if (!worker)
   1224     return nullptr;
   1225 
   1226   return worker->worker_pool();
   1227 }
   1228 
   1229 // static
   1230 scoped_refptr<SequencedTaskRunner>
   1231 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
   1232   Worker* worker = Worker::GetForCurrentThread();
   1233 
   1234   // If there is no worker, this thread is not a worker thread. Otherwise, it is
   1235   // currently running a task (sequenced or unsequenced).
   1236   if (!worker)
   1237     return nullptr;
   1238 
   1239   scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
   1240   SequenceToken sequence_token = worker->task_sequence_token();
   1241   WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
   1242   if (!sequence_token.IsValid()) {
   1243     // Create a new sequence token and bind this thread to it, to make sure that
   1244     // a task posted to the SequencedTaskRunner we are going to return is not
   1245     // immediately going to run on a different thread.
   1246     sequence_token = Inner::GetSequenceToken();
   1247     pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
   1248                                                      shutdown_behavior);
   1249   }
   1250 
   1251   DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
   1252   return new SequencedWorkerPoolSequencedTaskRunner(
   1253       std::move(pool), sequence_token, shutdown_behavior);
   1254 }
   1255 
   1256 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
   1257                                          const std::string& thread_name_prefix)
   1258     : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
   1259       inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
   1260 }
   1261 
   1262 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
   1263                                          const std::string& thread_name_prefix,
   1264                                          TestingObserver* observer)
   1265     : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
   1266       inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
   1267 }
   1268 
   1269 SequencedWorkerPool::~SequencedWorkerPool() {}
   1270 
   1271 void SequencedWorkerPool::OnDestruct() const {
   1272   // Avoid deleting ourselves on a worker thread (which would deadlock).
   1273   if (RunsTasksOnCurrentThread()) {
   1274     constructor_task_runner_->DeleteSoon(FROM_HERE, this);
   1275   } else {
   1276     delete this;
   1277   }
   1278 }
   1279 
   1280 // static
   1281 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
   1282   return Inner::GetSequenceToken();
   1283 }
   1284 
   1285 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
   1286     const std::string& name) {
   1287   return inner_->GetNamedSequenceToken(name);
   1288 }
   1289 
   1290 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
   1291     SequenceToken token) {
   1292   return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
   1293 }
   1294 
   1295 scoped_refptr<SequencedTaskRunner>
   1296 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
   1297     SequenceToken token, WorkerShutdown shutdown_behavior) {
   1298   return new SequencedWorkerPoolSequencedTaskRunner(
   1299       this, token, shutdown_behavior);
   1300 }
   1301 
   1302 scoped_refptr<TaskRunner>
   1303 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
   1304     WorkerShutdown shutdown_behavior) {
   1305   return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
   1306 }
   1307 
   1308 bool SequencedWorkerPool::PostWorkerTask(
   1309     const tracked_objects::Location& from_here,
   1310     const Closure& task) {
   1311   return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
   1312                           from_here, task, TimeDelta());
   1313 }
   1314 
   1315 bool SequencedWorkerPool::PostDelayedWorkerTask(
   1316     const tracked_objects::Location& from_here,
   1317     const Closure& task,
   1318     TimeDelta delay) {
   1319   WorkerShutdown shutdown_behavior =
   1320       delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
   1321   return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
   1322                           from_here, task, delay);
   1323 }
   1324 
   1325 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
   1326     const tracked_objects::Location& from_here,
   1327     const Closure& task,
   1328     WorkerShutdown shutdown_behavior) {
   1329   return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
   1330                           from_here, task, TimeDelta());
   1331 }
   1332 
   1333 bool SequencedWorkerPool::PostSequencedWorkerTask(
   1334     SequenceToken sequence_token,
   1335     const tracked_objects::Location& from_here,
   1336     const Closure& task) {
   1337   return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
   1338                           from_here, task, TimeDelta());
   1339 }
   1340 
   1341 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
   1342     SequenceToken sequence_token,
   1343     const tracked_objects::Location& from_here,
   1344     const Closure& task,
   1345     TimeDelta delay) {
   1346   WorkerShutdown shutdown_behavior =
   1347       delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
   1348   return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
   1349                           from_here, task, delay);
   1350 }
   1351 
   1352 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
   1353     const std::string& token_name,
   1354     const tracked_objects::Location& from_here,
   1355     const Closure& task) {
   1356   DCHECK(!token_name.empty());
   1357   return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
   1358                           from_here, task, TimeDelta());
   1359 }
   1360 
   1361 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
   1362     SequenceToken sequence_token,
   1363     const tracked_objects::Location& from_here,
   1364     const Closure& task,
   1365     WorkerShutdown shutdown_behavior) {
   1366   return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
   1367                           from_here, task, TimeDelta());
   1368 }
   1369 
   1370 bool SequencedWorkerPool::PostDelayedTask(
   1371     const tracked_objects::Location& from_here,
   1372     const Closure& task,
   1373     TimeDelta delay) {
   1374   return PostDelayedWorkerTask(from_here, task, delay);
   1375 }
   1376 
   1377 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
   1378   return inner_->RunsTasksOnCurrentThread();
   1379 }
   1380 
   1381 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
   1382     SequenceToken sequence_token) const {
   1383   return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
   1384 }
   1385 
   1386 bool SequencedWorkerPool::IsRunningSequence(
   1387     SequenceToken sequence_token) const {
   1388   return inner_->IsRunningSequence(sequence_token);
   1389 }
   1390 
   1391 void SequencedWorkerPool::FlushForTesting() {
   1392   inner_->CleanupForTesting();
   1393 }
   1394 
   1395 void SequencedWorkerPool::SignalHasWorkForTesting() {
   1396   inner_->SignalHasWorkForTesting();
   1397 }
   1398 
   1399 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
   1400   DCHECK(constructor_task_runner_->BelongsToCurrentThread());
   1401   inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
   1402 }
   1403 
   1404 bool SequencedWorkerPool::IsShutdownInProgress() {
   1405   return inner_->IsShutdownInProgress();
   1406 }
   1407 
   1408 }  // namespace base
   1409