Home | History | Annotate | Download | only in threading
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
      6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
      7 
      8 #include <stddef.h>
      9 
     10 #include <cstddef>
     11 #include <memory>
     12 #include <string>
     13 
     14 #include "base/base_export.h"
     15 #include "base/callback_forward.h"
     16 #include "base/macros.h"
     17 #include "base/memory/ref_counted.h"
     18 #include "base/single_thread_task_runner.h"
     19 #include "base/task_runner.h"
     20 
     21 namespace tracked_objects {
     22 class Location;  // Forward declaration; used below only by const reference.
     23 }  // namespace tracked_objects
     24 
     25 namespace base {
     26 
     27 class SingleThreadTaskRunner;  // Held through a scoped_refptr member below.
     28 
     29 template <class T> class DeleteHelper;  // Befriended by SequencedWorkerPool.
     30 
     31 class SequencedTaskRunner;  // Returned through scoped_refptr by getters below.
     32 
     33 // A worker thread pool that enforces ordering between sets of tasks. It also
     34 // allows you to specify what should happen to your tasks on shutdown.
     35 //
     36 // To enforce ordering, get a unique sequence token from the pool and post all
     37 // tasks you want to order with the token. All tasks with the same token are
     38 // guaranteed to execute serially, though not necessarily on the same thread.
     39 // This means that:
     40 //
     41 //   - No two tasks with the same token will run at the same time.
     42 //
     43 //   - Given two tasks T1 and T2 with the same token such that T2 will
     44 //     run after T1, then T2 will start after T1 is destroyed.
     45 //
     46 //   - If T2 will run after T1, then all memory changes in T1 and T1's
     47 //     destruction will be visible to T2.
     48 //
     49 // Example:
     50 //   SequencedWorkerPool::SequenceToken token =
     51 //       SequencedWorkerPool::GetSequenceToken();
     52 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
     53 //                                FROM_HERE, base::Bind(...));
     54 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
     55 //                                FROM_HERE, base::Bind(...));
     56 //
     57 // You can make named sequence tokens to make it easier to share a token
     58 // across different components.
     59 //
     60 // You can also post tasks to the pool without ordering using PostWorkerTask.
     61 // These will be executed in an unspecified order. The order of execution
     62 // between tasks with different sequence tokens is also unspecified.
     63 //
     64 // This class may be leaked on shutdown to facilitate fast shutdown. The
     65 // expected usage, however, is to call Shutdown(), which correctly accounts
     66 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
     67 // behavior.
     68 //
     69 // Implementation note: This does not use a base::WorkerPool since that does
     70 // not enforce shutdown semantics or allow us to specify how many worker
     71 // threads to run. For the typical use case of random background work, we don't
     72 // necessarily want to be super aggressive about creating threads.
     73 //
     74 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
     75 // from TaskRunner).
     76 //
     77 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
     78 // memory leaks. See http://crbug.com/273800
     79 class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
     80  public:
     81   // Defines what should happen to a task posted to the worker pool on
     82   // shutdown.
     83   enum WorkerShutdown {
     84     // Tasks posted with this mode which have not run at shutdown will be
     85     // deleted rather than run, and any tasks with this mode running at
     86     // shutdown will be ignored (the worker thread will not be joined).
     87     //
     88     // This option provides a nice way to post stuff you don't want blocking
     89     // shutdown. For example, you might be doing a slow DNS lookup and if it's
     90     // blocked on the OS, you may not want to stop shutdown, since the result
     91     // doesn't really matter at that point.
     92     //
     93     // However, you need to be very careful what you do in your callback when
     94     // you use this option. Since the thread will continue to run until the OS
     95     // terminates the process, the app can be in the process of tearing down
     96     // when you're running. This means any singletons or global objects you
     97     // use may suddenly become invalid out from under you. For this reason,
     98     // it's best to use this only for slow but simple operations like the DNS
     99     // example.
    100     CONTINUE_ON_SHUTDOWN,
    101 
    102     // Tasks posted with this mode that have not started executing at
    103     // shutdown will be deleted rather than executed. However, any tasks that
    104     // have already begun executing when shutdown is called will be allowed
    105     // to continue, and will block shutdown until completion.
    106     //
    107     // Note: Because Shutdown() may block while these tasks are executing,
    108     // care must be taken to ensure that they do not block on the thread that
    109     // called Shutdown(), as this may lead to deadlock.
    110     SKIP_ON_SHUTDOWN,
    111 
    112     // Tasks posted with this mode will block shutdown until they're
    113     // executed. Since this can have significant performance implications,
    114     // use sparingly.
    115     //
    116     // Generally, this should be used only for user data, for example, a task
    117     // writing a preference file.
    118     //
    119     // If a task is posted during shutdown, it will not get run since the
    120     // workers may already be stopped. In this case, the post operation will
    121     // fail (return false) and the task will be deleted.
    122     BLOCK_SHUTDOWN,
    123   };
    124 
    125   // Opaque identifier that defines sequencing of tasks posted to the worker
    126   // pool.
    127   class BASE_EXPORT SequenceToken {
    128    public:
    129     SequenceToken() : id_(0) {}
    130     ~SequenceToken() {}
    131 
    132     bool Equals(const SequenceToken& other) const {
    133       return id_ == other.id_;
    134     }
    135 
    136     // Returns false for the default token, e.g. one from an unsequenced task.
    137     bool IsValid() const {
    138       return id_ != 0;  // 0 is the id of a default-constructed token.
    139     }
    140 
    141     // Returns a string representation of this token. This method should only be
    142     // used for debugging.
    143     std::string ToString() const;
    144 
    145    private:
    146     friend class SequencedWorkerPool;
    147 
    148     explicit SequenceToken(int id) : id_(id) {}
    149 
    150     int id_;
    151   };
    152 
    153   // Allows tests to perform certain actions.
    154   class TestingObserver {
    155    public:
    156     virtual ~TestingObserver() {}
    157     virtual void OnHasWork() = 0;
    158     virtual void WillWaitForShutdown() = 0;
    159     virtual void OnDestruct() = 0;
    160   };
    161 
    162   // Gets the SequenceToken of the current thread.
    163   // If current thread is not a SequencedWorkerPool worker thread or is running
    164   // an unsequenced task, returns an invalid SequenceToken.
    165   static SequenceToken GetSequenceTokenForCurrentThread();
    166 
    167   // Gets a SequencedTaskRunner for the current thread. If the current thread is
    168   // running an unsequenced task, a new SequenceToken will be generated and set,
    169   // so that the returned SequencedTaskRunner is guaranteed to run tasks after
    170   // the current task has finished running.
    171   static scoped_refptr<SequencedTaskRunner>
    172   GetSequencedTaskRunnerForCurrentThread();
    173 
    174   // Returns a unique token that can be used to sequence tasks posted to
    175   // PostSequencedWorkerTask(). Valid tokens are always nonzero.
    176   // TODO(bauerb): Rename this to better differentiate from
    177   // GetSequenceTokenForCurrentThread().
    178   static SequenceToken GetSequenceToken();
    179 
    180   // Returns the SequencedWorkerPool that owns this thread, or null if the
    181   // current thread is not a SequencedWorkerPool worker thread.
    182   static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
    183 
    184   // When constructing a SequencedWorkerPool, there must be a
    185   // ThreadTaskRunnerHandle on the current thread unless you plan to
    186   // deliberately leak it.
    187 
    188   // Pass the maximum number of threads (they will be lazily created as needed)
    189   // and a prefix for the thread name to aid in debugging.
    190   SequencedWorkerPool(size_t max_threads,
    191                       const std::string& thread_name_prefix);
    192 
    193   // Like above, but with |observer| for testing.  Does not take ownership of
    194   // |observer|.
    195   SequencedWorkerPool(size_t max_threads,
    196                       const std::string& thread_name_prefix,
    197                       TestingObserver* observer);
    198 
    199   // Returns the sequence token associated with the given name. Calling this
    200   // function multiple times with the same string will always produce the
    201   // same sequence token. If the name has not been used before, a new token
    202   // will be created.
    203   SequenceToken GetNamedSequenceToken(const std::string& name);
    204 
    205   // Returns a SequencedTaskRunner wrapper which posts to this
    206   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
    207   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
    208   // are posted with BLOCK_SHUTDOWN behavior.
    209   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
    210       SequenceToken token);
    211 
    212   // Returns a SequencedTaskRunner wrapper which posts to this
    213   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
    214   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
    215   // are posted with the given shutdown behavior.
    216   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
    217       SequenceToken token,
    218       WorkerShutdown shutdown_behavior);
    219 
    220   // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
    221   // the given shutdown behavior. Tasks with nonzero delay are posted with
    222   // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
    223   // given shutdown behavior.
    224   scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
    225       WorkerShutdown shutdown_behavior);
    226 
    227   // Posts the given task for execution in the worker pool. Tasks posted with
    228   // this function will execute in an unspecified order on a background thread.
    229   // Returns true if the task was posted. If your tasks have ordering
    230   // requirements, see PostSequencedWorkerTask().
    231   //
    232   // This class will attempt to delete tasks that aren't run
    233   // (non-block-shutdown semantics) but can't guarantee that this happens. If
    234   // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
    235   // will be no workers available to delete these tasks. And there may be
    236   // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
    237   // tasks. Deleting those tasks before the previous one has completed could
    238   // cause nondeterministic crashes because the task could be keeping some
    239   // objects alive which do work in their destructor, which could violate the
    240   // assumptions of the running task.
    241   //
    242   // The task will be guaranteed to run to completion before shutdown
    243   // (BLOCK_SHUTDOWN semantics).
    244   //
    245   // Returns true if the task was posted successfully. This may fail during
    246   // shutdown regardless of the task's WorkerShutdown behavior.
    247   bool PostWorkerTask(const tracked_objects::Location& from_here,
    248                       const Closure& task);
    249 
    250   // Same as PostWorkerTask but allows a delay to be specified (although doing
    251   // so changes the shutdown behavior). The task will be run after the given
    252   // delay has elapsed.
    253   //
    254   // If the delay is nonzero, the task won't be guaranteed to run to completion
    255   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
    256   // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
    257   // task will be guaranteed to run to completion before shutdown
    258   // (BLOCK_SHUTDOWN semantics).
    259   bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
    260                              const Closure& task,
    261                              TimeDelta delay);
    262 
    263   // Same as PostWorkerTask but allows specification of the shutdown behavior.
    264   bool PostWorkerTaskWithShutdownBehavior(
    265       const tracked_objects::Location& from_here,
    266       const Closure& task,
    267       WorkerShutdown shutdown_behavior);
    268 
    269   // Like PostWorkerTask above, but provides sequencing semantics. This means
    270   // that tasks posted with the same sequence token (see GetSequenceToken())
    271   // are guaranteed to execute in order. This is useful in cases where you're
    272   // doing operations that may depend on previous ones, like appending to a
    273   // file.
    274   //
    275   // The task will be guaranteed to run to completion before shutdown
    276   // (BLOCK_SHUTDOWN semantics).
    277   //
    278   // Returns true if the task was posted successfully. This may fail during
    279   // shutdown regardless of the task's WorkerShutdown behavior.
    280   bool PostSequencedWorkerTask(SequenceToken sequence_token,
    281                                const tracked_objects::Location& from_here,
    282                                const Closure& task);
    283 
    284   // Like PostSequencedWorkerTask above, but allows you to specify a named
    285   // token, which saves an extra call to GetNamedSequenceToken.
    286   bool PostNamedSequencedWorkerTask(const std::string& token_name,
    287                                     const tracked_objects::Location& from_here,
    288                                     const Closure& task);
    289 
    290   // Same as PostSequencedWorkerTask but allows a delay to be specified
    291   // (although doing so changes the shutdown behavior). The task will be run
    292   // after the given delay has elapsed.
    293   //
    294   // If the delay is nonzero, the task won't be guaranteed to run to completion
    295   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
    296   // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
    297   // i.e. the task will be guaranteed to run to completion before shutdown
    298   // (BLOCK_SHUTDOWN semantics).
    299   bool PostDelayedSequencedWorkerTask(
    300       SequenceToken sequence_token,
    301       const tracked_objects::Location& from_here,
    302       const Closure& task,
    303       TimeDelta delay);
    304 
    305   // Same as PostSequencedWorkerTask but allows specification of the shutdown
    306   // behavior.
    307   bool PostSequencedWorkerTaskWithShutdownBehavior(
    308       SequenceToken sequence_token,
    309       const tracked_objects::Location& from_here,
    310       const Closure& task,
    311       WorkerShutdown shutdown_behavior);
    312 
    313   // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
    314   bool PostDelayedTask(const tracked_objects::Location& from_here,
    315                        const Closure& task,
    316                        TimeDelta delay) override;
    317   bool RunsTasksOnCurrentThread() const override;
    318 
    319   // Returns true if the current thread is processing a task with the given
    320   // sequence_token.
    321   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
    322 
    323   // Returns true if any thread is currently processing a task with the given
    324   // sequence token. Should only be called with a valid sequence token.
    325   bool IsRunningSequence(SequenceToken sequence_token) const;
    326 
    327   // Blocks until all pending tasks are complete. This should only be called in
    328   // unit tests when you want to validate something that should have happened.
    329   // This will not flush delayed tasks; delayed tasks get deleted.
    330   //
    331   // Note that calling this will not prevent other threads from posting work to
    332   // the queue while the calling thread is waiting in FlushForTesting(). In that
    333   // case it returns only when there's no more work in the queue. Normally this
    334   // doesn't come up since, in a test, all the work is being posted from the
    335   // main thread.
    336   void FlushForTesting();
    337 
    338   // Spuriously signal that there is work to be done.
    339   void SignalHasWorkForTesting();
    340 
    341   // Implements the worker pool shutdown. This should be called during app
    342   // shutdown, and will discard/join with appropriate tasks before returning.
    343   // After this call, subsequent calls to post tasks will fail.
    344   //
    345   // Must be called from the same thread this object was constructed on.
    346   void Shutdown() { Shutdown(0); }  // Permits no new blocking tasks.
    347 
    348   // A variant that allows an arbitrary number of new blocking tasks to be
    349   // posted during shutdown. The tasks cannot be posted within the execution
    350   // context of tasks whose shutdown behavior is not BLOCK_SHUTDOWN. Once
    351   // the limit is reached, subsequent calls to post task fail in all cases.
    352   // Must be called from the same thread this object was constructed on.
    353   void Shutdown(int max_new_blocking_tasks_after_shutdown);
    354 
    355   // Checks whether Shutdown() was called on this pool. This method is used
    356   // for aborting time-consuming operations to avoid blocking shutdown.
    357   //
    358   // Can be called from any thread.
    359   bool IsShutdownInProgress();
    360 
    361  protected:
    362   ~SequencedWorkerPool() override;
    363 
    364   void OnDestruct() const override;
    365 
    366  private:
    367   friend class RefCountedThreadSafe<SequencedWorkerPool>;
    368   friend class DeleteHelper<SequencedWorkerPool>;
    369 
    370   class Inner;
    371   class Worker;
    372 
    373   const scoped_refptr<SingleThreadTaskRunner> constructor_task_runner_;
    374 
    375   // Avoid pulling in too many headers by putting (almost) everything
    376   // into |inner_|.
    377   const std::unique_ptr<Inner> inner_;
    378 
    379   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
    380 };
    381 
    382 }  // namespace base
    383 
    384 #endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
    385