1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 7 8 #include <cstddef> 9 #include <string> 10 11 #include "base/base_export.h" 12 #include "base/basictypes.h" 13 #include "base/callback_forward.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/memory/scoped_ptr.h" 16 #include "base/task_runner.h" 17 18 namespace tracked_objects { 19 class Location; 20 } // namespace tracked_objects 21 22 namespace base { 23 24 class MessageLoopProxy; 25 26 template <class T> class DeleteHelper; 27 28 class SequencedTaskRunner; 29 30 // A worker thread pool that enforces ordering between sets of tasks. It also 31 // allows you to specify what should happen to your tasks on shutdown. 32 // 33 // To enforce ordering, get a unique sequence token from the pool and post all 34 // tasks you want to order with the token. All tasks with the same token are 35 // guaranteed to execute serially, though not necessarily on the same thread. 36 // This means that: 37 // 38 // - No two tasks with the same token will run at the same time. 39 // 40 // - Given two tasks T1 and T2 with the same token such that T2 will 41 // run after T1, then T2 will start after T1 is destroyed. 42 // 43 // - If T2 will run after T1, then all memory changes in T1 and T1's 44 // destruction will be visible to T2. 45 // 46 // Example: 47 // SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); 48 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 49 // FROM_HERE, base::Bind(...)); 50 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 51 // FROM_HERE, base::Bind(...)); 52 // 53 // You can make named sequence tokens to make it easier to share a token 54 // across different components. 55 // 56 // You can also post tasks to the pool without ordering using PostWorkerTask. 57 // These will be executed in an unspecified order. The order of execution 58 // between tasks with different sequence tokens is also unspecified. 59 // 60 // This class may be leaked on shutdown to facilitate fast shutdown. The 61 // expected usage, however, is to call Shutdown(), which correctly accounts 62 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN 63 // behavior. 64 // 65 // Implementation note: This does not use a base::WorkerPool since that does 66 // not enforce shutdown semantics or allow us to specify how many worker 67 // threads to run. For the typical use case of random background work, we don't 68 // necessarily want to be super aggressive about creating threads. 69 // 70 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited 71 // from TaskRunner). 72 class BASE_EXPORT SequencedWorkerPool : public TaskRunner { 73 public: 74 // Defines what should happen to a task posted to the worker pool on 75 // shutdown. 76 enum WorkerShutdown { 77 // Tasks posted with this mode which have not run at shutdown will be 78 // deleted rather than run, and any tasks with this mode running at 79 // shutdown will be ignored (the worker thread will not be joined). 80 // 81 // This option provides a nice way to post stuff you don't want blocking 82 // shutdown. For example, you might be doing a slow DNS lookup and if it's 83 // blocked on the OS, you may not want to stop shutdown, since the result 84 // doesn't really matter at that point. 85 // 86 // However, you need to be very careful what you do in your callback when 87 // you use this option. Since the thread will continue to run until the OS 88 // terminates the process, the app can be in the process of tearing down 89 // when you're running. This means any singletons or global objects you 90 // use may suddenly become invalid out from under you. For this reason, 91 // it's best to use this only for slow but simple operations like the DNS 92 // example. 93 CONTINUE_ON_SHUTDOWN, 94 95 // Tasks posted with this mode that have not started executing at 96 // shutdown will be deleted rather than executed. However, any tasks that 97 // have already begun executing when shutdown is called will be allowed 98 // to continue, and will block shutdown until completion. 99 // 100 // Note: Because Shutdown() may block while these tasks are executing, 101 // care must be taken to ensure that they do not block on the thread that 102 // called Shutdown(), as this may lead to deadlock. 103 SKIP_ON_SHUTDOWN, 104 105 // Tasks posted with this mode will block shutdown until they're 106 // executed. Since this can have significant performance implications, 107 // use sparingly. 108 // 109 // Generally, this should be used only for user data, for example, a task 110 // writing a preference file. 111 // 112 // If a task is posted during shutdown, it will not get run since the 113 // workers may already be stopped. In this case, the post operation will 114 // fail (return false) and the task will be deleted. 115 BLOCK_SHUTDOWN, 116 }; 117 118 // Opaque identifier that defines sequencing of tasks posted to the worker 119 // pool. 120 class SequenceToken { 121 public: 122 SequenceToken() : id_(0) {} 123 ~SequenceToken() {} 124 125 bool Equals(const SequenceToken& other) const { 126 return id_ == other.id_; 127 } 128 129 // Returns false if current thread is executing an unsequenced task. 130 bool IsValid() const { 131 return id_ != 0; 132 } 133 134 private: 135 friend class SequencedWorkerPool; 136 137 explicit SequenceToken(int id) : id_(id) {} 138 139 int id_; 140 }; 141 142 // Allows tests to perform certain actions. 143 class TestingObserver { 144 public: 145 virtual ~TestingObserver() {} 146 virtual void OnHasWork() = 0; 147 virtual void WillWaitForShutdown() = 0; 148 virtual void OnDestruct() = 0; 149 }; 150 151 // Gets the SequencedToken of the current thread. 152 // If current thread is not a SequencedWorkerPool worker thread or is running 153 // an unsequenced task, returns an invalid SequenceToken. 154 static SequenceToken GetSequenceTokenForCurrentThread(); 155 156 // When constructing a SequencedWorkerPool, there must be a 157 // MessageLoop on the current thread unless you plan to deliberately 158 // leak it. 159 160 // Pass the maximum number of threads (they will be lazily created as needed) 161 // and a prefix for the thread name to aid in debugging. 162 SequencedWorkerPool(size_t max_threads, 163 const std::string& thread_name_prefix); 164 165 // Like above, but with |observer| for testing. Does not take 166 // ownership of |observer|. 167 SequencedWorkerPool(size_t max_threads, 168 const std::string& thread_name_prefix, 169 TestingObserver* observer); 170 171 // Returns a unique token that can be used to sequence tasks posted to 172 // PostSequencedWorkerTask(). Valid tokens are always nonzero. 173 SequenceToken GetSequenceToken(); 174 175 // Returns the sequence token associated with the given name. Calling this 176 // function multiple times with the same string will always produce the 177 // same sequence token. If the name has not been used before, a new token 178 // will be created. 179 SequenceToken GetNamedSequenceToken(const std::string& name); 180 181 // Returns a SequencedTaskRunner wrapper which posts to this 182 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 183 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 184 // are posted with BLOCK_SHUTDOWN behavior. 185 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( 186 SequenceToken token); 187 188 // Returns a SequencedTaskRunner wrapper which posts to this 189 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 190 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 191 // are posted with the given shutdown behavior. 192 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior( 193 SequenceToken token, 194 WorkerShutdown shutdown_behavior); 195 196 // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using 197 // the given shutdown behavior. Tasks with nonzero delay are posted with 198 // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the 199 // given shutdown behavior. 200 scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior( 201 WorkerShutdown shutdown_behavior); 202 203 // Posts the given task for execution in the worker pool. Tasks posted with 204 // this function will execute in an unspecified order on a background thread. 205 // Returns true if the task was posted. If your tasks have ordering 206 // requirements, see PostSequencedWorkerTask(). 207 // 208 // This class will attempt to delete tasks that aren't run 209 // (non-block-shutdown semantics) but can't guarantee that this happens. If 210 // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there 211 // will be no workers available to delete these tasks. And there may be 212 // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN 213 // tasks. Deleting those tasks before the previous one has completed could 214 // cause nondeterministic crashes because the task could be keeping some 215 // objects alive which do work in their destructor, which could voilate the 216 // assumptions of the running task. 217 // 218 // The task will be guaranteed to run to completion before shutdown 219 // (BLOCK_SHUTDOWN semantics). 220 // 221 // Returns true if the task was posted successfully. This may fail during 222 // shutdown regardless of the specified ShutdownBehavior. 223 bool PostWorkerTask(const tracked_objects::Location& from_here, 224 const Closure& task); 225 226 // Same as PostWorkerTask but allows a delay to be specified (although doing 227 // so changes the shutdown behavior). The task will be run after the given 228 // delay has elapsed. 229 // 230 // If the delay is nonzero, the task won't be guaranteed to run to completion 231 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 232 // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the 233 // task will be guaranteed to run to completion before shutdown 234 // (BLOCK_SHUTDOWN semantics). 235 bool PostDelayedWorkerTask(const tracked_objects::Location& from_here, 236 const Closure& task, 237 TimeDelta delay); 238 239 // Same as PostWorkerTask but allows specification of the shutdown behavior. 240 bool PostWorkerTaskWithShutdownBehavior( 241 const tracked_objects::Location& from_here, 242 const Closure& task, 243 WorkerShutdown shutdown_behavior); 244 245 // Like PostWorkerTask above, but provides sequencing semantics. This means 246 // that tasks posted with the same sequence token (see GetSequenceToken()) 247 // are guaranteed to execute in order. This is useful in cases where you're 248 // doing operations that may depend on previous ones, like appending to a 249 // file. 250 // 251 // The task will be guaranteed to run to completion before shutdown 252 // (BLOCK_SHUTDOWN semantics). 253 // 254 // Returns true if the task was posted successfully. This may fail during 255 // shutdown regardless of the specified ShutdownBehavior. 256 bool PostSequencedWorkerTask(SequenceToken sequence_token, 257 const tracked_objects::Location& from_here, 258 const Closure& task); 259 260 // Like PostSequencedWorkerTask above, but allows you to specify a named 261 // token, which saves an extra call to GetNamedSequenceToken. 262 bool PostNamedSequencedWorkerTask(const std::string& token_name, 263 const tracked_objects::Location& from_here, 264 const Closure& task); 265 266 // Same as PostSequencedWorkerTask but allows a delay to be specified 267 // (although doing so changes the shutdown behavior). The task will be run 268 // after the given delay has elapsed. 269 // 270 // If the delay is nonzero, the task won't be guaranteed to run to completion 271 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 272 // If the delay is zero, this behaves exactly like PostSequencedWorkerTask, 273 // i.e. the task will be guaranteed to run to completion before shutdown 274 // (BLOCK_SHUTDOWN semantics). 275 bool PostDelayedSequencedWorkerTask( 276 SequenceToken sequence_token, 277 const tracked_objects::Location& from_here, 278 const Closure& task, 279 TimeDelta delay); 280 281 // Same as PostSequencedWorkerTask but allows specification of the shutdown 282 // behavior. 283 bool PostSequencedWorkerTaskWithShutdownBehavior( 284 SequenceToken sequence_token, 285 const tracked_objects::Location& from_here, 286 const Closure& task, 287 WorkerShutdown shutdown_behavior); 288 289 // TaskRunner implementation. Forwards to PostDelayedWorkerTask(). 290 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 291 const Closure& task, 292 TimeDelta delay) OVERRIDE; 293 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 294 295 // Returns true if the current thread is processing a task with the given 296 // sequence_token. 297 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 298 299 // Blocks until all pending tasks are complete. This should only be called in 300 // unit tests when you want to validate something that should have happened. 301 // This will not flush delayed tasks; delayed tasks get deleted. 302 // 303 // Note that calling this will not prevent other threads from posting work to 304 // the queue while the calling thread is waiting on Flush(). In this case, 305 // Flush will return only when there's no more work in the queue. Normally, 306 // this doesn't come up since in a test, all the work is being posted from 307 // the main thread. 308 void FlushForTesting(); 309 310 // Spuriously signal that there is work to be done. 311 void SignalHasWorkForTesting(); 312 313 // Implements the worker pool shutdown. This should be called during app 314 // shutdown, and will discard/join with appropriate tasks before returning. 315 // After this call, subsequent calls to post tasks will fail. 316 // 317 // Must be called from the same thread this object was constructed on. 318 void Shutdown() { Shutdown(0); } 319 320 // A variant that allows an arbitrary number of new blocking tasks to 321 // be posted during shutdown from within tasks that execute during shutdown. 322 // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if 323 // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once 324 // the limit is reached, subsequent calls to post task fail in all cases. 325 // 326 // Must be called from the same thread this object was constructed on. 327 void Shutdown(int max_new_blocking_tasks_after_shutdown); 328 329 // Check if Shutdown was called for given threading pool. This method is used 330 // for aborting time consuming operation to avoid blocking shutdown. 331 // 332 // Can be called from any thread. 333 bool IsShutdownInProgress(); 334 335 protected: 336 virtual ~SequencedWorkerPool(); 337 338 virtual void OnDestruct() const OVERRIDE; 339 340 private: 341 friend class RefCountedThreadSafe<SequencedWorkerPool>; 342 friend class DeleteHelper<SequencedWorkerPool>; 343 344 class Inner; 345 class Worker; 346 347 const scoped_refptr<MessageLoopProxy> constructor_message_loop_; 348 349 // Avoid pulling in too many headers by putting (almost) everything 350 // into |inner_|. 351 const scoped_ptr<Inner> inner_; 352 353 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); 354 }; 355 356 } // namespace base 357 358 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 359