1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/sequenced_worker_pool.h" 6 7 #include <list> 8 #include <map> 9 #include <set> 10 #include <utility> 11 #include <vector> 12 13 #include "base/atomic_sequence_num.h" 14 #include "base/callback.h" 15 #include "base/compiler_specific.h" 16 #include "base/critical_closure.h" 17 #include "base/debug/trace_event.h" 18 #include "base/lazy_instance.h" 19 #include "base/logging.h" 20 #include "base/memory/linked_ptr.h" 21 #include "base/message_loop/message_loop_proxy.h" 22 #include "base/stl_util.h" 23 #include "base/strings/stringprintf.h" 24 #include "base/synchronization/condition_variable.h" 25 #include "base/synchronization/lock.h" 26 #include "base/threading/platform_thread.h" 27 #include "base/threading/simple_thread.h" 28 #include "base/threading/thread_local.h" 29 #include "base/threading/thread_restrictions.h" 30 #include "base/time/time.h" 31 #include "base/tracked_objects.h" 32 33 #if defined(OS_MACOSX) 34 #include "base/mac/scoped_nsautorelease_pool.h" 35 #endif 36 37 #if !defined(OS_NACL) 38 #include "base/metrics/histogram.h" 39 #endif 40 41 namespace base { 42 43 namespace { 44 45 struct SequencedTask : public TrackingInfo { 46 SequencedTask() 47 : sequence_token_id(0), 48 trace_id(0), 49 sequence_task_number(0), 50 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 51 52 explicit SequencedTask(const tracked_objects::Location& from_here) 53 : base::TrackingInfo(from_here, TimeTicks()), 54 sequence_token_id(0), 55 trace_id(0), 56 sequence_task_number(0), 57 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 58 59 ~SequencedTask() {} 60 61 int sequence_token_id; 62 int trace_id; 63 int64 sequence_task_number; 64 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 65 tracked_objects::Location posted_from; 66 Closure task; 67 68 // Non-delayed tasks and delayed tasks are managed together by time-to-run 69 // order. We calculate the time by adding the posted time and the given delay. 70 TimeTicks time_to_run; 71 }; 72 73 struct SequencedTaskLessThan { 74 public: 75 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 76 if (lhs.time_to_run < rhs.time_to_run) 77 return true; 78 79 if (lhs.time_to_run > rhs.time_to_run) 80 return false; 81 82 // If the time happen to match, then we use the sequence number to decide. 83 return lhs.sequence_task_number < rhs.sequence_task_number; 84 } 85 }; 86 87 // SequencedWorkerPoolTaskRunner --------------------------------------------- 88 // A TaskRunner which posts tasks to a SequencedWorkerPool with a 89 // fixed ShutdownBehavior. 90 // 91 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 92 class SequencedWorkerPoolTaskRunner : public TaskRunner { 93 public: 94 SequencedWorkerPoolTaskRunner( 95 const scoped_refptr<SequencedWorkerPool>& pool, 96 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 97 98 // TaskRunner implementation 99 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 100 const Closure& task, 101 TimeDelta delay) OVERRIDE; 102 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 103 104 private: 105 virtual ~SequencedWorkerPoolTaskRunner(); 106 107 const scoped_refptr<SequencedWorkerPool> pool_; 108 109 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 110 111 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 112 }; 113 114 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 115 const scoped_refptr<SequencedWorkerPool>& pool, 116 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 117 : pool_(pool), 118 shutdown_behavior_(shutdown_behavior) { 119 } 120 121 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 122 } 123 124 bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 125 const tracked_objects::Location& from_here, 126 const Closure& task, 127 TimeDelta delay) { 128 if (delay == TimeDelta()) { 129 return pool_->PostWorkerTaskWithShutdownBehavior( 130 from_here, task, shutdown_behavior_); 131 } 132 return pool_->PostDelayedWorkerTask(from_here, task, delay); 133 } 134 135 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 136 return pool_->RunsTasksOnCurrentThread(); 137 } 138 139 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------ 140 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 141 // fixed sequence token. 142 // 143 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 144 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { 145 public: 146 SequencedWorkerPoolSequencedTaskRunner( 147 const scoped_refptr<SequencedWorkerPool>& pool, 148 SequencedWorkerPool::SequenceToken token, 149 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 150 151 // TaskRunner implementation 152 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 153 const Closure& task, 154 TimeDelta delay) OVERRIDE; 155 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 156 157 // SequencedTaskRunner implementation 158 virtual bool PostNonNestableDelayedTask( 159 const tracked_objects::Location& from_here, 160 const Closure& task, 161 TimeDelta delay) OVERRIDE; 162 163 private: 164 virtual ~SequencedWorkerPoolSequencedTaskRunner(); 165 166 const scoped_refptr<SequencedWorkerPool> pool_; 167 168 const SequencedWorkerPool::SequenceToken token_; 169 170 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 171 172 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); 173 }; 174 175 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( 176 const scoped_refptr<SequencedWorkerPool>& pool, 177 SequencedWorkerPool::SequenceToken token, 178 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 179 : pool_(pool), 180 token_(token), 181 shutdown_behavior_(shutdown_behavior) { 182 } 183 184 SequencedWorkerPoolSequencedTaskRunner:: 185 ~SequencedWorkerPoolSequencedTaskRunner() { 186 } 187 188 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( 189 const tracked_objects::Location& from_here, 190 const Closure& task, 191 TimeDelta delay) { 192 if (delay == TimeDelta()) { 193 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 194 token_, from_here, task, shutdown_behavior_); 195 } 196 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); 197 } 198 199 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { 200 return pool_->IsRunningSequenceOnCurrentThread(token_); 201 } 202 203 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( 204 const tracked_objects::Location& from_here, 205 const Closure& task, 206 TimeDelta delay) { 207 // There's no way to run nested tasks, so simply forward to 208 // PostDelayedTask. 209 return PostDelayedTask(from_here, task, delay); 210 } 211 212 // Create a process-wide unique ID to represent this task in trace events. This 213 // will be mangled with a Process ID hash to reduce the likelyhood of colliding 214 // with MessageLoop pointers on other processes. 215 uint64 GetTaskTraceID(const SequencedTask& task, 216 void* pool) { 217 return (static_cast<uint64>(task.trace_id) << 32) | 218 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 219 } 220 221 base::LazyInstance<base::ThreadLocalPointer< 222 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = 223 LAZY_INSTANCE_INITIALIZER; 224 225 } // namespace 226 227 // Worker --------------------------------------------------------------------- 228 229 class SequencedWorkerPool::Worker : public SimpleThread { 230 public: 231 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 232 // around as long as we are running. 233 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 234 int thread_number, 235 const std::string& thread_name_prefix); 236 virtual ~Worker(); 237 238 // SimpleThread implementation. This actually runs the background thread. 239 virtual void Run() OVERRIDE; 240 241 void set_running_task_info(SequenceToken token, 242 WorkerShutdown shutdown_behavior) { 243 running_sequence_ = token; 244 running_shutdown_behavior_ = shutdown_behavior; 245 } 246 247 SequenceToken running_sequence() const { 248 return running_sequence_; 249 } 250 251 WorkerShutdown running_shutdown_behavior() const { 252 return running_shutdown_behavior_; 253 } 254 255 private: 256 scoped_refptr<SequencedWorkerPool> worker_pool_; 257 SequenceToken running_sequence_; 258 WorkerShutdown running_shutdown_behavior_; 259 260 DISALLOW_COPY_AND_ASSIGN(Worker); 261 }; 262 263 // Inner ---------------------------------------------------------------------- 264 265 class SequencedWorkerPool::Inner { 266 public: 267 // Take a raw pointer to |worker| to avoid cycles (since we're owned 268 // by it). 269 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 270 const std::string& thread_name_prefix, 271 TestingObserver* observer); 272 273 ~Inner(); 274 275 SequenceToken GetSequenceToken(); 276 277 SequenceToken GetNamedSequenceToken(const std::string& name); 278 279 // This function accepts a name and an ID. If the name is null, the 280 // token ID is used. This allows us to implement the optional name lookup 281 // from a single function without having to enter the lock a separate time. 282 bool PostTask(const std::string* optional_token_name, 283 SequenceToken sequence_token, 284 WorkerShutdown shutdown_behavior, 285 const tracked_objects::Location& from_here, 286 const Closure& task, 287 TimeDelta delay); 288 289 bool RunsTasksOnCurrentThread() const; 290 291 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 292 293 void CleanupForTesting(); 294 295 void SignalHasWorkForTesting(); 296 297 int GetWorkSignalCountForTesting() const; 298 299 void Shutdown(int max_blocking_tasks_after_shutdown); 300 301 bool IsShutdownInProgress(); 302 303 // Runs the worker loop on the background thread. 304 void ThreadLoop(Worker* this_worker); 305 306 private: 307 enum GetWorkStatus { 308 GET_WORK_FOUND, 309 GET_WORK_NOT_FOUND, 310 GET_WORK_WAIT, 311 }; 312 313 enum CleanupState { 314 CLEANUP_REQUESTED, 315 CLEANUP_STARTING, 316 CLEANUP_RUNNING, 317 CLEANUP_FINISHING, 318 CLEANUP_DONE, 319 }; 320 321 // Called from within the lock, this converts the given token name into a 322 // token ID, creating a new one if necessary. 323 int LockedGetNamedTokenID(const std::string& name); 324 325 // Called from within the lock, this returns the next sequence task number. 326 int64 LockedGetNextSequenceTaskNumber(); 327 328 // Called from within the lock, returns the shutdown behavior of the task 329 // running on the currently executing worker thread. If invoked from a thread 330 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. 331 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; 332 333 // Gets new task. There are 3 cases depending on the return value: 334 // 335 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 336 // be run immediately. 337 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 338 // and |task| is not filled in. In this case, the caller should wait until 339 // a task is posted. 340 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 341 // immediately, and |task| is not filled in. Likewise, |wait_time| is 342 // filled in the time to wait until the next task to run. In this case, the 343 // caller should wait the time. 344 // 345 // In any case, the calling code should clear the given 346 // delete_these_outside_lock vector the next time the lock is released. 347 // See the implementation for a more detailed description. 348 GetWorkStatus GetWork(SequencedTask* task, 349 TimeDelta* wait_time, 350 std::vector<Closure>* delete_these_outside_lock); 351 352 void HandleCleanup(); 353 354 // Peforms init and cleanup around running the given task. WillRun... 355 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 356 // The calling code should call FinishStartingAdditionalThread once the 357 // lock is released if the return values is nonzero. 358 int WillRunWorkerTask(const SequencedTask& task); 359 void DidRunWorkerTask(const SequencedTask& task); 360 361 // Returns true if there are no threads currently running the given 362 // sequence token. 363 bool IsSequenceTokenRunnable(int sequence_token_id) const; 364 365 // Checks if all threads are busy and the addition of one more could run an 366 // additional task waiting in the queue. This must be called from within 367 // the lock. 368 // 369 // If another thread is helpful, this will mark the thread as being in the 370 // process of starting and returns the index of the new thread which will be 371 // 0 or more. The caller should then call FinishStartingAdditionalThread to 372 // complete initialization once the lock is released. 373 // 374 // If another thread is not necessary, returne 0; 375 // 376 // See the implementedion for more. 377 int PrepareToStartAdditionalThreadIfHelpful(); 378 379 // The second part of thread creation after 380 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 381 // generated. This actually creates the thread and should be called outside 382 // the lock to avoid blocking important work starting a thread in the lock. 383 void FinishStartingAdditionalThread(int thread_number); 384 385 // Signal |has_work_| and increment |has_work_signal_count_|. 386 void SignalHasWork(); 387 388 // Checks whether there is work left that's blocking shutdown. Must be 389 // called inside the lock. 390 bool CanShutdown() const; 391 392 SequencedWorkerPool* const worker_pool_; 393 394 // The last sequence number used. Managed by GetSequenceToken, since this 395 // only does threadsafe increment operations, you do not need to hold the 396 // lock. This is class-static to make SequenceTokens issued by 397 // GetSequenceToken unique across SequencedWorkerPool instances. 398 static base::StaticAtomicSequenceNumber g_last_sequence_number_; 399 400 // This lock protects |everything in this class|. Do not read or modify 401 // anything without holding this lock. Do not block while holding this 402 // lock. 403 mutable Lock lock_; 404 405 // Condition variable that is waited on by worker threads until new 406 // tasks are posted or shutdown starts. 407 ConditionVariable has_work_cv_; 408 409 // Condition variable that is waited on by non-worker threads (in 410 // Shutdown()) until CanShutdown() goes to true. 411 ConditionVariable can_shutdown_cv_; 412 413 // The maximum number of worker threads we'll create. 414 const size_t max_threads_; 415 416 const std::string thread_name_prefix_; 417 418 // Associates all known sequence token names with their IDs. 419 std::map<std::string, int> named_sequence_tokens_; 420 421 // Owning pointers to all threads we've created so far, indexed by 422 // ID. Since we lazily create threads, this may be less than 423 // max_threads_ and will be initially empty. 424 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; 425 ThreadMap threads_; 426 427 // Set to true when we're in the process of creating another thread. 428 // See PrepareToStartAdditionalThreadIfHelpful for more. 429 bool thread_being_created_; 430 431 // Number of threads currently waiting for work. 432 size_t waiting_thread_count_; 433 434 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 435 // or SKIP_ON_SHUTDOWN flag set. 436 size_t blocking_shutdown_thread_count_; 437 438 // A set of all pending tasks in time-to-run order. These are tasks that are 439 // either waiting for a thread to run on, waiting for their time to run, 440 // or blocked on a previous task in their sequence. We have to iterate over 441 // the tasks by time-to-run order, so we use the set instead of the 442 // traditional priority_queue. 443 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; 444 PendingTaskSet pending_tasks_; 445 446 // The next sequence number for a new sequenced task. 447 int64 next_sequence_task_number_; 448 449 // Number of tasks in the pending_tasks_ list that are marked as blocking 450 // shutdown. 451 size_t blocking_shutdown_pending_task_count_; 452 453 // Lists all sequence tokens currently executing. 454 std::set<int> current_sequences_; 455 456 // An ID for each posted task to distinguish the task from others in traces. 457 int trace_id_; 458 459 // Set when Shutdown is called and no further tasks should be 460 // allowed, though we may still be running existing tasks. 461 bool shutdown_called_; 462 463 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 464 // has been called. 465 int max_blocking_tasks_after_shutdown_; 466 467 // State used to cleanup for testing, all guarded by lock_. 468 CleanupState cleanup_state_; 469 size_t cleanup_idlers_; 470 ConditionVariable cleanup_cv_; 471 472 TestingObserver* const testing_observer_; 473 474 DISALLOW_COPY_AND_ASSIGN(Inner); 475 }; 476 477 // Worker definitions --------------------------------------------------------- 478 479 SequencedWorkerPool::Worker::Worker( 480 const scoped_refptr<SequencedWorkerPool>& worker_pool, 481 int thread_number, 482 const std::string& prefix) 483 : SimpleThread( 484 prefix + StringPrintf("Worker%d", thread_number).c_str()), 485 worker_pool_(worker_pool), 486 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 487 Start(); 488 } 489 490 SequencedWorkerPool::Worker::~Worker() { 491 } 492 493 void SequencedWorkerPool::Worker::Run() { 494 // Store a pointer to the running sequence in thread local storage for 495 // static function access. 496 g_lazy_tls_ptr.Get().Set(&running_sequence_); 497 498 // Just jump back to the Inner object to run the thread, since it has all the 499 // tracking information and queues. It might be more natural to implement 500 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 501 // having these worker objects at all, but that method lacks the ability to 502 // send thread-specific information easily to the thread loop. 503 worker_pool_->inner_->ThreadLoop(this); 504 // Release our cyclic reference once we're done. 505 worker_pool_ = NULL; 506 } 507 508 // Inner definitions --------------------------------------------------------- 509 510 SequencedWorkerPool::Inner::Inner( 511 SequencedWorkerPool* worker_pool, 512 size_t max_threads, 513 const std::string& thread_name_prefix, 514 TestingObserver* observer) 515 : worker_pool_(worker_pool), 516 lock_(), 517 has_work_cv_(&lock_), 518 can_shutdown_cv_(&lock_), 519 max_threads_(max_threads), 520 thread_name_prefix_(thread_name_prefix), 521 thread_being_created_(false), 522 waiting_thread_count_(0), 523 blocking_shutdown_thread_count_(0), 524 next_sequence_task_number_(0), 525 blocking_shutdown_pending_task_count_(0), 526 trace_id_(0), 527 shutdown_called_(false), 528 max_blocking_tasks_after_shutdown_(0), 529 cleanup_state_(CLEANUP_DONE), 530 cleanup_idlers_(0), 531 cleanup_cv_(&lock_), 532 testing_observer_(observer) {} 533 534 SequencedWorkerPool::Inner::~Inner() { 535 // You must call Shutdown() before destroying the pool. 536 DCHECK(shutdown_called_); 537 538 // Need to explicitly join with the threads before they're destroyed or else 539 // they will be running when our object is half torn down. 540 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 541 it->second->Join(); 542 threads_.clear(); 543 544 if (testing_observer_) 545 testing_observer_->OnDestruct(); 546 } 547 548 SequencedWorkerPool::SequenceToken 549 SequencedWorkerPool::Inner::GetSequenceToken() { 550 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 551 // is used as a sentinel value in SequenceTokens. 552 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 553 } 554 555 SequencedWorkerPool::SequenceToken 556 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 557 AutoLock lock(lock_); 558 return SequenceToken(LockedGetNamedTokenID(name)); 559 } 560 561 bool SequencedWorkerPool::Inner::PostTask( 562 const std::string* optional_token_name, 563 SequenceToken sequence_token, 564 WorkerShutdown shutdown_behavior, 565 const tracked_objects::Location& from_here, 566 const Closure& task, 567 TimeDelta delay) { 568 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); 569 SequencedTask sequenced(from_here); 570 sequenced.sequence_token_id = sequence_token.id_; 571 sequenced.shutdown_behavior = shutdown_behavior; 572 sequenced.posted_from = from_here; 573 sequenced.task = 574 shutdown_behavior == BLOCK_SHUTDOWN ? 575 base::MakeCriticalClosure(task) : task; 576 sequenced.time_to_run = TimeTicks::Now() + delay; 577 578 int create_thread_id = 0; 579 { 580 AutoLock lock(lock_); 581 if (shutdown_called_) { 582 if (shutdown_behavior != BLOCK_SHUTDOWN || 583 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { 584 return false; 585 } 586 if (max_blocking_tasks_after_shutdown_ <= 0) { 587 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 588 return false; 589 } 590 max_blocking_tasks_after_shutdown_ -= 1; 591 } 592 593 // The trace_id is used for identifying the task in about:tracing. 594 sequenced.trace_id = trace_id_++; 595 596 TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask", 597 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); 598 599 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 600 601 // Now that we have the lock, apply the named token rules. 602 if (optional_token_name) 603 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 604 605 pending_tasks_.insert(sequenced); 606 if (shutdown_behavior == BLOCK_SHUTDOWN) 607 blocking_shutdown_pending_task_count_++; 608 609 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 610 } 611 612 // Actually start the additional thread or signal an existing one now that 613 // we're outside the lock. 614 if (create_thread_id) 615 FinishStartingAdditionalThread(create_thread_id); 616 else 617 SignalHasWork(); 618 619 return true; 620 } 621 622 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 623 AutoLock lock(lock_); 624 return ContainsKey(threads_, PlatformThread::CurrentId()); 625 } 626 627 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 628 SequenceToken sequence_token) const { 629 AutoLock lock(lock_); 630 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 631 if (found == threads_.end()) 632 return false; 633 return sequence_token.Equals(found->second->running_sequence()); 634 } 635 636 // See https://code.google.com/p/chromium/issues/detail?id=168415 637 void SequencedWorkerPool::Inner::CleanupForTesting() { 638 DCHECK(!RunsTasksOnCurrentThread()); 639 base::ThreadRestrictions::ScopedAllowWait allow_wait; 640 AutoLock lock(lock_); 641 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 642 if (shutdown_called_) 643 return; 644 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 645 return; 646 cleanup_state_ = CLEANUP_REQUESTED; 647 cleanup_idlers_ = 0; 648 has_work_cv_.Signal(); 649 while (cleanup_state_ != CLEANUP_DONE) 650 cleanup_cv_.Wait(); 651 } 652 653 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 654 SignalHasWork(); 655 } 656 657 void SequencedWorkerPool::Inner::Shutdown( 658 int max_new_blocking_tasks_after_shutdown) { 659 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 660 { 661 AutoLock lock(lock_); 662 // Cleanup and Shutdown should not be called concurrently. 663 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 664 if (shutdown_called_) 665 return; 666 shutdown_called_ = true; 667 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 668 669 // Tickle the threads. This will wake up a waiting one so it will know that 670 // it can exit, which in turn will wake up any other waiting ones. 671 SignalHasWork(); 672 673 // There are no pending or running tasks blocking shutdown, we're done. 674 if (CanShutdown()) 675 return; 676 } 677 678 // If we're here, then something is blocking shutdown. So wait for 679 // CanShutdown() to go to true. 680 681 if (testing_observer_) 682 testing_observer_->WillWaitForShutdown(); 683 684 #if !defined(OS_NACL) 685 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 686 #endif 687 688 { 689 base::ThreadRestrictions::ScopedAllowWait allow_wait; 690 AutoLock lock(lock_); 691 while (!CanShutdown()) 692 can_shutdown_cv_.Wait(); 693 } 694 #if !defined(OS_NACL) 695 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 696 TimeTicks::Now() - shutdown_wait_begin); 697 #endif 698 } 699 700 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 701 AutoLock lock(lock_); 702 return shutdown_called_; 703 } 704 705 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 706 { 707 AutoLock lock(lock_); 708 DCHECK(thread_being_created_); 709 thread_being_created_ = false; 710 std::pair<ThreadMap::iterator, bool> result = 711 threads_.insert( 712 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 713 DCHECK(result.second); 714 715 while (true) { 716 #if defined(OS_MACOSX) 717 base::mac::ScopedNSAutoreleasePool autorelease_pool; 718 #endif 719 720 HandleCleanup(); 721 722 // See GetWork for what delete_these_outside_lock is doing. 723 SequencedTask task; 724 TimeDelta wait_time; 725 std::vector<Closure> delete_these_outside_lock; 726 GetWorkStatus status = 727 GetWork(&task, &wait_time, &delete_these_outside_lock); 728 if (status == GET_WORK_FOUND) { 729 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", 730 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 731 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", 732 "src_file", task.posted_from.file_name(), 733 "src_func", task.posted_from.function_name()); 734 int new_thread_id = WillRunWorkerTask(task); 735 { 736 AutoUnlock unlock(lock_); 737 // There may be more work available, so wake up another 738 // worker thread. (Technically not required, since we 739 // already get a signal for each new task, but it doesn't 740 // hurt.) 741 SignalHasWork(); 742 delete_these_outside_lock.clear(); 743 744 // Complete thread creation outside the lock if necessary. 745 if (new_thread_id) 746 FinishStartingAdditionalThread(new_thread_id); 747 748 this_worker->set_running_task_info( 749 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 750 751 tracked_objects::TrackedTime start_time = 752 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); 753 754 task.task.Run(); 755 756 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, 757 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 758 759 // Make sure our task is erased outside the lock for the 760 // same reason we do this with delete_these_oustide_lock. 761 // Also, do it before calling set_running_task_info() so 762 // that sequence-checking from within the task's destructor 763 // still works. 764 task.task = Closure(); 765 766 this_worker->set_running_task_info( 767 SequenceToken(), CONTINUE_ON_SHUTDOWN); 768 } 769 DidRunWorkerTask(task); // Must be done inside the lock. 770 } else if (cleanup_state_ == CLEANUP_RUNNING) { 771 switch (status) { 772 case GET_WORK_WAIT: { 773 AutoUnlock unlock(lock_); 774 delete_these_outside_lock.clear(); 775 } 776 break; 777 case GET_WORK_NOT_FOUND: 778 CHECK(delete_these_outside_lock.empty()); 779 cleanup_state_ = CLEANUP_FINISHING; 780 cleanup_cv_.Broadcast(); 781 break; 782 default: 783 NOTREACHED(); 784 } 785 } else { 786 // When we're terminating and there's no more work, we can 787 // shut down, other workers can complete any pending or new tasks. 788 // We can get additional tasks posted after shutdown_called_ is set 789 // but only worker threads are allowed to post tasks at that time, and 790 // the workers responsible for posting those tasks will be available 791 // to run them. Also, there may be some tasks stuck behind running 792 // ones with the same sequence token, but additional threads won't 793 // help this case. 794 if (shutdown_called_ && 795 blocking_shutdown_pending_task_count_ == 0) 796 break; 797 waiting_thread_count_++; 798 799 switch (status) { 800 case GET_WORK_NOT_FOUND: 801 has_work_cv_.Wait(); 802 break; 803 case GET_WORK_WAIT: 804 has_work_cv_.TimedWait(wait_time); 805 break; 806 default: 807 NOTREACHED(); 808 } 809 waiting_thread_count_--; 810 } 811 } 812 } // Release lock_. 813 814 // We noticed we should exit. Wake up the next worker so it knows it should 815 // exit as well (because the Shutdown() code only signals once). 816 SignalHasWork(); 817 818 // Possibly unblock shutdown. 819 can_shutdown_cv_.Signal(); 820 } 821 822 void SequencedWorkerPool::Inner::HandleCleanup() { 823 lock_.AssertAcquired(); 824 if (cleanup_state_ == CLEANUP_DONE) 825 return; 826 if (cleanup_state_ == CLEANUP_REQUESTED) { 827 // We win, we get to do the cleanup as soon as the others wise up and idle. 828 cleanup_state_ = CLEANUP_STARTING; 829 while (thread_being_created_ || 830 cleanup_idlers_ != threads_.size() - 1) { 831 has_work_cv_.Signal(); 832 cleanup_cv_.Wait(); 833 } 834 cleanup_state_ = CLEANUP_RUNNING; 835 return; 836 } 837 if (cleanup_state_ == CLEANUP_STARTING) { 838 // Another worker thread is cleaning up, we idle here until thats done. 839 ++cleanup_idlers_; 840 cleanup_cv_.Broadcast(); 841 while (cleanup_state_ != CLEANUP_FINISHING) { 842 cleanup_cv_.Wait(); 843 } 844 --cleanup_idlers_; 845 cleanup_cv_.Broadcast(); 846 return; 847 } 848 if (cleanup_state_ == CLEANUP_FINISHING) { 849 // We wait for all idlers to wake up prior to being DONE. 850 while (cleanup_idlers_ != 0) { 851 cleanup_cv_.Broadcast(); 852 cleanup_cv_.Wait(); 853 } 854 if (cleanup_state_ == CLEANUP_FINISHING) { 855 cleanup_state_ = CLEANUP_DONE; 856 cleanup_cv_.Signal(); 857 } 858 return; 859 } 860 } 861 862 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 863 const std::string& name) { 864 lock_.AssertAcquired(); 865 DCHECK(!name.empty()); 866 867 std::map<std::string, int>::const_iterator found = 868 named_sequence_tokens_.find(name); 869 if (found != named_sequence_tokens_.end()) 870 return found->second; // Got an existing one. 871 872 // Create a new one for this name. 873 SequenceToken result = GetSequenceToken(); 874 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 875 return result.id_; 876 } 877 878 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 879 lock_.AssertAcquired(); 880 // We assume that we never create enough tasks to wrap around. 881 return next_sequence_task_number_++; 882 } 883 884 SequencedWorkerPool::WorkerShutdown 885 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { 886 lock_.AssertAcquired(); 887 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 888 if (found == threads_.end()) 889 return CONTINUE_ON_SHUTDOWN; 890 return found->second->running_shutdown_behavior(); 891 } 892 893 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 894 SequencedTask* task, 895 TimeDelta* wait_time, 896 std::vector<Closure>* delete_these_outside_lock) { 897 lock_.AssertAcquired(); 898 899 #if !defined(OS_NACL) 900 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 901 static_cast<int>(pending_tasks_.size())); 902 #endif 903 904 // Find the next task with a sequence token that's not currently in use. 905 // If the token is in use, that means another thread is running something 906 // in that sequence, and we can't run it without going out-of-order. 907 // 908 // This algorithm is simple and fair, but inefficient in some cases. For 909 // example, say somebody schedules 1000 slow tasks with the same sequence 910 // number. We'll have to go through all those tasks each time we feel like 911 // there might be work to schedule. If this proves to be a problem, we 912 // should make this more efficient. 913 // 914 // One possible enhancement would be to keep a map from sequence ID to a 915 // list of pending but currently blocked SequencedTasks for that ID. 916 // When a worker finishes a task of one sequence token, it can pick up the 917 // next one from that token right away. 918 // 919 // This may lead to starvation if there are sufficient numbers of sequences 920 // in use. To alleviate this, we could add an incrementing priority counter 921 // to each SequencedTask. Then maintain a priority_queue of all runnable 922 // tasks, sorted by priority counter. When a sequenced task is completed 923 // we would pop the head element off of that tasks pending list and add it 924 // to the priority queue. Then we would run the first item in the priority 925 // queue. 926 927 GetWorkStatus status = GET_WORK_NOT_FOUND; 928 int unrunnable_tasks = 0; 929 PendingTaskSet::iterator i = pending_tasks_.begin(); 930 // We assume that the loop below doesn't take too long and so we can just do 931 // a single call to TimeTicks::Now(). 932 const TimeTicks current_time = TimeTicks::Now(); 933 while (i != pending_tasks_.end()) { 934 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 935 unrunnable_tasks++; 936 ++i; 937 continue; 938 } 939 940 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 941 // We're shutting down and the task we just found isn't blocking 942 // shutdown. Delete it and get more work. 943 // 944 // Note that we do not want to delete unrunnable tasks. Deleting a task 945 // can have side effects (like freeing some objects) and deleting a 946 // task that's supposed to run after one that's currently running could 947 // cause an obscure crash. 948 // 949 // We really want to delete these tasks outside the lock in case the 950 // closures are holding refs to objects that want to post work from 951 // their destructorss (which would deadlock). The closures are 952 // internally refcounted, so we just need to keep a copy of them alive 953 // until the lock is exited. The calling code can just clear() the 954 // vector they passed to us once the lock is exited to make this 955 // happen. 956 delete_these_outside_lock->push_back(i->task); 957 pending_tasks_.erase(i++); 958 continue; 959 } 960 961 if (i->time_to_run > current_time) { 962 // The time to run has not come yet. 963 *wait_time = i->time_to_run - current_time; 964 status = GET_WORK_WAIT; 965 if (cleanup_state_ == CLEANUP_RUNNING) { 966 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 967 delete_these_outside_lock->push_back(i->task); 968 pending_tasks_.erase(i); 969 } 970 break; 971 } 972 973 // Found a runnable task. 974 *task = *i; 975 pending_tasks_.erase(i); 976 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 977 blocking_shutdown_pending_task_count_--; 978 } 979 980 status = GET_WORK_FOUND; 981 break; 982 } 983 984 // Track the number of tasks we had to skip over to see if we should be 985 // making this more efficient. If this number ever becomes large or is 986 // frequently "some", we should consider the optimization above. 987 #if !defined(OS_NACL) 988 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 989 unrunnable_tasks); 990 #endif 991 return status; 992 } 993 994 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 995 lock_.AssertAcquired(); 996 997 // Mark the task's sequence number as in use. 998 if (task.sequence_token_id) 999 current_sequences_.insert(task.sequence_token_id); 1000 1001 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1002 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1003 // completes. 1004 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1005 blocking_shutdown_thread_count_++; 1006 1007 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 1008 // creates a new thread if there is no free one, there is a race when posting 1009 // tasks that many tasks could have been posted before a thread started 1010 // running them, so only one thread would have been created. So we also check 1011 // whether we should create more threads after removing our task from the 1012 // queue, which also has the nice side effect of creating the workers from 1013 // background threads rather than the main thread of the app. 1014 // 1015 // If another thread wasn't created, we want to wake up an existing thread 1016 // if there is one waiting to pick up the next task. 1017 // 1018 // Note that we really need to do this *before* running the task, not 1019 // after. Otherwise, if more than one task is posted, the creation of the 1020 // second thread (since we only create one at a time) will be blocked by 1021 // the execution of the first task, which could be arbitrarily long. 1022 return PrepareToStartAdditionalThreadIfHelpful(); 1023 } 1024 1025 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1026 lock_.AssertAcquired(); 1027 1028 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1029 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1030 blocking_shutdown_thread_count_--; 1031 } 1032 1033 if (task.sequence_token_id) 1034 current_sequences_.erase(task.sequence_token_id); 1035 } 1036 1037 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1038 int sequence_token_id) const { 1039 lock_.AssertAcquired(); 1040 return !sequence_token_id || 1041 current_sequences_.find(sequence_token_id) == 1042 current_sequences_.end(); 1043 } 1044 1045 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1046 lock_.AssertAcquired(); 1047 // How thread creation works: 1048 // 1049 // We'de like to avoid creating threads with the lock held. However, we 1050 // need to be sure that we have an accurate accounting of the threads for 1051 // proper Joining and deltion on shutdown. 1052 // 1053 // We need to figure out if we need another thread with the lock held, which 1054 // is what this function does. It then marks us as in the process of creating 1055 // a thread. When we do shutdown, we wait until the thread_being_created_ 1056 // flag is cleared, which ensures that the new thread is properly added to 1057 // all the data structures and we can't leak it. Once shutdown starts, we'll 1058 // refuse to create more threads or they would be leaked. 1059 // 1060 // Note that this creates a mostly benign race condition on shutdown that 1061 // will cause fewer workers to be created than one would expect. It isn't 1062 // much of an issue in real life, but affects some tests. Since we only spawn 1063 // one worker at a time, the following sequence of events can happen: 1064 // 1065 // 1. Main thread posts a bunch of unrelated tasks that would normally be 1066 // run on separate threads. 1067 // 2. The first task post causes us to start a worker. Other tasks do not 1068 // cause a worker to start since one is pending. 1069 // 3. Main thread initiates shutdown. 1070 // 4. No more threads are created since the shutdown_called_ flag is set. 1071 // 1072 // The result is that one may expect that max_threads_ workers to be created 1073 // given the workload, but in reality fewer may be created because the 1074 // sequence of thread creation on the background threads is racing with the 1075 // shutdown call. 1076 if (!shutdown_called_ && 1077 !thread_being_created_ && 1078 cleanup_state_ == CLEANUP_DONE && 1079 threads_.size() < max_threads_ && 1080 waiting_thread_count_ == 0) { 1081 // We could use an additional thread if there's work to be done. 1082 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1083 i != pending_tasks_.end(); ++i) { 1084 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1085 // Found a runnable task, mark the thread as being started. 1086 thread_being_created_ = true; 1087 return static_cast<int>(threads_.size() + 1); 1088 } 1089 } 1090 } 1091 return 0; 1092 } 1093 1094 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1095 int thread_number) { 1096 // Called outside of the lock. 1097 DCHECK(thread_number > 0); 1098 1099 // The worker is assigned to the list when the thread actually starts, which 1100 // will manage the memory of the pointer. 1101 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1102 } 1103 1104 void SequencedWorkerPool::Inner::SignalHasWork() { 1105 has_work_cv_.Signal(); 1106 if (testing_observer_) { 1107 testing_observer_->OnHasWork(); 1108 } 1109 } 1110 1111 bool SequencedWorkerPool::Inner::CanShutdown() const { 1112 lock_.AssertAcquired(); 1113 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1114 return !thread_being_created_ && 1115 blocking_shutdown_thread_count_ == 0 && 1116 blocking_shutdown_pending_task_count_ == 0; 1117 } 1118 1119 base::StaticAtomicSequenceNumber 1120 SequencedWorkerPool::Inner::g_last_sequence_number_; 1121 1122 // SequencedWorkerPool -------------------------------------------------------- 1123 1124 // static 1125 SequencedWorkerPool::SequenceToken 1126 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1127 // Don't construct lazy instance on check. 1128 if (g_lazy_tls_ptr == NULL) 1129 return SequenceToken(); 1130 1131 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); 1132 if (!token) 1133 return SequenceToken(); 1134 return *token; 1135 } 1136 1137 SequencedWorkerPool::SequencedWorkerPool( 1138 size_t max_threads, 1139 const std::string& thread_name_prefix) 1140 : constructor_message_loop_(MessageLoopProxy::current()), 1141 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1142 } 1143 1144 SequencedWorkerPool::SequencedWorkerPool( 1145 size_t max_threads, 1146 const std::string& thread_name_prefix, 1147 TestingObserver* observer) 1148 : constructor_message_loop_(MessageLoopProxy::current()), 1149 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1150 } 1151 1152 SequencedWorkerPool::~SequencedWorkerPool() {} 1153 1154 void SequencedWorkerPool::OnDestruct() const { 1155 DCHECK(constructor_message_loop_.get()); 1156 // Avoid deleting ourselves on a worker thread (which would 1157 // deadlock). 1158 if (RunsTasksOnCurrentThread()) { 1159 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 1160 } else { 1161 delete this; 1162 } 1163 } 1164 1165 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1166 return inner_->GetSequenceToken(); 1167 } 1168 1169 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1170 const std::string& name) { 1171 return inner_->GetNamedSequenceToken(name); 1172 } 1173 1174 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1175 SequenceToken token) { 1176 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1177 } 1178 1179 scoped_refptr<SequencedTaskRunner> 1180 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( 1181 SequenceToken token, WorkerShutdown shutdown_behavior) { 1182 return new SequencedWorkerPoolSequencedTaskRunner( 1183 this, token, shutdown_behavior); 1184 } 1185 1186 scoped_refptr<TaskRunner> 1187 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1188 WorkerShutdown shutdown_behavior) { 1189 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1190 } 1191 1192 bool SequencedWorkerPool::PostWorkerTask( 1193 const tracked_objects::Location& from_here, 1194 const Closure& task) { 1195 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 1196 from_here, task, TimeDelta()); 1197 } 1198 1199 bool SequencedWorkerPool::PostDelayedWorkerTask( 1200 const tracked_objects::Location& from_here, 1201 const Closure& task, 1202 TimeDelta delay) { 1203 WorkerShutdown shutdown_behavior = 1204 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1205 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1206 from_here, task, delay); 1207 } 1208 1209 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1210 const tracked_objects::Location& from_here, 1211 const Closure& task, 1212 WorkerShutdown shutdown_behavior) { 1213 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1214 from_here, task, TimeDelta()); 1215 } 1216 1217 bool SequencedWorkerPool::PostSequencedWorkerTask( 1218 SequenceToken sequence_token, 1219 const tracked_objects::Location& from_here, 1220 const Closure& task) { 1221 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, 1222 from_here, task, TimeDelta()); 1223 } 1224 1225 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1226 SequenceToken sequence_token, 1227 const tracked_objects::Location& from_here, 1228 const Closure& task, 1229 TimeDelta delay) { 1230 WorkerShutdown shutdown_behavior = 1231 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1232 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1233 from_here, task, delay); 1234 } 1235 1236 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1237 const std::string& token_name, 1238 const tracked_objects::Location& from_here, 1239 const Closure& task) { 1240 DCHECK(!token_name.empty()); 1241 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1242 from_here, task, TimeDelta()); 1243 } 1244 1245 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1246 SequenceToken sequence_token, 1247 const tracked_objects::Location& from_here, 1248 const Closure& task, 1249 WorkerShutdown shutdown_behavior) { 1250 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1251 from_here, task, TimeDelta()); 1252 } 1253 1254 bool SequencedWorkerPool::PostDelayedTask( 1255 const tracked_objects::Location& from_here, 1256 const Closure& task, 1257 TimeDelta delay) { 1258 return PostDelayedWorkerTask(from_here, task, delay); 1259 } 1260 1261 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1262 return inner_->RunsTasksOnCurrentThread(); 1263 } 1264 1265 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1266 SequenceToken sequence_token) const { 1267 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1268 } 1269 1270 void SequencedWorkerPool::FlushForTesting() { 1271 inner_->CleanupForTesting(); 1272 } 1273 1274 void SequencedWorkerPool::SignalHasWorkForTesting() { 1275 inner_->SignalHasWorkForTesting(); 1276 } 1277 1278 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1279 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1280 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1281 } 1282 1283 bool SequencedWorkerPool::IsShutdownInProgress() { 1284 return inner_->IsShutdownInProgress(); 1285 } 1286 1287 } // namespace base 1288