// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/sequenced_worker_pool.h"

#include <stdint.h>

#include <list>
#include <map>
#include <set>
#include <utility>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/linked_ptr.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/thread_task_runner_handle.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/trace_event/trace_event.h"
#include "base/tracked_objects.h"
#include "build/build_config.h"

#if defined(OS_MACOSX)
#include "base/mac/scoped_nsautorelease_pool.h"
#elif defined(OS_WIN)
#include "base/win/scoped_com_initializer.h"
#endif

#if !defined(OS_NACL)
#include "base/metrics/histogram.h"
#endif

namespace base {

namespace {

struct SequencedTask : public TrackingInfo {
  SequencedTask()
      : sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  explicit SequencedTask(const tracked_objects::Location& from_here)
      : base::TrackingInfo(from_here, TimeTicks()),
        sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  ~SequencedTask() {}

  int sequence_token_id;
  int trace_id;
  int64_t sequence_task_number;
  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
  tracked_objects::Location posted_from;
  Closure task;

  // Non-delayed tasks and delayed tasks are managed together by time-to-run
  // order. We calculate the time by adding the posted time and the given
  // delay.
  TimeTicks time_to_run;
};

struct SequencedTaskLessThan {
 public:
  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
    if (lhs.time_to_run < rhs.time_to_run)
      return true;

    if (lhs.time_to_run > rhs.time_to_run)
      return false;

    // If the times happen to match, then we use the sequence number to decide.
    return lhs.sequence_task_number < rhs.sequence_task_number;
  }
};

// SequencedWorkerPoolTaskRunner ---------------------------------------------
// A TaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed ShutdownBehavior.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
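//
// A minimal usage sketch (illustrative only; |pool| stands for any
// SequencedWorkerPool the caller already owns and DoBackgroundWork is a
// placeholder callback, not something defined in this file):
//
//   scoped_refptr<TaskRunner> runner =
//       pool->GetTaskRunnerWithShutdownBehavior(
//           SequencedWorkerPool::SKIP_ON_SHUTDOWN);
//   runner->PostTask(FROM_HERE, base::Bind(&DoBackgroundWork));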
class SequencedWorkerPoolTaskRunner : public TaskRunner {
 public:
  SequencedWorkerPoolTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& task,
                       TimeDelta delay) override;
  bool RunsTasksOnCurrentThread() const override;

 private:
  ~SequencedWorkerPoolTaskRunner() override;

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};

SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool),
      shutdown_behavior_(shutdown_behavior) {
}

SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
}

bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay == TimeDelta()) {
    return pool_->PostWorkerTaskWithShutdownBehavior(
        from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->RunsTasksOnCurrentThread();
}

// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed sequence token.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
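//
// Instances are normally obtained through
// SequencedWorkerPool::GetSequencedTaskRunner(). A minimal usage sketch
// (illustrative only; |pool| and WriteToDisk are placeholders, not names
// defined in this file):
//
//   SequencedWorkerPool::SequenceToken token =
//       pool->GetNamedSequenceToken("disk");
//   scoped_refptr<SequencedTaskRunner> runner =
//       pool->GetSequencedTaskRunner(token);
//   // Tasks posted through |runner| run one at a time, in posting order.
//   runner->PostTask(FROM_HERE, base::Bind(&WriteToDisk));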
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
 public:
  SequencedWorkerPoolSequencedTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::SequenceToken token,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& task,
                       TimeDelta delay) override;
  bool RunsTasksOnCurrentThread() const override;

  // SequencedTaskRunner implementation
  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const Closure& task,
                                  TimeDelta delay) override;

 private:
  ~SequencedWorkerPoolSequencedTaskRunner() override;

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::SequenceToken token_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};

SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::SequenceToken token,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool),
      token_(token),
      shutdown_behavior_(shutdown_behavior) {
}

SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner() {
}

bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay == TimeDelta()) {
    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
        token_, from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
}

bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->IsRunningSequenceOnCurrentThread(token_);
}

bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  // There's no way to run nested tasks, so simply forward to
  // PostDelayedTask.
  return PostDelayedTask(from_here, task, delay);
}

// Create a process-wide unique ID to represent this task in trace events. This
// will be mangled with a Process ID hash to reduce the likelihood of colliding
// with MessageLoop pointers in other processes.
uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
  return (static_cast<uint64_t>(task.trace_id) << 32) |
         static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
}

}  // namespace

// Worker ---------------------------------------------------------------------

class SequencedWorkerPool::Worker : public SimpleThread {
 public:
  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
  // around as long as we are running.
  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
         int thread_number,
         const std::string& thread_name_prefix);
  ~Worker() override;

  // SimpleThread implementation. This actually runs the background thread.
  void Run() override;

  // Gets the worker for the current thread out of thread-local storage.
  static Worker* GetForCurrentThread();

  // Indicates that a task is about to be run. The parameters provide
  // additional metainformation about the task being run.
  void set_running_task_info(SequenceToken token,
                             WorkerShutdown shutdown_behavior) {
    is_processing_task_ = true;
    task_sequence_token_ = token;
    task_shutdown_behavior_ = shutdown_behavior;
  }

  // Indicates that the task has finished running.
  void reset_running_task_info() { is_processing_task_ = false; }

  // Whether the worker is processing a task.
  bool is_processing_task() { return is_processing_task_; }

  SequenceToken task_sequence_token() const {
    DCHECK(is_processing_task_);
    return task_sequence_token_;
  }

  WorkerShutdown task_shutdown_behavior() const {
    DCHECK(is_processing_task_);
    return task_shutdown_behavior_;
  }

  scoped_refptr<SequencedWorkerPool> worker_pool() const {
    return worker_pool_;
  }

 private:
  static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
      lazy_tls_ptr_;

  scoped_refptr<SequencedWorkerPool> worker_pool_;
  // The sequence token of the task being processed. Only valid when
  // is_processing_task_ is true.
  SequenceToken task_sequence_token_;
  // The shutdown behavior of the task being processed. Only valid when
  // is_processing_task_ is true.
  WorkerShutdown task_shutdown_behavior_;
  // Whether the Worker is processing a task.
  bool is_processing_task_;

  DISALLOW_COPY_AND_ASSIGN(Worker);
};

// Inner ----------------------------------------------------------------------

class SequencedWorkerPool::Inner {
 public:
  // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
  // by it).
  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
        const std::string& thread_name_prefix,
        TestingObserver* observer);

  ~Inner();

  static SequenceToken GetSequenceToken();

  SequenceToken GetNamedSequenceToken(const std::string& name);

  // This function accepts a name and an ID. If the name is null, the
  // token ID is used. This allows us to implement the optional name lookup
  // from a single function without having to enter the lock a separate time.
  bool PostTask(const std::string* optional_token_name,
                SequenceToken sequence_token,
                WorkerShutdown shutdown_behavior,
                const tracked_objects::Location& from_here,
                const Closure& task,
                TimeDelta delay);

  bool RunsTasksOnCurrentThread() const;

  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  bool IsRunningSequence(SequenceToken sequence_token) const;

  void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
                                          WorkerShutdown shutdown_behavior);

  void CleanupForTesting();

  void SignalHasWorkForTesting();

  int GetWorkSignalCountForTesting() const;

  void Shutdown(int max_blocking_tasks_after_shutdown);

  bool IsShutdownInProgress();

  // Runs the worker loop on the background thread.
  void ThreadLoop(Worker* this_worker);

 private:
  enum GetWorkStatus {
    GET_WORK_FOUND,
    GET_WORK_NOT_FOUND,
    GET_WORK_WAIT,
  };

  enum CleanupState {
    CLEANUP_REQUESTED,
    CLEANUP_STARTING,
    CLEANUP_RUNNING,
    CLEANUP_FINISHING,
    CLEANUP_DONE,
  };

  // Called from within the lock, this converts the given token name into a
  // token ID, creating a new one if necessary.
  int LockedGetNamedTokenID(const std::string& name);

  // Called from within the lock, this returns the next sequence task number.
  int64_t LockedGetNextSequenceTaskNumber();

  // Gets a new task. There are 3 cases depending on the return value:
  //
  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
  //    be run immediately.
  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
  //    and |task| is not filled in. In this case, the caller should wait until
  //    a task is posted.
  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
  //    immediately, and |task| is not filled in. Likewise, |wait_time| is
  //    filled in with the time to wait until the next task is ready to run.
  //    In this case, the caller should wait for that time.
  //
  // In any case, the calling code should clear the given
  // delete_these_outside_lock vector the next time the lock is released.
  // See the implementation for a more detailed description.
  GetWorkStatus GetWork(SequencedTask* task,
                        TimeDelta* wait_time,
                        std::vector<Closure>* delete_these_outside_lock);

  void HandleCleanup();

  // Performs init and cleanup around running the given task. WillRun...
  // returns the value from PrepareToStartAdditionalThreadIfHelpful.
  // The calling code should call FinishStartingAdditionalThread once the
  // lock is released if the return value is nonzero.
  int WillRunWorkerTask(const SequencedTask& task);
  void DidRunWorkerTask(const SequencedTask& task);

  // Returns true if there are no threads currently running the given
  // sequence token.
  bool IsSequenceTokenRunnable(int sequence_token_id) const;

  // Checks if all threads are busy and the addition of one more could run an
  // additional task waiting in the queue. This must be called from within
  // the lock.
  //
  // If another thread is helpful, this will mark the thread as being in the
  // process of starting and returns the index of the new thread which will be
  // 0 or more. The caller should then call FinishStartingAdditionalThread to
  // complete initialization once the lock is released.
  //
  // If another thread is not necessary, returns 0.
  //
  // See the implementation for more.
  int PrepareToStartAdditionalThreadIfHelpful();

  // The second part of thread creation after
  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
  // generated. This actually creates the thread and should be called outside
  // the lock to avoid blocking important work starting a thread in the lock.
  void FinishStartingAdditionalThread(int thread_number);

  // Signals |has_work_cv_| and notifies the testing observer, if one is set.
  void SignalHasWork();

  // Checks whether there is work left that's blocking shutdown. Must be
  // called inside the lock.
  bool CanShutdown() const;

  SequencedWorkerPool* const worker_pool_;

  // The last sequence number used. Managed by GetSequenceToken; since this
  // only does thread-safe increment operations, you do not need to hold the
  // lock. This is class-static to make SequenceTokens issued by
  // GetSequenceToken unique across SequencedWorkerPool instances.
  static base::StaticAtomicSequenceNumber g_last_sequence_number_;

  // This lock protects |everything in this class|. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are posted or shutdown starts.
  ConditionVariable has_work_cv_;

  // Condition variable that is waited on by non-worker threads (in
  // Shutdown()) until CanShutdown() goes to true.
  ConditionVariable can_shutdown_cv_;

  // The maximum number of worker threads we'll create.
  const size_t max_threads_;

  const std::string thread_name_prefix_;

  // Associates all known sequence token names with their IDs.
  std::map<std::string, int> named_sequence_tokens_;

  // Owning pointers to all threads we've created so far, indexed by
  // ID. Since we lazily create threads, this may be less than
  // max_threads_ and will be initially empty.
  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
  ThreadMap threads_;

  // Set to true when we're in the process of creating another thread.
  // See PrepareToStartAdditionalThreadIfHelpful for more.
  bool thread_being_created_;

  // Number of threads currently waiting for work.
  size_t waiting_thread_count_;

  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
  // or SKIP_ON_SHUTDOWN flag set.
  size_t blocking_shutdown_thread_count_;

  // A set of all pending tasks in time-to-run order. These are tasks that are
  // either waiting for a thread to run on, waiting for their time to run,
  // or blocked on a previous task in their sequence. We have to iterate over
  // the tasks by time-to-run order, so we use the set instead of the
  // traditional priority_queue.
  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
  PendingTaskSet pending_tasks_;

  // The next sequence number for a new sequenced task.
  int64_t next_sequence_task_number_;

  // Number of tasks in the pending_tasks_ list that are marked as blocking
  // shutdown.
  size_t blocking_shutdown_pending_task_count_;

  // Lists all sequence tokens currently executing.
  std::set<int> current_sequences_;

  // An ID for each posted task to distinguish the task from others in traces.
  int trace_id_;

  // Set when Shutdown is called and no further tasks should be
  // allowed, though we may still be running existing tasks.
  bool shutdown_called_;

  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
  // has been called.
  int max_blocking_tasks_after_shutdown_;

  // State used to clean up for testing, all guarded by lock_.
  CleanupState cleanup_state_;
  size_t cleanup_idlers_;
  ConditionVariable cleanup_cv_;

  TestingObserver* const testing_observer_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};

// Worker definitions ---------------------------------------------------------

SequencedWorkerPool::Worker::Worker(
    const scoped_refptr<SequencedWorkerPool>& worker_pool,
    int thread_number,
    const std::string& prefix)
    : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
      worker_pool_(worker_pool),
      task_shutdown_behavior_(BLOCK_SHUTDOWN),
      is_processing_task_(false) {
  Start();
}

SequencedWorkerPool::Worker::~Worker() {
}

void SequencedWorkerPool::Worker::Run() {
#if defined(OS_WIN)
  win::ScopedCOMInitializer com_initializer;
#endif

  // Store a pointer to this worker in thread local storage for static function
  // access.
  DCHECK(!lazy_tls_ptr_.Get().Get());
  lazy_tls_ptr_.Get().Set(this);

  // Just jump back to the Inner object to run the thread, since it has all the
  // tracking information and queues. It might be more natural to implement
  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
  // having these worker objects at all, but that method lacks the ability to
  // send thread-specific information easily to the thread loop.
  worker_pool_->inner_->ThreadLoop(this);
  // Release our cyclic reference once we're done.
  worker_pool_ = nullptr;
}

// static
SequencedWorkerPool::Worker*
SequencedWorkerPool::Worker::GetForCurrentThread() {
  // Don't construct lazy instance on check.
  if (lazy_tls_ptr_ == nullptr)
    return nullptr;

  return lazy_tls_ptr_.Get().Get();
}

// static
LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
    SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;

// Inner definitions ---------------------------------------------------------

SequencedWorkerPool::Inner::Inner(
    SequencedWorkerPool* worker_pool,
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : worker_pool_(worker_pool),
      lock_(),
      has_work_cv_(&lock_),
      can_shutdown_cv_(&lock_),
      max_threads_(max_threads),
      thread_name_prefix_(thread_name_prefix),
      thread_being_created_(false),
      waiting_thread_count_(0),
      blocking_shutdown_thread_count_(0),
      next_sequence_task_number_(0),
      blocking_shutdown_pending_task_count_(0),
      trace_id_(0),
      shutdown_called_(false),
      max_blocking_tasks_after_shutdown_(0),
      cleanup_state_(CLEANUP_DONE),
      cleanup_idlers_(0),
      cleanup_cv_(&lock_),
      testing_observer_(observer) {}

SequencedWorkerPool::Inner::~Inner() {
  // You must call Shutdown() before destroying the pool.
  DCHECK(shutdown_called_);

  // Need to explicitly join with the threads before they're destroyed or else
  // they will be running when our object is half torn down.
  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
    it->second->Join();
  threads_.clear();

  if (testing_observer_)
    testing_observer_->OnDestruct();
}

// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetSequenceToken() {
  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
  // is used as a sentinel value in SequenceTokens.
  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
}

SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
  AutoLock lock(lock_);
  return SequenceToken(LockedGetNamedTokenID(name));
}

bool SequencedWorkerPool::Inner::PostTask(
    const std::string* optional_token_name,
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
  SequencedTask sequenced(from_here);
  sequenced.sequence_token_id = sequence_token.id_;
  sequenced.shutdown_behavior = shutdown_behavior;
  sequenced.posted_from = from_here;
  sequenced.task =
      shutdown_behavior == BLOCK_SHUTDOWN ?
          base::MakeCriticalClosure(task) : task;
  sequenced.time_to_run = TimeTicks::Now() + delay;

  int create_thread_id = 0;
  {
    AutoLock lock(lock_);
    if (shutdown_called_) {
      // Don't allow a new task to be posted if it doesn't block shutdown.
      if (shutdown_behavior != BLOCK_SHUTDOWN)
        return false;

      // If the current thread is running a task, and that task doesn't block
      // shutdown, then it shouldn't be allowed to post any more tasks.
      ThreadMap::const_iterator found =
          threads_.find(PlatformThread::CurrentId());
      if (found != threads_.end() && found->second->is_processing_task() &&
          found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
        return false;
      }

      if (max_blocking_tasks_after_shutdown_ <= 0) {
        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
        return false;
      }
      max_blocking_tasks_after_shutdown_ -= 1;
    }

    // The trace_id is used for identifying the task in about:tracing.
    sequenced.trace_id = trace_id_++;

    TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
        "SequencedWorkerPool::Inner::PostTask",
        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
        TRACE_EVENT_FLAG_FLOW_OUT);

    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();

    // Now that we have the lock, apply the named token rules.
    if (optional_token_name)
      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);

    pending_tasks_.insert(sequenced);
    if (shutdown_behavior == BLOCK_SHUTDOWN)
      blocking_shutdown_pending_task_count_++;

    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
  }

  // Actually start the additional thread or signal an existing one now that
  // we're outside the lock.
  if (create_thread_id)
    FinishStartingAdditionalThread(create_thread_id);
  else
    SignalHasWork();

  return true;
}

bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
  AutoLock lock(lock_);
  return ContainsKey(threads_, PlatformThread::CurrentId());
}

bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  AutoLock lock(lock_);
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  if (found == threads_.end())
    return false;
  return found->second->is_processing_task() &&
         sequence_token.Equals(found->second->task_sequence_token());
}

bool SequencedWorkerPool::Inner::IsRunningSequence(
    SequenceToken sequence_token) const {
  DCHECK(sequence_token.IsValid());
  AutoLock lock(lock_);
  return !IsSequenceTokenRunnable(sequence_token.id_);
}

void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior) {
  AutoLock lock(lock_);
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  DCHECK(found != threads_.end());
  DCHECK(found->second->is_processing_task());
  DCHECK(!found->second->task_sequence_token().IsValid());
  found->second->set_running_task_info(sequence_token, shutdown_behavior);

  // Mark the sequence token as in use.
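  // The token was freshly generated by the caller (see
  // GetSequencedTaskRunnerForCurrentThread()), so it must not already be
  // tracked in |current_sequences_|; a duplicate insertion would mean two
  // threads claim the same sequence.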
  bool success = current_sequences_.insert(sequence_token.id_).second;
  DCHECK(success);
}

// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
  DCHECK(!RunsTasksOnCurrentThread());
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  AutoLock lock(lock_);
  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
  if (shutdown_called_)
    return;
  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
    return;
  cleanup_state_ = CLEANUP_REQUESTED;
  cleanup_idlers_ = 0;
  has_work_cv_.Signal();
  while (cleanup_state_ != CLEANUP_DONE)
    cleanup_cv_.Wait();
}

void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
  SignalHasWork();
}

void SequencedWorkerPool::Inner::Shutdown(
    int max_new_blocking_tasks_after_shutdown) {
  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
  {
    AutoLock lock(lock_);
    // Cleanup and Shutdown should not be called concurrently.
    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
    if (shutdown_called_)
      return;
    shutdown_called_ = true;
    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;

    // Tickle the threads. This will wake up a waiting one so it will know that
    // it can exit, which in turn will wake up any other waiting ones.
    SignalHasWork();

    // If there are no pending or running tasks blocking shutdown, we're done.
    if (CanShutdown())
      return;
  }

  // If we're here, then something is blocking shutdown. So wait for
  // CanShutdown() to go to true.

  if (testing_observer_)
    testing_observer_->WillWaitForShutdown();

#if !defined(OS_NACL)
  TimeTicks shutdown_wait_begin = TimeTicks::Now();
#endif

  {
    base::ThreadRestrictions::ScopedAllowWait allow_wait;
    AutoLock lock(lock_);
    while (!CanShutdown())
      can_shutdown_cv_.Wait();
  }
#if !defined(OS_NACL)
  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
                      TimeTicks::Now() - shutdown_wait_begin);
#endif
}

bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
  AutoLock lock(lock_);
  return shutdown_called_;
}

void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    std::pair<ThreadMap::iterator, bool> result =
        threads_.insert(
            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::Inner::ThreadLoop",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
            TRACE_EVENT_FLAG_FLOW_IN,
            "src_file", task.posted_from.file_name(),
            "src_func", task.posted_from.function_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TaskStopwatch stopwatch;
          stopwatch.Start();
          task.task.Run();
          stopwatch.Stop();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
              task, stopwatch);

          // Update the sequence token in case it has been set from within the
          // task, so it can be removed from the set of currently running
          // sequences in DidRunWorkerTask() below.
          task.sequence_token_id = this_worker->task_sequence_token().id_;

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling reset_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->reset_running_task_info();
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        switch (status) {
          case GET_WORK_WAIT: {
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can shut down;
        // other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();
          break;
        }

        // No work was found, but there are tasks that need deletion. The
        // deletion must happen outside of the lock.
        if (delete_these_outside_lock.size()) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();

          // Since the lock has been released, |status| may no longer be
          // accurate. It might read GET_WORK_WAIT even if there are tasks
          // ready to perform work. Jump to the top of the loop to recalculate
          // |status|.
          continue;
        }

        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}

void SequencedWorkerPool::Inner::HandleCleanup() {
  lock_.AssertAcquired();
  if (cleanup_state_ == CLEANUP_DONE)
    return;
  if (cleanup_state_ == CLEANUP_REQUESTED) {
    // We win; we get to do the cleanup as soon as the others wise up and idle.
    cleanup_state_ = CLEANUP_STARTING;
    while (thread_being_created_ ||
           cleanup_idlers_ != threads_.size() - 1) {
      has_work_cv_.Signal();
      cleanup_cv_.Wait();
    }
    cleanup_state_ = CLEANUP_RUNNING;
    return;
  }
  if (cleanup_state_ == CLEANUP_STARTING) {
    // Another worker thread is cleaning up; we idle here until that's done.
    ++cleanup_idlers_;
    cleanup_cv_.Broadcast();
    while (cleanup_state_ != CLEANUP_FINISHING) {
      cleanup_cv_.Wait();
    }
    --cleanup_idlers_;
    cleanup_cv_.Broadcast();
    return;
  }
  if (cleanup_state_ == CLEANUP_FINISHING) {
    // We wait for all idlers to wake up prior to being DONE.
    while (cleanup_idlers_ != 0) {
      cleanup_cv_.Broadcast();
      cleanup_cv_.Wait();
    }
    if (cleanup_state_ == CLEANUP_FINISHING) {
      cleanup_state_ = CLEANUP_DONE;
      cleanup_cv_.Signal();
    }
    return;
  }
}

int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
    const std::string& name) {
  lock_.AssertAcquired();
  DCHECK(!name.empty());

  std::map<std::string, int>::const_iterator found =
      named_sequence_tokens_.find(name);
  if (found != named_sequence_tokens_.end())
    return found->second;  // Got an existing one.

  // Create a new one for this name.
  SequenceToken result = GetSequenceToken();
  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
  return result.id_;
}

int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
  lock_.AssertAcquired();
  // We assume that we never create enough tasks to wrap around.
  return next_sequence_task_number_++;
}

SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

  // Find the next task with a sequence token that's not currently in use.
  // If the token is in use, that means another thread is running something
  // in that sequence, and we can't run it without going out-of-order.
  //
  // This algorithm is simple and fair, but inefficient in some cases. For
  // example, say somebody schedules 1000 slow tasks with the same sequence
  // number. We'll have to go through all those tasks each time we feel like
  // there might be work to schedule. If this proves to be a problem, we
  // should make this more efficient.
  //
  // One possible enhancement would be to keep a map from sequence ID to a
  // list of pending but currently blocked SequencedTasks for that ID.
  // When a worker finishes a task of one sequence token, it can pick up the
  // next one from that token right away.
  //
  // This may lead to starvation if there are sufficient numbers of sequences
  // in use. To alleviate this, we could add an incrementing priority counter
  // to each SequencedTask. Then maintain a priority_queue of all runnable
  // tasks, sorted by priority counter. When a sequenced task is completed
  // we would pop the head element off of that task's pending list and add it
  // to the priority queue. Then we would run the first item in the priority
  // queue.
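  //
  // A rough sketch of the bookkeeping that enhancement would need (all names
  // below are hypothetical; nothing like this exists in the file today):
  //
  //   // Tasks blocked behind a running task with the same sequence token.
  //   std::map<int, std::list<SequencedTask>> blocked_by_sequence_;
  //   // Runnable tasks ordered by an ever-increasing priority counter, so
  //   // the oldest runnable task is picked first.
  //   std::priority_queue<SequencedTask, std::vector<SequencedTask>,
  //                       OldestPriorityCounterFirst> runnable_tasks_;
  //
  // When a sequenced task finished, the head of its blocked list (if any)
  // would be pushed onto |runnable_tasks_|, and GetWork() would simply pop
  // |runnable_tasks_| instead of scanning |pending_tasks_|.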

  GetWorkStatus status = GET_WORK_NOT_FOUND;
  int unrunnable_tasks = 0;
  PendingTaskSet::iterator i = pending_tasks_.begin();
  // We assume that the loop below doesn't take too long and so we can just do
  // a single call to TimeTicks::Now().
  const TimeTicks current_time = TimeTicks::Now();
  while (i != pending_tasks_.end()) {
    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
      unrunnable_tasks++;
      ++i;
      continue;
    }

    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
      // We're shutting down and the task we just found isn't blocking
      // shutdown. Delete it and get more work.
      //
      // Note that we do not want to delete unrunnable tasks. Deleting a task
      // can have side effects (like freeing some objects) and deleting a
      // task that's supposed to run after one that's currently running could
      // cause an obscure crash.
      //
      // We really want to delete these tasks outside the lock in case the
      // closures are holding refs to objects that want to post work from
      // their destructors (which would deadlock). The closures are
      // internally refcounted, so we just need to keep a copy of them alive
      // until the lock is exited. The calling code can just clear() the
      // vector they passed to us once the lock is exited to make this
      // happen.
      delete_these_outside_lock->push_back(i->task);
      pending_tasks_.erase(i++);
      continue;
    }

    if (i->time_to_run > current_time) {
      // The time to run has not come yet.
      *wait_time = i->time_to_run - current_time;
      status = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
        delete_these_outside_lock->push_back(i->task);
        pending_tasks_.erase(i);
      }
      break;
    }

    // Found a runnable task.
    *task = *i;
    pending_tasks_.erase(i);
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      blocking_shutdown_pending_task_count_--;
    }

    status = GET_WORK_FOUND;
    break;
  }

  return status;
}

int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  // Mark the task's sequence number as in use.
  if (task.sequence_token_id)
    current_sequences_.insert(task.sequence_token_id);

  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
  // completes.
  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
    blocking_shutdown_thread_count_++;

  // We just picked up a task. Since PrepareToStartAdditionalThreadIfHelpful
  // only creates a new thread if there is no free one, there is a race when
  // posting tasks: many tasks could have been posted before a thread started
  // running them, so only one thread would have been created. So we also check
  // whether we should create more threads after removing our task from the
  // queue, which also has the nice side effect of creating the workers from
  // background threads rather than the main thread of the app.
  //
  // If another thread wasn't created, we want to wake up an existing thread
  // if there is one waiting to pick up the next task.
  //
  // Note that we really need to do this *before* running the task, not
  // after. Otherwise, if more than one task is posted, the creation of the
  // second thread (since we only create one at a time) will be blocked by
  // the execution of the first task, which could be arbitrarily long.
  return PrepareToStartAdditionalThreadIfHelpful();
}

void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
    blocking_shutdown_thread_count_--;
  }

  if (task.sequence_token_id)
    current_sequences_.erase(task.sequence_token_id);
}

bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
    int sequence_token_id) const {
  lock_.AssertAcquired();
  return !sequence_token_id ||
      current_sequences_.find(sequence_token_id) ==
          current_sequences_.end();
}

int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
  lock_.AssertAcquired();
  // How thread creation works:
  //
  // We'd like to avoid creating threads with the lock held. However, we
  // need to be sure that we have an accurate accounting of the threads for
  // proper Joining and deletion on shutdown.
  //
  // We need to figure out if we need another thread with the lock held, which
  // is what this function does. It then marks us as in the process of creating
  // a thread. When we do shutdown, we wait until the thread_being_created_
  // flag is cleared, which ensures that the new thread is properly added to
  // all the data structures and we can't leak it. Once shutdown starts, we'll
  // refuse to create more threads or they would be leaked.
  //
  // Note that this creates a mostly benign race condition on shutdown that
  // will cause fewer workers to be created than one would expect. It isn't
  // much of an issue in real life, but affects some tests. Since we only spawn
  // one worker at a time, the following sequence of events can happen:
  //
  // 1. Main thread posts a bunch of unrelated tasks that would normally be
  //    run on separate threads.
  // 2. The first task post causes us to start a worker. Other tasks do not
  //    cause a worker to start since one is pending.
  // 3. Main thread initiates shutdown.
  // 4. No more threads are created since the shutdown_called_ flag is set.
  //
  // The result is that one may expect max_threads_ workers to be created
  // given the workload, but in reality fewer may be created because the
  // sequence of thread creation on the background threads is racing with the
  // shutdown call.
  if (!shutdown_called_ &&
      !thread_being_created_ &&
      cleanup_state_ == CLEANUP_DONE &&
      threads_.size() < max_threads_ &&
      waiting_thread_count_ == 0) {
    // We could use an additional thread if there's work to be done.
    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
         i != pending_tasks_.end(); ++i) {
      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
        // Found a runnable task, mark the thread as being started.
        thread_being_created_ = true;
        return static_cast<int>(threads_.size() + 1);
      }
    }
  }
  return 0;
}

void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
    int thread_number) {
  // Called outside of the lock.
  DCHECK_GT(thread_number, 0);

  // The worker is added to |threads_| when the thread actually starts (see
  // ThreadLoop()), and that map manages the memory of the pointer.
  new Worker(worker_pool_, thread_number, thread_name_prefix_);
}

void SequencedWorkerPool::Inner::SignalHasWork() {
  has_work_cv_.Signal();
  if (testing_observer_) {
    testing_observer_->OnHasWork();
  }
}

bool SequencedWorkerPool::Inner::CanShutdown() const {
  lock_.AssertAcquired();
  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
  return !thread_being_created_ &&
      blocking_shutdown_thread_count_ == 0 &&
      blocking_shutdown_pending_task_count_ == 0;
}

base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;

// SequencedWorkerPool --------------------------------------------------------

std::string SequencedWorkerPool::SequenceToken::ToString() const {
  return base::StringPrintf("[%d]", id_);
}

// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
  Worker* worker = Worker::GetForCurrentThread();
  if (!worker)
    return SequenceToken();

  return worker->task_sequence_token();
}

// static
scoped_refptr<SequencedWorkerPool>
SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
  Worker* worker = Worker::GetForCurrentThread();
  if (!worker)
    return nullptr;

  return worker->worker_pool();
}

// static
scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
  Worker* worker = Worker::GetForCurrentThread();

  // If there is no worker, this thread is not a worker thread. Otherwise, it
  // is currently running a task (sequenced or unsequenced).
  if (!worker)
    return nullptr;

  scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
  SequenceToken sequence_token = worker->task_sequence_token();
  WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
  if (!sequence_token.IsValid()) {
    // Create a new sequence token and bind this thread to it, to make sure
    // that a task posted to the SequencedTaskRunner we are going to return is
    // not immediately going to run on a different thread.
    sequence_token = Inner::GetSequenceToken();
    pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
                                                     shutdown_behavior);
  }

  DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
  return new SequencedWorkerPoolSequencedTaskRunner(
      std::move(pool), sequence_token, shutdown_behavior);
}

SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
                                         const std::string& thread_name_prefix)
    : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}

SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
                                         const std::string& thread_name_prefix,
                                         TestingObserver* observer)
    : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}

SequencedWorkerPool::~SequencedWorkerPool() {}

void SequencedWorkerPool::OnDestruct() const {
  // Avoid deleting ourselves on a worker thread (which would deadlock).
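  // Deleting here would make ~Inner() try to Join() the very worker thread
  // that is running this destructor, so defer deletion to the thread that
  // created the pool instead.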
  if (RunsTasksOnCurrentThread()) {
    constructor_task_runner_->DeleteSoon(FROM_HERE, this);
  } else {
    delete this;
  }
}

// static
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
  return Inner::GetSequenceToken();
}

SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
    const std::string& name) {
  return inner_->GetNamedSequenceToken(name);
}

scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
    SequenceToken token) {
  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
}

scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
    SequenceToken token, WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolSequencedTaskRunner(
      this, token, shutdown_behavior);
}

scoped_refptr<TaskRunner>
SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
    WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
}

bool SequencedWorkerPool::PostWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
    const std::string& token_name,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  DCHECK(!token_name.empty());
  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  return PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
  return inner_->RunsTasksOnCurrentThread();
}

bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
}

bool SequencedWorkerPool::IsRunningSequence(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequence(sequence_token);
}

void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}

void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}

void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_task_runner_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}

bool SequencedWorkerPool::IsShutdownInProgress() {
  return inner_->IsShutdownInProgress();
}

}  // namespace base