Home | History | Annotate | Download | only in threading
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
      6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
      7 
      8 #include <cstddef>
      9 #include <string>
     10 
     11 #include "base/base_export.h"
     12 #include "base/basictypes.h"
     13 #include "base/callback_forward.h"
     14 #include "base/memory/ref_counted.h"
     15 #include "base/memory/scoped_ptr.h"
     16 #include "base/task_runner.h"
     17 
        // Forward declaration only: this header uses tracked_objects::Location
        // solely as a const-reference parameter type in the Post*Task signatures
        // below, so the full definition is not needed here.
      18 namespace tracked_objects {
      19 class Location;
      20 }  // namespace tracked_objects
     21 
     22 namespace base {
     23 
        // Forward declarations for types this header only names, never uses by
        // value.
        //
        // MessageLoopProxy: held through a scoped_refptr member of
        // SequencedWorkerPool.
      24 class MessageLoopProxy;
      25 
        // DeleteHelper: befriended by SequencedWorkerPool.
      26 template <class T> class DeleteHelper;
      27 
        // SequencedTaskRunner: returned (inside a scoped_refptr) by
        // GetSequencedTaskRunner() and
        // GetSequencedTaskRunnerWithShutdownBehavior().
      28 class SequencedTaskRunner;
     29 
     30 // A worker thread pool that enforces ordering between sets of tasks. It also
     31 // allows you to specify what should happen to your tasks on shutdown.
     32 //
     33 // To enforce ordering, get a unique sequence token from the pool and post all
     34 // tasks you want to order with the token. All tasks with the same token are
     35 // guaranteed to execute serially, though not necessarily on the same thread.
     36 // This means that:
     37 //
     38 //   - No two tasks with the same token will run at the same time.
     39 //
     40 //   - Given two tasks T1 and T2 with the same token such that T2 will
     41 //     run after T1, then T2 will start after T1 is destroyed.
     42 //
     43 //   - If T2 will run after T1, then all memory changes in T1 and T1's
     44 //     destruction will be visible to T2.
     45 //
     46 // Example:
     47 //   SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
      48 //   pool.PostSequencedWorkerTask(token, FROM_HERE, base::Bind(...));
      49 //   pool.PostSequencedWorkerTask(token, FROM_HERE, base::Bind(...));
      50 //   // To pick a shutdown behavior (e.g. SequencedWorkerPool::SKIP_ON_SHUTDOWN),
      51 //   // use PostSequencedWorkerTaskWithShutdownBehavior() instead.
     52 //
     53 // You can make named sequence tokens to make it easier to share a token
     54 // across different components.
     55 //
     56 // You can also post tasks to the pool without ordering using PostWorkerTask.
     57 // These will be executed in an unspecified order. The order of execution
     58 // between tasks with different sequence tokens is also unspecified.
     59 //
     60 // This class may be leaked on shutdown to facilitate fast shutdown. The
     61 // expected usage, however, is to call Shutdown(), which correctly accounts
     62 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
     63 // behavior.
     64 //
     65 // Implementation note: This does not use a base::WorkerPool since that does
     66 // not enforce shutdown semantics or allow us to specify how many worker
     67 // threads to run. For the typical use case of random background work, we don't
     68 // necessarily want to be super aggressive about creating threads.
     69 //
     70 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
     71 // from TaskRunner).
      72 class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
      73  public:
      74   // Defines what should happen to a task posted to the worker pool on
      75   // shutdown.
      76   enum WorkerShutdown {
      77     // Tasks posted with this mode which have not run at shutdown will be
      78     // deleted rather than run, and any tasks with this mode running at
      79     // shutdown will be ignored (the worker thread will not be joined).
      80     //
      81     // This option provides a nice way to post stuff you don't want blocking
      82     // shutdown. For example, you might be doing a slow DNS lookup and if it's
      83     // blocked on the OS, you may not want to stop shutdown, since the result
      84     // doesn't really matter at that point.
      85     //
      86     // However, you need to be very careful what you do in your callback when
      87     // you use this option. Since the thread will continue to run until the OS
      88     // terminates the process, the app can be in the process of tearing down
      89     // when you're running. This means any singletons or global objects you
      90     // use may suddenly become invalid out from under you. For this reason,
      91     // it's best to use this only for slow but simple operations like the DNS
      92     // example.
      93     CONTINUE_ON_SHUTDOWN,
      94 
      95     // Tasks posted with this mode that have not started executing at
      96     // shutdown will be deleted rather than executed. However, any tasks that
      97     // have already begun executing when shutdown is called will be allowed
      98     // to continue, and will block shutdown until completion.
      99     //
     100     // Note: Because Shutdown() may block while these tasks are executing,
     101     // care must be taken to ensure that they do not block on the thread that
     102     // called Shutdown(), as this may lead to deadlock.
     103     SKIP_ON_SHUTDOWN,
     104 
     105     // Tasks posted with this mode will block shutdown until they're
     106     // executed. Since this can have significant performance implications,
     107     // use sparingly.
     108     //
     109     // Generally, this should be used only for user data, for example, a task
     110     // writing a preference file.
     111     //
     112     // If a task is posted during shutdown, it will not get run since the
     113     // workers may already be stopped. In this case, the post operation will
     114     // fail (return false) and the task will be deleted.
     115     BLOCK_SHUTDOWN,
     116   };
     117 
     118   // Opaque identifier that defines sequencing of tasks posted to the worker
     119   // pool.
     120   class SequenceToken {
     121    public:
            // Creates an invalid token (id 0); see IsValid().
     122     SequenceToken() : id_(0) {}
     123     ~SequenceToken() {}
     124 
            // Two tokens are equal iff they carry the same id.
     125     bool Equals(const SequenceToken& other) const {
     126       return id_ == other.id_;
     127     }
     128 
     129     // Returns true iff this token has a nonzero id. A default-constructed
            // token is invalid; GetSequenceTokenForCurrentThread() returns such a
            // token when the current thread is executing an unsequenced task.
     130     bool IsValid() const {
     131       return id_ != 0;
     132     }
     133 
     134    private:
     135     friend class SequencedWorkerPool;
     136 
            // Private: only SequencedWorkerPool (a friend) can create a token with a
            // specific id.
     137     explicit SequenceToken(int id) : id_(id) {}
     138 
            // 0 means "invalid"; valid tokens are always nonzero.
     139     int id_;
     140   };
     141 
     142   // Allows tests to perform certain actions. All methods are pure virtual; a
          // test passes its implementation to the three-argument constructor below.
          // NOTE(review): when each hook fires is decided by the implementation
          // file, not this header. The names suggest "work became available",
          // "about to wait in Shutdown()" and "pool is being destroyed" — confirm
          // before relying on that.
     143   class TestingObserver {
     144    public:
     145     virtual ~TestingObserver() {}
     146     virtual void OnHasWork() = 0;
     147     virtual void WillWaitForShutdown() = 0;
     148     virtual void OnDestruct() = 0;
     149   };
     150 
     151   // Gets the SequenceToken of the current thread.
     152   // If current thread is not a SequencedWorkerPool worker thread or is running
     153   // an unsequenced task, returns an invalid SequenceToken.
     154   static SequenceToken GetSequenceTokenForCurrentThread();
     155 
     156   // When constructing a SequencedWorkerPool, there must be a
     157   // MessageLoop on the current thread unless you plan to deliberately
     158   // leak it.
     159 
     160   // Pass the maximum number of threads (they will be lazily created as needed)
     161   // and a prefix for the thread name to aid in debugging.
     162   SequencedWorkerPool(size_t max_threads,
     163                       const std::string& thread_name_prefix);
     164 
     165   // Like above, but with |observer| for testing.  Does not take
     166   // ownership of |observer|.
     167   SequencedWorkerPool(size_t max_threads,
     168                       const std::string& thread_name_prefix,
     169                       TestingObserver* observer);
     170 
     171   // Returns a unique token that can be used to sequence tasks posted to
     172   // PostSequencedWorkerTask(). Valid tokens are always nonzero.
     173   SequenceToken GetSequenceToken();
     174 
     175   // Returns the sequence token associated with the given name. Calling this
     176   // function multiple times with the same string will always produce the
     177   // same sequence token. If the name has not been used before, a new token
     178   // will be created.
     179   SequenceToken GetNamedSequenceToken(const std::string& name);
     180 
     181   // Returns a SequencedTaskRunner wrapper which posts to this
     182   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
     183   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
     184   // are posted with BLOCK_SHUTDOWN behavior.
     185   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
     186       SequenceToken token);
     187 
     188   // Returns a SequencedTaskRunner wrapper which posts to this
     189   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
     190   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
     191   // are posted with the given shutdown behavior.
     192   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
     193       SequenceToken token,
     194       WorkerShutdown shutdown_behavior);
     195 
     196   // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
     197   // the given shutdown behavior. Tasks with nonzero delay are posted with
     198   // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
     199   // given shutdown behavior.
     200   scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
     201       WorkerShutdown shutdown_behavior);
     202 
     203   // Posts the given task for execution in the worker pool. Tasks posted with
     204   // this function will execute in an unspecified order on a background thread.
     205   // Returns true if the task was posted. If your tasks have ordering
     206   // requirements, see PostSequencedWorkerTask().
     207   //
     208   // This class will attempt to delete tasks that aren't run
     209   // (non-block-shutdown semantics) but can't guarantee that this happens. If
     210   // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
     211   // will be no workers available to delete these tasks. And there may be
     212   // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
     213   // tasks. Deleting those tasks before the previous one has completed could
     214   // cause nondeterministic crashes because the task could be keeping some
     215   // objects alive which do work in their destructor, which could violate the
     216   // assumptions of the running task.
     217   //
     218   // The task will be guaranteed to run to completion before shutdown
     219   // (BLOCK_SHUTDOWN semantics).
     220   //
     221   // Returns true if the task was posted successfully. This may fail during
     222   // shutdown regardless of the task's WorkerShutdown behavior.
     223   bool PostWorkerTask(const tracked_objects::Location& from_here,
     224                       const Closure& task);
     225 
     226   // Same as PostWorkerTask but allows a delay to be specified (although doing
     227   // so changes the shutdown behavior). The task will be run after the given
     228   // delay has elapsed.
     229   //
     230   // If the delay is nonzero, the task won't be guaranteed to run to completion
     231   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
     232   // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
     233   // task will be guaranteed to run to completion before shutdown
     234   // (BLOCK_SHUTDOWN semantics).
     235   bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
     236                              const Closure& task,
     237                              TimeDelta delay);
     238 
     239   // Same as PostWorkerTask but allows specification of the shutdown behavior.
     240   bool PostWorkerTaskWithShutdownBehavior(
     241       const tracked_objects::Location& from_here,
     242       const Closure& task,
     243       WorkerShutdown shutdown_behavior);
     244 
     245   // Like PostWorkerTask above, but provides sequencing semantics. This means
     246   // that tasks posted with the same sequence token (see GetSequenceToken())
     247   // are guaranteed to execute in order. This is useful in cases where you're
     248   // doing operations that may depend on previous ones, like appending to a
     249   // file.
     250   //
     251   // The task will be guaranteed to run to completion before shutdown
     252   // (BLOCK_SHUTDOWN semantics).
     253   //
     254   // Returns true if the task was posted successfully. This may fail during
     255   // shutdown regardless of the task's WorkerShutdown behavior.
     256   bool PostSequencedWorkerTask(SequenceToken sequence_token,
     257                                const tracked_objects::Location& from_here,
     258                                const Closure& task);
     259 
     260   // Like PostSequencedWorkerTask above, but allows you to specify a named
     261   // token, which saves an extra call to GetNamedSequenceToken.
     262   bool PostNamedSequencedWorkerTask(const std::string& token_name,
     263                                     const tracked_objects::Location& from_here,
     264                                     const Closure& task);
     265 
     266   // Same as PostSequencedWorkerTask but allows a delay to be specified
     267   // (although doing so changes the shutdown behavior). The task will be run
     268   // after the given delay has elapsed.
     269   //
     270   // If the delay is nonzero, the task won't be guaranteed to run to completion
     271   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
     272   // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
     273   // i.e. the task will be guaranteed to run to completion before shutdown
     274   // (BLOCK_SHUTDOWN semantics).
     275   bool PostDelayedSequencedWorkerTask(
     276       SequenceToken sequence_token,
     277       const tracked_objects::Location& from_here,
     278       const Closure& task,
     279       TimeDelta delay);
     280 
     281   // Same as PostSequencedWorkerTask but allows specification of the shutdown
     282   // behavior.
     283   bool PostSequencedWorkerTaskWithShutdownBehavior(
     284       SequenceToken sequence_token,
     285       const tracked_objects::Location& from_here,
     286       const Closure& task,
     287       WorkerShutdown shutdown_behavior);
     288 
     289   // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
     290   virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
     291                                const Closure& task,
     292                                TimeDelta delay) OVERRIDE;
     293   virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
     294 
     295   // Returns true if the current thread is processing a task with the given
     296   // sequence_token.
     297   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
     298 
     299   // Blocks until all pending tasks are complete. This should only be called in
     300   // unit tests when you want to validate something that should have happened.
     301   // This will not flush delayed tasks; delayed tasks get deleted.
     302   //
     303   // Note that calling this will not prevent other threads from posting work to
     304   // the queue while the calling thread is waiting on FlushForTesting(). In
     305   // this case, FlushForTesting() will return only when there's no more work in
     306   // the queue. Normally, this doesn't come up since in a test, all the work is
     307   // being posted from the main thread.
     308   void FlushForTesting();
     309 
     310   // Spuriously signal that there is work to be done.
     311   void SignalHasWorkForTesting();
     312 
     313   // Implements the worker pool shutdown. This should be called during app
     314   // shutdown, and will discard/join with appropriate tasks before returning.
     315   // After this call, subsequent calls to post tasks will fail.
     316   //
     317   // Must be called from the same thread this object was constructed on.
          //
          // Equivalent to Shutdown(0), i.e. a limit of zero new blocking tasks
          // after shutdown has begun.
     318   void Shutdown() { Shutdown(0); }
     319 
     320   // A variant that allows an arbitrary number of new blocking tasks to
     321   // be posted during shutdown from within tasks that execute during shutdown.
     322   // Only tasks designated as BLOCK_SHUTDOWN will be allowed, and only if
     323   // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once
     324   // the limit is reached, subsequent calls to post task fail in all cases.
     325   //
     326   // Must be called from the same thread this object was constructed on.
     327   void Shutdown(int max_new_blocking_tasks_after_shutdown);
     328 
     329   // Checks whether Shutdown() has been called on this pool. Tasks can use this
     330   // to abort time-consuming operations and avoid blocking shutdown.
     331   //
     332   // Can be called from any thread.
     333   bool IsShutdownInProgress();
     334 
     335  protected:
          // Not public: the pool is RefCountedThreadSafe (via TaskRunner), and
          // RefCountedThreadSafe<> and DeleteHelper<> are befriended below —
          // presumably so that only they invoke the destructor.
     336   virtual ~SequencedWorkerPool();
     337 
          // NOTE(review): override of a base-class hook; what it does (and on which
          // thread) is defined in the implementation file — confirm there.
     338   virtual void OnDestruct() const OVERRIDE;
     339 
     340  private:
     341   friend class RefCountedThreadSafe<SequencedWorkerPool>;
     342   friend class DeleteHelper<SequencedWorkerPool>;
     343 
          // Implementation classes; only declared here and defined out of line.
     344   class Inner;
     345   class Worker;
     346 
          // NOTE(review): presumably the MessageLoopProxy of the thread that
          // constructed the pool (the constructor comment requires a MessageLoop
          // on that thread) — confirm in the implementation file.
     347   const scoped_refptr<MessageLoopProxy> constructor_message_loop_;
     348 
     349   // Avoid pulling in too many headers by putting (almost) everything
     350   // into |inner_|.
     351   const scoped_ptr<Inner> inner_;
     352 
     353   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
     354 };
    355 
    356 }  // namespace base
    357 
    358 #endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
    359