1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/sequenced_worker_pool.h" 6 7 #include <list> 8 #include <map> 9 #include <set> 10 #include <utility> 11 #include <vector> 12 13 #include "base/atomic_sequence_num.h" 14 #include "base/callback.h" 15 #include "base/compiler_specific.h" 16 #include "base/critical_closure.h" 17 #include "base/debug/trace_event.h" 18 #include "base/lazy_instance.h" 19 #include "base/logging.h" 20 #include "base/memory/linked_ptr.h" 21 #include "base/message_loop/message_loop_proxy.h" 22 #include "base/stl_util.h" 23 #include "base/strings/stringprintf.h" 24 #include "base/synchronization/condition_variable.h" 25 #include "base/synchronization/lock.h" 26 #include "base/threading/platform_thread.h" 27 #include "base/threading/simple_thread.h" 28 #include "base/threading/thread_local.h" 29 #include "base/threading/thread_restrictions.h" 30 #include "base/time/time.h" 31 #include "base/tracked_objects.h" 32 33 #if defined(OS_MACOSX) 34 #include "base/mac/scoped_nsautorelease_pool.h" 35 #elif defined(OS_WIN) 36 #include "base/win/scoped_com_initializer.h" 37 #endif 38 39 #if !defined(OS_NACL) 40 #include "base/metrics/histogram.h" 41 #endif 42 43 namespace base { 44 45 namespace { 46 47 struct SequencedTask : public TrackingInfo { 48 SequencedTask() 49 : sequence_token_id(0), 50 trace_id(0), 51 sequence_task_number(0), 52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 53 54 explicit SequencedTask(const tracked_objects::Location& from_here) 55 : base::TrackingInfo(from_here, TimeTicks()), 56 sequence_token_id(0), 57 trace_id(0), 58 sequence_task_number(0), 59 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 60 61 ~SequencedTask() {} 62 63 int sequence_token_id; 64 int trace_id; 65 int64 sequence_task_number; 66 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 67 tracked_objects::Location posted_from; 68 Closure task; 69 70 // Non-delayed tasks and delayed tasks are managed together by time-to-run 71 // order. We calculate the time by adding the posted time and the given delay. 72 TimeTicks time_to_run; 73 }; 74 75 struct SequencedTaskLessThan { 76 public: 77 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 78 if (lhs.time_to_run < rhs.time_to_run) 79 return true; 80 81 if (lhs.time_to_run > rhs.time_to_run) 82 return false; 83 84 // If the time happen to match, then we use the sequence number to decide. 85 return lhs.sequence_task_number < rhs.sequence_task_number; 86 } 87 }; 88 89 // SequencedWorkerPoolTaskRunner --------------------------------------------- 90 // A TaskRunner which posts tasks to a SequencedWorkerPool with a 91 // fixed ShutdownBehavior. 92 // 93 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 94 class SequencedWorkerPoolTaskRunner : public TaskRunner { 95 public: 96 SequencedWorkerPoolTaskRunner( 97 const scoped_refptr<SequencedWorkerPool>& pool, 98 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 99 100 // TaskRunner implementation 101 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 102 const Closure& task, 103 TimeDelta delay) OVERRIDE; 104 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 105 106 private: 107 virtual ~SequencedWorkerPoolTaskRunner(); 108 109 const scoped_refptr<SequencedWorkerPool> pool_; 110 111 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 112 113 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 114 }; 115 116 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 117 const scoped_refptr<SequencedWorkerPool>& pool, 118 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 119 : pool_(pool), 120 shutdown_behavior_(shutdown_behavior) { 121 } 122 123 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 124 } 125 126 bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 127 const tracked_objects::Location& from_here, 128 const Closure& task, 129 TimeDelta delay) { 130 if (delay == TimeDelta()) { 131 return pool_->PostWorkerTaskWithShutdownBehavior( 132 from_here, task, shutdown_behavior_); 133 } 134 return pool_->PostDelayedWorkerTask(from_here, task, delay); 135 } 136 137 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 138 return pool_->RunsTasksOnCurrentThread(); 139 } 140 141 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------ 142 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 143 // fixed sequence token. 144 // 145 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 146 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { 147 public: 148 SequencedWorkerPoolSequencedTaskRunner( 149 const scoped_refptr<SequencedWorkerPool>& pool, 150 SequencedWorkerPool::SequenceToken token, 151 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 152 153 // TaskRunner implementation 154 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 155 const Closure& task, 156 TimeDelta delay) OVERRIDE; 157 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 158 159 // SequencedTaskRunner implementation 160 virtual bool PostNonNestableDelayedTask( 161 const tracked_objects::Location& from_here, 162 const Closure& task, 163 TimeDelta delay) OVERRIDE; 164 165 private: 166 virtual ~SequencedWorkerPoolSequencedTaskRunner(); 167 168 const scoped_refptr<SequencedWorkerPool> pool_; 169 170 const SequencedWorkerPool::SequenceToken token_; 171 172 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 173 174 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); 175 }; 176 177 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( 178 const scoped_refptr<SequencedWorkerPool>& pool, 179 SequencedWorkerPool::SequenceToken token, 180 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 181 : pool_(pool), 182 token_(token), 183 shutdown_behavior_(shutdown_behavior) { 184 } 185 186 SequencedWorkerPoolSequencedTaskRunner:: 187 ~SequencedWorkerPoolSequencedTaskRunner() { 188 } 189 190 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( 191 const tracked_objects::Location& from_here, 192 const Closure& task, 193 TimeDelta delay) { 194 if (delay == TimeDelta()) { 195 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 196 token_, from_here, task, shutdown_behavior_); 197 } 198 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); 199 } 200 201 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { 202 return pool_->IsRunningSequenceOnCurrentThread(token_); 203 } 204 205 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( 206 const tracked_objects::Location& from_here, 207 const Closure& task, 208 TimeDelta delay) { 209 // There's no way to run nested tasks, so simply forward to 210 // PostDelayedTask. 211 return PostDelayedTask(from_here, task, delay); 212 } 213 214 // Create a process-wide unique ID to represent this task in trace events. This 215 // will be mangled with a Process ID hash to reduce the likelyhood of colliding 216 // with MessageLoop pointers on other processes. 217 uint64 GetTaskTraceID(const SequencedTask& task, 218 void* pool) { 219 return (static_cast<uint64>(task.trace_id) << 32) | 220 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 221 } 222 223 base::LazyInstance<base::ThreadLocalPointer< 224 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = 225 LAZY_INSTANCE_INITIALIZER; 226 227 } // namespace 228 229 // Worker --------------------------------------------------------------------- 230 231 class SequencedWorkerPool::Worker : public SimpleThread { 232 public: 233 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 234 // around as long as we are running. 235 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 236 int thread_number, 237 const std::string& thread_name_prefix); 238 virtual ~Worker(); 239 240 // SimpleThread implementation. This actually runs the background thread. 241 virtual void Run() OVERRIDE; 242 243 void set_running_task_info(SequenceToken token, 244 WorkerShutdown shutdown_behavior) { 245 running_sequence_ = token; 246 running_shutdown_behavior_ = shutdown_behavior; 247 } 248 249 SequenceToken running_sequence() const { 250 return running_sequence_; 251 } 252 253 WorkerShutdown running_shutdown_behavior() const { 254 return running_shutdown_behavior_; 255 } 256 257 private: 258 scoped_refptr<SequencedWorkerPool> worker_pool_; 259 SequenceToken running_sequence_; 260 WorkerShutdown running_shutdown_behavior_; 261 262 DISALLOW_COPY_AND_ASSIGN(Worker); 263 }; 264 265 // Inner ---------------------------------------------------------------------- 266 267 class SequencedWorkerPool::Inner { 268 public: 269 // Take a raw pointer to |worker| to avoid cycles (since we're owned 270 // by it). 271 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 272 const std::string& thread_name_prefix, 273 TestingObserver* observer); 274 275 ~Inner(); 276 277 SequenceToken GetSequenceToken(); 278 279 SequenceToken GetNamedSequenceToken(const std::string& name); 280 281 // This function accepts a name and an ID. If the name is null, the 282 // token ID is used. This allows us to implement the optional name lookup 283 // from a single function without having to enter the lock a separate time. 284 bool PostTask(const std::string* optional_token_name, 285 SequenceToken sequence_token, 286 WorkerShutdown shutdown_behavior, 287 const tracked_objects::Location& from_here, 288 const Closure& task, 289 TimeDelta delay); 290 291 bool RunsTasksOnCurrentThread() const; 292 293 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 294 295 void CleanupForTesting(); 296 297 void SignalHasWorkForTesting(); 298 299 int GetWorkSignalCountForTesting() const; 300 301 void Shutdown(int max_blocking_tasks_after_shutdown); 302 303 bool IsShutdownInProgress(); 304 305 // Runs the worker loop on the background thread. 306 void ThreadLoop(Worker* this_worker); 307 308 private: 309 enum GetWorkStatus { 310 GET_WORK_FOUND, 311 GET_WORK_NOT_FOUND, 312 GET_WORK_WAIT, 313 }; 314 315 enum CleanupState { 316 CLEANUP_REQUESTED, 317 CLEANUP_STARTING, 318 CLEANUP_RUNNING, 319 CLEANUP_FINISHING, 320 CLEANUP_DONE, 321 }; 322 323 // Called from within the lock, this converts the given token name into a 324 // token ID, creating a new one if necessary. 325 int LockedGetNamedTokenID(const std::string& name); 326 327 // Called from within the lock, this returns the next sequence task number. 328 int64 LockedGetNextSequenceTaskNumber(); 329 330 // Called from within the lock, returns the shutdown behavior of the task 331 // running on the currently executing worker thread. If invoked from a thread 332 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. 333 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; 334 335 // Gets new task. There are 3 cases depending on the return value: 336 // 337 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 338 // be run immediately. 339 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 340 // and |task| is not filled in. In this case, the caller should wait until 341 // a task is posted. 342 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 343 // immediately, and |task| is not filled in. Likewise, |wait_time| is 344 // filled in the time to wait until the next task to run. In this case, the 345 // caller should wait the time. 346 // 347 // In any case, the calling code should clear the given 348 // delete_these_outside_lock vector the next time the lock is released. 349 // See the implementation for a more detailed description. 350 GetWorkStatus GetWork(SequencedTask* task, 351 TimeDelta* wait_time, 352 std::vector<Closure>* delete_these_outside_lock); 353 354 void HandleCleanup(); 355 356 // Peforms init and cleanup around running the given task. WillRun... 357 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 358 // The calling code should call FinishStartingAdditionalThread once the 359 // lock is released if the return values is nonzero. 360 int WillRunWorkerTask(const SequencedTask& task); 361 void DidRunWorkerTask(const SequencedTask& task); 362 363 // Returns true if there are no threads currently running the given 364 // sequence token. 365 bool IsSequenceTokenRunnable(int sequence_token_id) const; 366 367 // Checks if all threads are busy and the addition of one more could run an 368 // additional task waiting in the queue. This must be called from within 369 // the lock. 370 // 371 // If another thread is helpful, this will mark the thread as being in the 372 // process of starting and returns the index of the new thread which will be 373 // 0 or more. The caller should then call FinishStartingAdditionalThread to 374 // complete initialization once the lock is released. 375 // 376 // If another thread is not necessary, returne 0; 377 // 378 // See the implementedion for more. 379 int PrepareToStartAdditionalThreadIfHelpful(); 380 381 // The second part of thread creation after 382 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 383 // generated. This actually creates the thread and should be called outside 384 // the lock to avoid blocking important work starting a thread in the lock. 385 void FinishStartingAdditionalThread(int thread_number); 386 387 // Signal |has_work_| and increment |has_work_signal_count_|. 388 void SignalHasWork(); 389 390 // Checks whether there is work left that's blocking shutdown. Must be 391 // called inside the lock. 392 bool CanShutdown() const; 393 394 SequencedWorkerPool* const worker_pool_; 395 396 // The last sequence number used. Managed by GetSequenceToken, since this 397 // only does threadsafe increment operations, you do not need to hold the 398 // lock. This is class-static to make SequenceTokens issued by 399 // GetSequenceToken unique across SequencedWorkerPool instances. 400 static base::StaticAtomicSequenceNumber g_last_sequence_number_; 401 402 // This lock protects |everything in this class|. Do not read or modify 403 // anything without holding this lock. Do not block while holding this 404 // lock. 405 mutable Lock lock_; 406 407 // Condition variable that is waited on by worker threads until new 408 // tasks are posted or shutdown starts. 409 ConditionVariable has_work_cv_; 410 411 // Condition variable that is waited on by non-worker threads (in 412 // Shutdown()) until CanShutdown() goes to true. 413 ConditionVariable can_shutdown_cv_; 414 415 // The maximum number of worker threads we'll create. 416 const size_t max_threads_; 417 418 const std::string thread_name_prefix_; 419 420 // Associates all known sequence token names with their IDs. 421 std::map<std::string, int> named_sequence_tokens_; 422 423 // Owning pointers to all threads we've created so far, indexed by 424 // ID. Since we lazily create threads, this may be less than 425 // max_threads_ and will be initially empty. 426 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; 427 ThreadMap threads_; 428 429 // Set to true when we're in the process of creating another thread. 430 // See PrepareToStartAdditionalThreadIfHelpful for more. 431 bool thread_being_created_; 432 433 // Number of threads currently waiting for work. 434 size_t waiting_thread_count_; 435 436 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 437 // or SKIP_ON_SHUTDOWN flag set. 438 size_t blocking_shutdown_thread_count_; 439 440 // A set of all pending tasks in time-to-run order. These are tasks that are 441 // either waiting for a thread to run on, waiting for their time to run, 442 // or blocked on a previous task in their sequence. We have to iterate over 443 // the tasks by time-to-run order, so we use the set instead of the 444 // traditional priority_queue. 445 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; 446 PendingTaskSet pending_tasks_; 447 448 // The next sequence number for a new sequenced task. 449 int64 next_sequence_task_number_; 450 451 // Number of tasks in the pending_tasks_ list that are marked as blocking 452 // shutdown. 453 size_t blocking_shutdown_pending_task_count_; 454 455 // Lists all sequence tokens currently executing. 456 std::set<int> current_sequences_; 457 458 // An ID for each posted task to distinguish the task from others in traces. 459 int trace_id_; 460 461 // Set when Shutdown is called and no further tasks should be 462 // allowed, though we may still be running existing tasks. 463 bool shutdown_called_; 464 465 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 466 // has been called. 467 int max_blocking_tasks_after_shutdown_; 468 469 // State used to cleanup for testing, all guarded by lock_. 470 CleanupState cleanup_state_; 471 size_t cleanup_idlers_; 472 ConditionVariable cleanup_cv_; 473 474 TestingObserver* const testing_observer_; 475 476 DISALLOW_COPY_AND_ASSIGN(Inner); 477 }; 478 479 // Worker definitions --------------------------------------------------------- 480 481 SequencedWorkerPool::Worker::Worker( 482 const scoped_refptr<SequencedWorkerPool>& worker_pool, 483 int thread_number, 484 const std::string& prefix) 485 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 486 worker_pool_(worker_pool), 487 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 488 Start(); 489 } 490 491 SequencedWorkerPool::Worker::~Worker() { 492 } 493 494 void SequencedWorkerPool::Worker::Run() { 495 #if defined(OS_WIN) 496 win::ScopedCOMInitializer com_initializer; 497 #endif 498 499 // Store a pointer to the running sequence in thread local storage for 500 // static function access. 501 g_lazy_tls_ptr.Get().Set(&running_sequence_); 502 503 // Just jump back to the Inner object to run the thread, since it has all the 504 // tracking information and queues. It might be more natural to implement 505 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 506 // having these worker objects at all, but that method lacks the ability to 507 // send thread-specific information easily to the thread loop. 508 worker_pool_->inner_->ThreadLoop(this); 509 // Release our cyclic reference once we're done. 510 worker_pool_ = NULL; 511 } 512 513 // Inner definitions --------------------------------------------------------- 514 515 SequencedWorkerPool::Inner::Inner( 516 SequencedWorkerPool* worker_pool, 517 size_t max_threads, 518 const std::string& thread_name_prefix, 519 TestingObserver* observer) 520 : worker_pool_(worker_pool), 521 lock_(), 522 has_work_cv_(&lock_), 523 can_shutdown_cv_(&lock_), 524 max_threads_(max_threads), 525 thread_name_prefix_(thread_name_prefix), 526 thread_being_created_(false), 527 waiting_thread_count_(0), 528 blocking_shutdown_thread_count_(0), 529 next_sequence_task_number_(0), 530 blocking_shutdown_pending_task_count_(0), 531 trace_id_(0), 532 shutdown_called_(false), 533 max_blocking_tasks_after_shutdown_(0), 534 cleanup_state_(CLEANUP_DONE), 535 cleanup_idlers_(0), 536 cleanup_cv_(&lock_), 537 testing_observer_(observer) {} 538 539 SequencedWorkerPool::Inner::~Inner() { 540 // You must call Shutdown() before destroying the pool. 541 DCHECK(shutdown_called_); 542 543 // Need to explicitly join with the threads before they're destroyed or else 544 // they will be running when our object is half torn down. 545 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 546 it->second->Join(); 547 threads_.clear(); 548 549 if (testing_observer_) 550 testing_observer_->OnDestruct(); 551 } 552 553 SequencedWorkerPool::SequenceToken 554 SequencedWorkerPool::Inner::GetSequenceToken() { 555 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 556 // is used as a sentinel value in SequenceTokens. 557 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 558 } 559 560 SequencedWorkerPool::SequenceToken 561 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 562 AutoLock lock(lock_); 563 return SequenceToken(LockedGetNamedTokenID(name)); 564 } 565 566 bool SequencedWorkerPool::Inner::PostTask( 567 const std::string* optional_token_name, 568 SequenceToken sequence_token, 569 WorkerShutdown shutdown_behavior, 570 const tracked_objects::Location& from_here, 571 const Closure& task, 572 TimeDelta delay) { 573 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); 574 SequencedTask sequenced(from_here); 575 sequenced.sequence_token_id = sequence_token.id_; 576 sequenced.shutdown_behavior = shutdown_behavior; 577 sequenced.posted_from = from_here; 578 sequenced.task = 579 shutdown_behavior == BLOCK_SHUTDOWN ? 580 base::MakeCriticalClosure(task) : task; 581 sequenced.time_to_run = TimeTicks::Now() + delay; 582 583 int create_thread_id = 0; 584 { 585 AutoLock lock(lock_); 586 if (shutdown_called_) { 587 if (shutdown_behavior != BLOCK_SHUTDOWN || 588 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { 589 return false; 590 } 591 if (max_blocking_tasks_after_shutdown_ <= 0) { 592 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 593 return false; 594 } 595 max_blocking_tasks_after_shutdown_ -= 1; 596 } 597 598 // The trace_id is used for identifying the task in about:tracing. 599 sequenced.trace_id = trace_id_++; 600 601 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 602 "SequencedWorkerPool::PostTask", 603 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); 604 605 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 606 607 // Now that we have the lock, apply the named token rules. 608 if (optional_token_name) 609 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 610 611 pending_tasks_.insert(sequenced); 612 if (shutdown_behavior == BLOCK_SHUTDOWN) 613 blocking_shutdown_pending_task_count_++; 614 615 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 616 } 617 618 // Actually start the additional thread or signal an existing one now that 619 // we're outside the lock. 620 if (create_thread_id) 621 FinishStartingAdditionalThread(create_thread_id); 622 else 623 SignalHasWork(); 624 625 return true; 626 } 627 628 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 629 AutoLock lock(lock_); 630 return ContainsKey(threads_, PlatformThread::CurrentId()); 631 } 632 633 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 634 SequenceToken sequence_token) const { 635 AutoLock lock(lock_); 636 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 637 if (found == threads_.end()) 638 return false; 639 return sequence_token.Equals(found->second->running_sequence()); 640 } 641 642 // See https://code.google.com/p/chromium/issues/detail?id=168415 643 void SequencedWorkerPool::Inner::CleanupForTesting() { 644 DCHECK(!RunsTasksOnCurrentThread()); 645 base::ThreadRestrictions::ScopedAllowWait allow_wait; 646 AutoLock lock(lock_); 647 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 648 if (shutdown_called_) 649 return; 650 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 651 return; 652 cleanup_state_ = CLEANUP_REQUESTED; 653 cleanup_idlers_ = 0; 654 has_work_cv_.Signal(); 655 while (cleanup_state_ != CLEANUP_DONE) 656 cleanup_cv_.Wait(); 657 } 658 659 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 660 SignalHasWork(); 661 } 662 663 void SequencedWorkerPool::Inner::Shutdown( 664 int max_new_blocking_tasks_after_shutdown) { 665 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 666 { 667 AutoLock lock(lock_); 668 // Cleanup and Shutdown should not be called concurrently. 669 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 670 if (shutdown_called_) 671 return; 672 shutdown_called_ = true; 673 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 674 675 // Tickle the threads. This will wake up a waiting one so it will know that 676 // it can exit, which in turn will wake up any other waiting ones. 677 SignalHasWork(); 678 679 // There are no pending or running tasks blocking shutdown, we're done. 680 if (CanShutdown()) 681 return; 682 } 683 684 // If we're here, then something is blocking shutdown. So wait for 685 // CanShutdown() to go to true. 686 687 if (testing_observer_) 688 testing_observer_->WillWaitForShutdown(); 689 690 #if !defined(OS_NACL) 691 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 692 #endif 693 694 { 695 base::ThreadRestrictions::ScopedAllowWait allow_wait; 696 AutoLock lock(lock_); 697 while (!CanShutdown()) 698 can_shutdown_cv_.Wait(); 699 } 700 #if !defined(OS_NACL) 701 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 702 TimeTicks::Now() - shutdown_wait_begin); 703 #endif 704 } 705 706 bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 707 AutoLock lock(lock_); 708 return shutdown_called_; 709 } 710 711 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 712 { 713 AutoLock lock(lock_); 714 DCHECK(thread_being_created_); 715 thread_being_created_ = false; 716 std::pair<ThreadMap::iterator, bool> result = 717 threads_.insert( 718 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 719 DCHECK(result.second); 720 721 while (true) { 722 #if defined(OS_MACOSX) 723 base::mac::ScopedNSAutoreleasePool autorelease_pool; 724 #endif 725 726 HandleCleanup(); 727 728 // See GetWork for what delete_these_outside_lock is doing. 729 SequencedTask task; 730 TimeDelta wait_time; 731 std::vector<Closure> delete_these_outside_lock; 732 GetWorkStatus status = 733 GetWork(&task, &wait_time, &delete_these_outside_lock); 734 if (status == GET_WORK_FOUND) { 735 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 736 "SequencedWorkerPool::PostTask", 737 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 738 TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop", 739 "src_file", task.posted_from.file_name(), 740 "src_func", task.posted_from.function_name()); 741 int new_thread_id = WillRunWorkerTask(task); 742 { 743 AutoUnlock unlock(lock_); 744 // There may be more work available, so wake up another 745 // worker thread. (Technically not required, since we 746 // already get a signal for each new task, but it doesn't 747 // hurt.) 748 SignalHasWork(); 749 delete_these_outside_lock.clear(); 750 751 // Complete thread creation outside the lock if necessary. 752 if (new_thread_id) 753 FinishStartingAdditionalThread(new_thread_id); 754 755 this_worker->set_running_task_info( 756 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 757 758 tracked_objects::TrackedTime start_time = 759 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); 760 761 task.task.Run(); 762 763 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, 764 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 765 766 // Make sure our task is erased outside the lock for the 767 // same reason we do this with delete_these_oustide_lock. 768 // Also, do it before calling set_running_task_info() so 769 // that sequence-checking from within the task's destructor 770 // still works. 771 task.task = Closure(); 772 773 this_worker->set_running_task_info( 774 SequenceToken(), CONTINUE_ON_SHUTDOWN); 775 } 776 DidRunWorkerTask(task); // Must be done inside the lock. 777 } else if (cleanup_state_ == CLEANUP_RUNNING) { 778 switch (status) { 779 case GET_WORK_WAIT: { 780 AutoUnlock unlock(lock_); 781 delete_these_outside_lock.clear(); 782 } 783 break; 784 case GET_WORK_NOT_FOUND: 785 CHECK(delete_these_outside_lock.empty()); 786 cleanup_state_ = CLEANUP_FINISHING; 787 cleanup_cv_.Broadcast(); 788 break; 789 default: 790 NOTREACHED(); 791 } 792 } else { 793 // When we're terminating and there's no more work, we can 794 // shut down, other workers can complete any pending or new tasks. 795 // We can get additional tasks posted after shutdown_called_ is set 796 // but only worker threads are allowed to post tasks at that time, and 797 // the workers responsible for posting those tasks will be available 798 // to run them. Also, there may be some tasks stuck behind running 799 // ones with the same sequence token, but additional threads won't 800 // help this case. 801 if (shutdown_called_ && 802 blocking_shutdown_pending_task_count_ == 0) 803 break; 804 waiting_thread_count_++; 805 806 switch (status) { 807 case GET_WORK_NOT_FOUND: 808 has_work_cv_.Wait(); 809 break; 810 case GET_WORK_WAIT: 811 has_work_cv_.TimedWait(wait_time); 812 break; 813 default: 814 NOTREACHED(); 815 } 816 waiting_thread_count_--; 817 } 818 } 819 } // Release lock_. 820 821 // We noticed we should exit. Wake up the next worker so it knows it should 822 // exit as well (because the Shutdown() code only signals once). 823 SignalHasWork(); 824 825 // Possibly unblock shutdown. 826 can_shutdown_cv_.Signal(); 827 } 828 829 void SequencedWorkerPool::Inner::HandleCleanup() { 830 lock_.AssertAcquired(); 831 if (cleanup_state_ == CLEANUP_DONE) 832 return; 833 if (cleanup_state_ == CLEANUP_REQUESTED) { 834 // We win, we get to do the cleanup as soon as the others wise up and idle. 835 cleanup_state_ = CLEANUP_STARTING; 836 while (thread_being_created_ || 837 cleanup_idlers_ != threads_.size() - 1) { 838 has_work_cv_.Signal(); 839 cleanup_cv_.Wait(); 840 } 841 cleanup_state_ = CLEANUP_RUNNING; 842 return; 843 } 844 if (cleanup_state_ == CLEANUP_STARTING) { 845 // Another worker thread is cleaning up, we idle here until thats done. 846 ++cleanup_idlers_; 847 cleanup_cv_.Broadcast(); 848 while (cleanup_state_ != CLEANUP_FINISHING) { 849 cleanup_cv_.Wait(); 850 } 851 --cleanup_idlers_; 852 cleanup_cv_.Broadcast(); 853 return; 854 } 855 if (cleanup_state_ == CLEANUP_FINISHING) { 856 // We wait for all idlers to wake up prior to being DONE. 857 while (cleanup_idlers_ != 0) { 858 cleanup_cv_.Broadcast(); 859 cleanup_cv_.Wait(); 860 } 861 if (cleanup_state_ == CLEANUP_FINISHING) { 862 cleanup_state_ = CLEANUP_DONE; 863 cleanup_cv_.Signal(); 864 } 865 return; 866 } 867 } 868 869 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 870 const std::string& name) { 871 lock_.AssertAcquired(); 872 DCHECK(!name.empty()); 873 874 std::map<std::string, int>::const_iterator found = 875 named_sequence_tokens_.find(name); 876 if (found != named_sequence_tokens_.end()) 877 return found->second; // Got an existing one. 878 879 // Create a new one for this name. 880 SequenceToken result = GetSequenceToken(); 881 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 882 return result.id_; 883 } 884 885 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 886 lock_.AssertAcquired(); 887 // We assume that we never create enough tasks to wrap around. 888 return next_sequence_task_number_++; 889 } 890 891 SequencedWorkerPool::WorkerShutdown 892 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { 893 lock_.AssertAcquired(); 894 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 895 if (found == threads_.end()) 896 return CONTINUE_ON_SHUTDOWN; 897 return found->second->running_shutdown_behavior(); 898 } 899 900 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 901 SequencedTask* task, 902 TimeDelta* wait_time, 903 std::vector<Closure>* delete_these_outside_lock) { 904 lock_.AssertAcquired(); 905 906 #if !defined(OS_NACL) 907 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 908 static_cast<int>(pending_tasks_.size())); 909 #endif 910 911 // Find the next task with a sequence token that's not currently in use. 912 // If the token is in use, that means another thread is running something 913 // in that sequence, and we can't run it without going out-of-order. 914 // 915 // This algorithm is simple and fair, but inefficient in some cases. For 916 // example, say somebody schedules 1000 slow tasks with the same sequence 917 // number. We'll have to go through all those tasks each time we feel like 918 // there might be work to schedule. If this proves to be a problem, we 919 // should make this more efficient. 920 // 921 // One possible enhancement would be to keep a map from sequence ID to a 922 // list of pending but currently blocked SequencedTasks for that ID. 923 // When a worker finishes a task of one sequence token, it can pick up the 924 // next one from that token right away. 925 // 926 // This may lead to starvation if there are sufficient numbers of sequences 927 // in use. To alleviate this, we could add an incrementing priority counter 928 // to each SequencedTask. Then maintain a priority_queue of all runnable 929 // tasks, sorted by priority counter. When a sequenced task is completed 930 // we would pop the head element off of that tasks pending list and add it 931 // to the priority queue. Then we would run the first item in the priority 932 // queue. 933 934 GetWorkStatus status = GET_WORK_NOT_FOUND; 935 int unrunnable_tasks = 0; 936 PendingTaskSet::iterator i = pending_tasks_.begin(); 937 // We assume that the loop below doesn't take too long and so we can just do 938 // a single call to TimeTicks::Now(). 939 const TimeTicks current_time = TimeTicks::Now(); 940 while (i != pending_tasks_.end()) { 941 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 942 unrunnable_tasks++; 943 ++i; 944 continue; 945 } 946 947 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 948 // We're shutting down and the task we just found isn't blocking 949 // shutdown. Delete it and get more work. 950 // 951 // Note that we do not want to delete unrunnable tasks. Deleting a task 952 // can have side effects (like freeing some objects) and deleting a 953 // task that's supposed to run after one that's currently running could 954 // cause an obscure crash. 955 // 956 // We really want to delete these tasks outside the lock in case the 957 // closures are holding refs to objects that want to post work from 958 // their destructorss (which would deadlock). The closures are 959 // internally refcounted, so we just need to keep a copy of them alive 960 // until the lock is exited. The calling code can just clear() the 961 // vector they passed to us once the lock is exited to make this 962 // happen. 963 delete_these_outside_lock->push_back(i->task); 964 pending_tasks_.erase(i++); 965 continue; 966 } 967 968 if (i->time_to_run > current_time) { 969 // The time to run has not come yet. 970 *wait_time = i->time_to_run - current_time; 971 status = GET_WORK_WAIT; 972 if (cleanup_state_ == CLEANUP_RUNNING) { 973 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 974 delete_these_outside_lock->push_back(i->task); 975 pending_tasks_.erase(i); 976 } 977 break; 978 } 979 980 // Found a runnable task. 981 *task = *i; 982 pending_tasks_.erase(i); 983 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 984 blocking_shutdown_pending_task_count_--; 985 } 986 987 status = GET_WORK_FOUND; 988 break; 989 } 990 991 // Track the number of tasks we had to skip over to see if we should be 992 // making this more efficient. If this number ever becomes large or is 993 // frequently "some", we should consider the optimization above. 994 #if !defined(OS_NACL) 995 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 996 unrunnable_tasks); 997 #endif 998 return status; 999 } 1000 1001 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1002 lock_.AssertAcquired(); 1003 1004 // Mark the task's sequence number as in use. 1005 if (task.sequence_token_id) 1006 current_sequences_.insert(task.sequence_token_id); 1007 1008 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1009 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1010 // completes. 1011 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1012 blocking_shutdown_thread_count_++; 1013 1014 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 1015 // creates a new thread if there is no free one, there is a race when posting 1016 // tasks that many tasks could have been posted before a thread started 1017 // running them, so only one thread would have been created. So we also check 1018 // whether we should create more threads after removing our task from the 1019 // queue, which also has the nice side effect of creating the workers from 1020 // background threads rather than the main thread of the app. 1021 // 1022 // If another thread wasn't created, we want to wake up an existing thread 1023 // if there is one waiting to pick up the next task. 1024 // 1025 // Note that we really need to do this *before* running the task, not 1026 // after. Otherwise, if more than one task is posted, the creation of the 1027 // second thread (since we only create one at a time) will be blocked by 1028 // the execution of the first task, which could be arbitrarily long. 1029 return PrepareToStartAdditionalThreadIfHelpful(); 1030 } 1031 1032 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1033 lock_.AssertAcquired(); 1034 1035 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1036 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1037 blocking_shutdown_thread_count_--; 1038 } 1039 1040 if (task.sequence_token_id) 1041 current_sequences_.erase(task.sequence_token_id); 1042 } 1043 1044 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1045 int sequence_token_id) const { 1046 lock_.AssertAcquired(); 1047 return !sequence_token_id || 1048 current_sequences_.find(sequence_token_id) == 1049 current_sequences_.end(); 1050 } 1051 1052 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1053 lock_.AssertAcquired(); 1054 // How thread creation works: 1055 // 1056 // We'de like to avoid creating threads with the lock held. However, we 1057 // need to be sure that we have an accurate accounting of the threads for 1058 // proper Joining and deltion on shutdown. 1059 // 1060 // We need to figure out if we need another thread with the lock held, which 1061 // is what this function does. It then marks us as in the process of creating 1062 // a thread. When we do shutdown, we wait until the thread_being_created_ 1063 // flag is cleared, which ensures that the new thread is properly added to 1064 // all the data structures and we can't leak it. Once shutdown starts, we'll 1065 // refuse to create more threads or they would be leaked. 1066 // 1067 // Note that this creates a mostly benign race condition on shutdown that 1068 // will cause fewer workers to be created than one would expect. It isn't 1069 // much of an issue in real life, but affects some tests. Since we only spawn 1070 // one worker at a time, the following sequence of events can happen: 1071 // 1072 // 1. Main thread posts a bunch of unrelated tasks that would normally be 1073 // run on separate threads. 1074 // 2. The first task post causes us to start a worker. Other tasks do not 1075 // cause a worker to start since one is pending. 1076 // 3. Main thread initiates shutdown. 1077 // 4. No more threads are created since the shutdown_called_ flag is set. 1078 // 1079 // The result is that one may expect that max_threads_ workers to be created 1080 // given the workload, but in reality fewer may be created because the 1081 // sequence of thread creation on the background threads is racing with the 1082 // shutdown call. 1083 if (!shutdown_called_ && 1084 !thread_being_created_ && 1085 cleanup_state_ == CLEANUP_DONE && 1086 threads_.size() < max_threads_ && 1087 waiting_thread_count_ == 0) { 1088 // We could use an additional thread if there's work to be done. 1089 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1090 i != pending_tasks_.end(); ++i) { 1091 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1092 // Found a runnable task, mark the thread as being started. 1093 thread_being_created_ = true; 1094 return static_cast<int>(threads_.size() + 1); 1095 } 1096 } 1097 } 1098 return 0; 1099 } 1100 1101 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1102 int thread_number) { 1103 // Called outside of the lock. 1104 DCHECK(thread_number > 0); 1105 1106 // The worker is assigned to the list when the thread actually starts, which 1107 // will manage the memory of the pointer. 1108 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1109 } 1110 1111 void SequencedWorkerPool::Inner::SignalHasWork() { 1112 has_work_cv_.Signal(); 1113 if (testing_observer_) { 1114 testing_observer_->OnHasWork(); 1115 } 1116 } 1117 1118 bool SequencedWorkerPool::Inner::CanShutdown() const { 1119 lock_.AssertAcquired(); 1120 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1121 return !thread_being_created_ && 1122 blocking_shutdown_thread_count_ == 0 && 1123 blocking_shutdown_pending_task_count_ == 0; 1124 } 1125 1126 base::StaticAtomicSequenceNumber 1127 SequencedWorkerPool::Inner::g_last_sequence_number_; 1128 1129 // SequencedWorkerPool -------------------------------------------------------- 1130 1131 // static 1132 SequencedWorkerPool::SequenceToken 1133 SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1134 // Don't construct lazy instance on check. 1135 if (g_lazy_tls_ptr == NULL) 1136 return SequenceToken(); 1137 1138 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); 1139 if (!token) 1140 return SequenceToken(); 1141 return *token; 1142 } 1143 1144 SequencedWorkerPool::SequencedWorkerPool( 1145 size_t max_threads, 1146 const std::string& thread_name_prefix) 1147 : constructor_message_loop_(MessageLoopProxy::current()), 1148 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1149 } 1150 1151 SequencedWorkerPool::SequencedWorkerPool( 1152 size_t max_threads, 1153 const std::string& thread_name_prefix, 1154 TestingObserver* observer) 1155 : constructor_message_loop_(MessageLoopProxy::current()), 1156 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1157 } 1158 1159 SequencedWorkerPool::~SequencedWorkerPool() {} 1160 1161 void SequencedWorkerPool::OnDestruct() const { 1162 DCHECK(constructor_message_loop_.get()); 1163 // Avoid deleting ourselves on a worker thread (which would 1164 // deadlock). 1165 if (RunsTasksOnCurrentThread()) { 1166 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 1167 } else { 1168 delete this; 1169 } 1170 } 1171 1172 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1173 return inner_->GetSequenceToken(); 1174 } 1175 1176 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1177 const std::string& name) { 1178 return inner_->GetNamedSequenceToken(name); 1179 } 1180 1181 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1182 SequenceToken token) { 1183 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1184 } 1185 1186 scoped_refptr<SequencedTaskRunner> 1187 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( 1188 SequenceToken token, WorkerShutdown shutdown_behavior) { 1189 return new SequencedWorkerPoolSequencedTaskRunner( 1190 this, token, shutdown_behavior); 1191 } 1192 1193 scoped_refptr<TaskRunner> 1194 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1195 WorkerShutdown shutdown_behavior) { 1196 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1197 } 1198 1199 bool SequencedWorkerPool::PostWorkerTask( 1200 const tracked_objects::Location& from_here, 1201 const Closure& task) { 1202 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 1203 from_here, task, TimeDelta()); 1204 } 1205 1206 bool SequencedWorkerPool::PostDelayedWorkerTask( 1207 const tracked_objects::Location& from_here, 1208 const Closure& task, 1209 TimeDelta delay) { 1210 WorkerShutdown shutdown_behavior = 1211 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1212 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1213 from_here, task, delay); 1214 } 1215 1216 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1217 const tracked_objects::Location& from_here, 1218 const Closure& task, 1219 WorkerShutdown shutdown_behavior) { 1220 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1221 from_here, task, TimeDelta()); 1222 } 1223 1224 bool SequencedWorkerPool::PostSequencedWorkerTask( 1225 SequenceToken sequence_token, 1226 const tracked_objects::Location& from_here, 1227 const Closure& task) { 1228 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, 1229 from_here, task, TimeDelta()); 1230 } 1231 1232 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1233 SequenceToken sequence_token, 1234 const tracked_objects::Location& from_here, 1235 const Closure& task, 1236 TimeDelta delay) { 1237 WorkerShutdown shutdown_behavior = 1238 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1239 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1240 from_here, task, delay); 1241 } 1242 1243 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1244 const std::string& token_name, 1245 const tracked_objects::Location& from_here, 1246 const Closure& task) { 1247 DCHECK(!token_name.empty()); 1248 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1249 from_here, task, TimeDelta()); 1250 } 1251 1252 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1253 SequenceToken sequence_token, 1254 const tracked_objects::Location& from_here, 1255 const Closure& task, 1256 WorkerShutdown shutdown_behavior) { 1257 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1258 from_here, task, TimeDelta()); 1259 } 1260 1261 bool SequencedWorkerPool::PostDelayedTask( 1262 const tracked_objects::Location& from_here, 1263 const Closure& task, 1264 TimeDelta delay) { 1265 return PostDelayedWorkerTask(from_here, task, delay); 1266 } 1267 1268 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1269 return inner_->RunsTasksOnCurrentThread(); 1270 } 1271 1272 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1273 SequenceToken sequence_token) const { 1274 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1275 } 1276 1277 void SequencedWorkerPool::FlushForTesting() { 1278 inner_->CleanupForTesting(); 1279 } 1280 1281 void SequencedWorkerPool::SignalHasWorkForTesting() { 1282 inner_->SignalHasWorkForTesting(); 1283 } 1284 1285 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1286 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1287 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1288 } 1289 1290 bool SequencedWorkerPool::IsShutdownInProgress() { 1291 return inner_->IsShutdownInProgress(); 1292 } 1293 1294 } // namespace base 1295