// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/sequenced_worker_pool.h"

#include <stdint.h>

#include <list>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "base/trace_event/heap_profiler.h"
#include "base/trace_event/trace_event.h"
#include "base/tracked_objects.h"
#include "build/build_config.h"

#if defined(OS_MACOSX)
#include "base/mac/scoped_nsautorelease_pool.h"
#elif defined(OS_WIN)
#include "base/win/scoped_com_initializer.h"
#endif

#if !defined(OS_NACL)
#include "base/metrics/histogram.h"
#endif

namespace base {

namespace {

struct SequencedTask : public TrackingInfo {
  SequencedTask()
      : sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  explicit SequencedTask(const tracked_objects::Location& from_here)
      : base::TrackingInfo(from_here, TimeTicks()),
        sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  ~SequencedTask() {}

  int sequence_token_id;
  int trace_id;
  int64_t sequence_task_number;
  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
  tracked_objects::Location posted_from;
  Closure task;

  // Non-delayed tasks and delayed tasks are managed together by time-to-run
  // order. We calculate the time by adding the posted time and the given
  // delay.
  TimeTicks time_to_run;
};

struct SequencedTaskLessThan {
 public:
  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
    if (lhs.time_to_run < rhs.time_to_run)
      return true;

    if (lhs.time_to_run > rhs.time_to_run)
      return false;

    // If the times happen to match, then we use the sequence number to
    // decide.
    return lhs.sequence_task_number < rhs.sequence_task_number;
  }
};
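
// As a concrete illustration of the ordering above (the numbers are
// hypothetical): a task posted "now" with a 5 ms delay gets
// time_to_run = now + 5 ms, so it sorts after every task whose time_to_run is
// earlier, and ties on time_to_run are broken by sequence_task_number, i.e.
// by posting order. This is why pending_tasks_ below can be a std::set
// ordered by SequencedTaskLessThan rather than a priority_queue: iterating
// the set visits tasks in the order they should run.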

// SequencedWorkerPoolTaskRunner ---------------------------------------------
// A TaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed ShutdownBehavior.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolTaskRunner : public TaskRunner {
 public:
  SequencedWorkerPoolTaskRunner(
      scoped_refptr<SequencedWorkerPool> pool,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& task,
                       TimeDelta delay) override;
  bool RunsTasksOnCurrentThread() const override;

 private:
  ~SequencedWorkerPoolTaskRunner() override;

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};

SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    scoped_refptr<SequencedWorkerPool> pool,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {}

SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
}

bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay.is_zero()) {
    return pool_->PostWorkerTaskWithShutdownBehavior(
        from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->RunsTasksOnCurrentThread();
}
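
// A hypothetical usage sketch (the pool, closure, and shutdown behavior are
// illustrative): callers normally reach the class above through
// SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(), e.g.
//
//   scoped_refptr<TaskRunner> runner =
//       pool->GetTaskRunnerWithShutdownBehavior(
//           SequencedWorkerPool::SKIP_ON_SHUTDOWN);
//   runner->PostTask(FROM_HERE, Bind(&DoBackgroundWork));
//
// The zero-delay branch above preserves the stored shutdown behavior; tasks
// with a nonzero delay are forwarded to PostDelayedWorkerTask(), which uses
// SKIP_ON_SHUTDOWN semantics (see the bottom of this file).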

// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed sequence token.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
 public:
  SequencedWorkerPoolSequencedTaskRunner(
      scoped_refptr<SequencedWorkerPool> pool,
      SequencedWorkerPool::SequenceToken token,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& task,
                       TimeDelta delay) override;
  bool RunsTasksOnCurrentThread() const override;

  // SequencedTaskRunner implementation
  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const Closure& task,
                                  TimeDelta delay) override;

 private:
  ~SequencedWorkerPoolSequencedTaskRunner() override;

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::SequenceToken token_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};

SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    scoped_refptr<SequencedWorkerPool> pool,
    SequencedWorkerPool::SequenceToken token,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(std::move(pool)),
      token_(token),
      shutdown_behavior_(shutdown_behavior) {}

SequencedWorkerPoolSequencedTaskRunner::
    ~SequencedWorkerPoolSequencedTaskRunner() {
}

bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay.is_zero()) {
    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
        token_, from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
}

bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->IsRunningSequenceOnCurrentThread(token_);
}

bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  // There's no way to run nested tasks, so simply forward to
  // PostDelayedTask.
  return PostDelayedTask(from_here, task, delay);
}
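
// A hypothetical usage sketch (the token name and closures are illustrative):
// a sequenced runner is normally obtained from the pool, e.g.
//
//   scoped_refptr<SequencedTaskRunner> runner =
//       pool->GetSequencedTaskRunner(pool->GetNamedSequenceToken("db"));
//   runner->PostTask(FROM_HERE, Bind(&OpenDatabase));
//   runner->PostTask(FROM_HERE, Bind(&WriteToDatabase));
//
// The two closures run one after the other, in posting order, although not
// necessarily on the same physical worker thread.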

// Create a process-wide unique ID to represent this task in trace events. This
// will be mangled with a Process ID hash to reduce the likelihood of colliding
// with MessageLoop pointers on other processes.
uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
  return (static_cast<uint64_t>(task.trace_id) << 32) |
         static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
}

}  // namespace

// Worker ---------------------------------------------------------------------

class SequencedWorkerPool::Worker : public SimpleThread {
 public:
  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
  // around as long as we are running.
  Worker(scoped_refptr<SequencedWorkerPool> worker_pool,
         int thread_number,
         const std::string& thread_name_prefix);
  ~Worker() override;

  // SimpleThread implementation. This actually runs the background thread.
  void Run() override;

  // Gets the worker for the current thread out of thread-local storage.
  static Worker* GetForCurrentThread();

  // Indicates that a task is about to be run. The parameters provide
  // additional metainformation about the task being run.
  void set_running_task_info(SequenceToken token,
                             WorkerShutdown shutdown_behavior) {
    is_processing_task_ = true;
    task_sequence_token_ = token;
    task_shutdown_behavior_ = shutdown_behavior;
  }

  // Indicates that the task has finished running.
  void reset_running_task_info() { is_processing_task_ = false; }

  // Whether the worker is processing a task.
  bool is_processing_task() { return is_processing_task_; }

  SequenceToken task_sequence_token() const {
    DCHECK(is_processing_task_);
    return task_sequence_token_;
  }

  WorkerShutdown task_shutdown_behavior() const {
    DCHECK(is_processing_task_);
    return task_shutdown_behavior_;
  }

  scoped_refptr<SequencedWorkerPool> worker_pool() const {
    return worker_pool_;
  }

 private:
  static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
      lazy_tls_ptr_;

  scoped_refptr<SequencedWorkerPool> worker_pool_;
  // The sequence token of the task being processed. Only valid when
  // is_processing_task_ is true.
  SequenceToken task_sequence_token_;
  // The shutdown behavior of the task being processed. Only valid when
  // is_processing_task_ is true.
  WorkerShutdown task_shutdown_behavior_;
  // Whether the Worker is processing a task.
  bool is_processing_task_;

  DISALLOW_COPY_AND_ASSIGN(Worker);
};
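
// The thread-local pointer above is what lets the static helpers near the
// bottom of this file (GetSequenceTokenForCurrentThread(),
// GetWorkerPoolForCurrentThread(), GetSequencedTaskRunnerForCurrentThread())
// recover the Worker, and through it the pool and the current SequenceToken,
// from inside a running task. A hypothetical check inside a task body:
//
//   DCHECK(SequencedWorkerPool::GetWorkerPoolForCurrentThread());
//
// which holds only while the task is executing on one of the pool's workers.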

// Inner ----------------------------------------------------------------------

class SequencedWorkerPool::Inner {
 public:
  // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
  // by it).
  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
        const std::string& thread_name_prefix,
        TestingObserver* observer);

  ~Inner();

  static SequenceToken GetSequenceToken();

  SequenceToken GetNamedSequenceToken(const std::string& name);

  // This function accepts a name and an ID. If the name is null, the
  // token ID is used. This allows us to implement the optional name lookup
  // from a single function without having to enter the lock a separate time.
  bool PostTask(const std::string* optional_token_name,
                SequenceToken sequence_token,
                WorkerShutdown shutdown_behavior,
                const tracked_objects::Location& from_here,
                const Closure& task,
                TimeDelta delay);

  bool RunsTasksOnCurrentThread() const;

  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  bool IsRunningSequence(SequenceToken sequence_token) const;

  void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
                                          WorkerShutdown shutdown_behavior);

  void CleanupForTesting();

  void SignalHasWorkForTesting();

  int GetWorkSignalCountForTesting() const;

  void Shutdown(int max_blocking_tasks_after_shutdown);

  bool IsShutdownInProgress();

  // Runs the worker loop on the background thread.
  void ThreadLoop(Worker* this_worker);

 private:
  enum GetWorkStatus {
    GET_WORK_FOUND,
    GET_WORK_NOT_FOUND,
    GET_WORK_WAIT,
  };

  enum CleanupState {
    CLEANUP_REQUESTED,
    CLEANUP_STARTING,
    CLEANUP_RUNNING,
    CLEANUP_FINISHING,
    CLEANUP_DONE,
  };

  // Called from within the lock, this converts the given token name into a
  // token ID, creating a new one if necessary.
  int LockedGetNamedTokenID(const std::string& name);

  // Called from within the lock, this returns the next sequence task number.
  int64_t LockedGetNextSequenceTaskNumber();

  // Gets a new task. There are 3 cases depending on the return value:
  //
  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
  //    be run immediately.
  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
  //    and |task| is not filled in. In this case, the caller should wait until
  //    a task is posted.
  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
  //    immediately, and |task| is not filled in. Likewise, |wait_time| is
  //    filled in with the time to wait until the next task is ready to run. In
  //    this case, the caller should wait for that amount of time.
  //
  // In any case, the calling code should clear the given
  // delete_these_outside_lock vector the next time the lock is released.
  // See the implementation for a more detailed description.
  GetWorkStatus GetWork(SequencedTask* task,
                        TimeDelta* wait_time,
                        std::vector<Closure>* delete_these_outside_lock);

  void HandleCleanup();

  // Performs init and cleanup around running the given task. WillRun...
  // returns the value from PrepareToStartAdditionalThreadIfHelpful.
  // The calling code should call FinishStartingAdditionalThread once the
  // lock is released if the return value is nonzero.
  int WillRunWorkerTask(const SequencedTask& task);
  void DidRunWorkerTask(const SequencedTask& task);

  // Returns true if there are no threads currently running the given
  // sequence token.
  bool IsSequenceTokenRunnable(int sequence_token_id) const;

  // Checks if all threads are busy and the addition of one more could run an
  // additional task waiting in the queue. This must be called from within
  // the lock.
  //
  // If another thread is helpful, this will mark the thread as being in the
  // process of starting and returns the number of the new thread, which will
  // be greater than zero. The caller should then call
  // FinishStartingAdditionalThread to complete initialization once the lock
  // is released.
  //
  // If another thread is not necessary, returns 0.
  //
  // See the implementation for more.
  int PrepareToStartAdditionalThreadIfHelpful();

  // The second part of thread creation after
  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
  // generated. This actually creates the thread and should be called outside
  // the lock to avoid blocking important work starting a thread in the lock.
  void FinishStartingAdditionalThread(int thread_number);

  // Signal |has_work_| and increment |has_work_signal_count_|.
  void SignalHasWork();

  // Checks whether there is work left that's blocking shutdown. Must be
  // called inside the lock.
  bool CanShutdown() const;

  SequencedWorkerPool* const worker_pool_;

  // The last sequence number used. Managed by GetSequenceToken; since this
  // only does threadsafe increment operations, you do not need to hold the
  // lock. This is class-static to make SequenceTokens issued by
  // GetSequenceToken unique across SequencedWorkerPool instances.
  static base::StaticAtomicSequenceNumber g_last_sequence_number_;

  // This lock protects |everything in this class|. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are posted or shutdown starts.
  ConditionVariable has_work_cv_;

  // Condition variable that is waited on by non-worker threads (in
  // Shutdown()) until CanShutdown() goes to true.
  ConditionVariable can_shutdown_cv_;

  // The maximum number of worker threads we'll create.
  const size_t max_threads_;

  const std::string thread_name_prefix_;

  // Associates all known sequence token names with their IDs.
  std::map<std::string, int> named_sequence_tokens_;

  // Owning pointers to all threads we've created so far, indexed by
  // ID. Since we lazily create threads, this may be less than
  // max_threads_ and will be initially empty.
  using ThreadMap = std::map<PlatformThreadId, std::unique_ptr<Worker>>;
  ThreadMap threads_;

  // Set to true when we're in the process of creating another thread.
  // See PrepareToStartAdditionalThreadIfHelpful for more.
  bool thread_being_created_;

  // Number of threads currently waiting for work.
  size_t waiting_thread_count_;

  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
  // or SKIP_ON_SHUTDOWN flag set.
  size_t blocking_shutdown_thread_count_;

  // A set of all pending tasks in time-to-run order. These are tasks that are
  // either waiting for a thread to run on, waiting for their time to run,
  // or blocked on a previous task in their sequence. We have to iterate over
  // the tasks by time-to-run order, so we use the set instead of the
  // traditional priority_queue.
  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
  PendingTaskSet pending_tasks_;

  // The next sequence number for a new sequenced task.
  int64_t next_sequence_task_number_;

  // Number of tasks in the pending_tasks_ list that are marked as blocking
  // shutdown.
  size_t blocking_shutdown_pending_task_count_;

  // Lists all sequence tokens currently executing.
  std::set<int> current_sequences_;

  // An ID for each posted task to distinguish the task from others in traces.
  int trace_id_;

  // Set when Shutdown is called and no further tasks should be
  // allowed, though we may still be running existing tasks.
  bool shutdown_called_;

  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
  // has been called.
  int max_blocking_tasks_after_shutdown_;

  // State used to cleanup for testing, all guarded by lock_.
  CleanupState cleanup_state_;
  size_t cleanup_idlers_;
  ConditionVariable cleanup_cv_;

  TestingObserver* const testing_observer_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};

// Worker definitions ---------------------------------------------------------

SequencedWorkerPool::Worker::Worker(
    scoped_refptr<SequencedWorkerPool> worker_pool,
    int thread_number,
    const std::string& prefix)
    : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
      worker_pool_(std::move(worker_pool)),
      task_shutdown_behavior_(BLOCK_SHUTDOWN),
      is_processing_task_(false) {
  Start();
}

SequencedWorkerPool::Worker::~Worker() {
}

void SequencedWorkerPool::Worker::Run() {
#if defined(OS_WIN)
  win::ScopedCOMInitializer com_initializer;
#endif

  // Store a pointer to this worker in thread local storage for static function
  // access.
  DCHECK(!lazy_tls_ptr_.Get().Get());
  lazy_tls_ptr_.Get().Set(this);

  // Just jump back to the Inner object to run the thread, since it has all the
  // tracking information and queues. It might be more natural to implement
  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
  // having these worker objects at all, but that method lacks the ability to
  // send thread-specific information easily to the thread loop.
  worker_pool_->inner_->ThreadLoop(this);
  // Release our cyclic reference once we're done.
  worker_pool_ = nullptr;
}

// static
SequencedWorkerPool::Worker*
SequencedWorkerPool::Worker::GetForCurrentThread() {
  // Don't construct lazy instance on check.
  if (lazy_tls_ptr_ == nullptr)
    return nullptr;

  return lazy_tls_ptr_.Get().Get();
}

// static
LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
    SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;

// Inner definitions ---------------------------------------------------------

SequencedWorkerPool::Inner::Inner(
    SequencedWorkerPool* worker_pool,
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : worker_pool_(worker_pool),
      lock_(),
      has_work_cv_(&lock_),
      can_shutdown_cv_(&lock_),
      max_threads_(max_threads),
      thread_name_prefix_(thread_name_prefix),
      thread_being_created_(false),
      waiting_thread_count_(0),
      blocking_shutdown_thread_count_(0),
      next_sequence_task_number_(0),
      blocking_shutdown_pending_task_count_(0),
      trace_id_(0),
      shutdown_called_(false),
      max_blocking_tasks_after_shutdown_(0),
      cleanup_state_(CLEANUP_DONE),
      cleanup_idlers_(0),
      cleanup_cv_(&lock_),
      testing_observer_(observer) {}

SequencedWorkerPool::Inner::~Inner() {
  // You must call Shutdown() before destroying the pool.
  DCHECK(shutdown_called_);

  // Need to explicitly join with the threads before they're destroyed or else
  // they will be running when our object is half torn down.
  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
    it->second->Join();
  threads_.clear();

  if (testing_observer_)
    testing_observer_->OnDestruct();
}

// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetSequenceToken() {
  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
  // is used as a sentinel value in SequenceTokens.
  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
}

SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
  AutoLock lock(lock_);
  return SequenceToken(LockedGetNamedTokenID(name));
}

bool SequencedWorkerPool::Inner::PostTask(
    const std::string* optional_token_name,
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
  SequencedTask sequenced(from_here);
  sequenced.sequence_token_id = sequence_token.id_;
  sequenced.shutdown_behavior = shutdown_behavior;
  sequenced.posted_from = from_here;
  sequenced.task =
      shutdown_behavior == BLOCK_SHUTDOWN ?
          base::MakeCriticalClosure(task) : task;
  sequenced.time_to_run = TimeTicks::Now() + delay;

  int create_thread_id = 0;
  {
    AutoLock lock(lock_);
    if (shutdown_called_) {
      // Don't allow a new task to be posted if it doesn't block shutdown.
      if (shutdown_behavior != BLOCK_SHUTDOWN)
        return false;

      // If the current thread is running a task, and that task doesn't block
      // shutdown, then it shouldn't be allowed to post any more tasks.
      ThreadMap::const_iterator found =
          threads_.find(PlatformThread::CurrentId());
      if (found != threads_.end() && found->second->is_processing_task() &&
          found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
        return false;
      }

      if (max_blocking_tasks_after_shutdown_ <= 0) {
        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
        return false;
      }
      max_blocking_tasks_after_shutdown_ -= 1;
    }

    // The trace_id is used for identifying the task in about:tracing.
    sequenced.trace_id = trace_id_++;

    TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
        "SequencedWorkerPool::Inner::PostTask",
        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
        TRACE_EVENT_FLAG_FLOW_OUT);

    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();

    // Now that we have the lock, apply the named token rules.
    if (optional_token_name)
      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);

    pending_tasks_.insert(sequenced);
    if (shutdown_behavior == BLOCK_SHUTDOWN)
      blocking_shutdown_pending_task_count_++;

    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
  }

  // Actually start the additional thread or signal an existing one now that
  // we're outside the lock.
  if (create_thread_id)
    FinishStartingAdditionalThread(create_thread_id);
  else
    SignalHasWork();

  return true;
}

bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
  AutoLock lock(lock_);
  return ContainsKey(threads_, PlatformThread::CurrentId());
}

bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  AutoLock lock(lock_);
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  if (found == threads_.end())
    return false;
  return found->second->is_processing_task() &&
         sequence_token.Equals(found->second->task_sequence_token());
}

bool SequencedWorkerPool::Inner::IsRunningSequence(
    SequenceToken sequence_token) const {
  DCHECK(sequence_token.IsValid());
  AutoLock lock(lock_);
  return !IsSequenceTokenRunnable(sequence_token.id_);
}

void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior) {
  AutoLock lock(lock_);
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  DCHECK(found != threads_.end());
  DCHECK(found->second->is_processing_task());
  DCHECK(!found->second->task_sequence_token().IsValid());
  found->second->set_running_task_info(sequence_token, shutdown_behavior);

  // Mark the sequence token as in use.
  bool success = current_sequences_.insert(sequence_token.id_).second;
  DCHECK(success);
}

// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
  DCHECK(!RunsTasksOnCurrentThread());
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  AutoLock lock(lock_);
  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
  if (shutdown_called_)
    return;
  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
    return;
  cleanup_state_ = CLEANUP_REQUESTED;
  cleanup_idlers_ = 0;
  has_work_cv_.Signal();
  while (cleanup_state_ != CLEANUP_DONE)
    cleanup_cv_.Wait();
}

void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
  SignalHasWork();
}

void SequencedWorkerPool::Inner::Shutdown(
    int max_new_blocking_tasks_after_shutdown) {
  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
  {
    AutoLock lock(lock_);
    // Cleanup and Shutdown should not be called concurrently.
    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
    if (shutdown_called_)
      return;
    shutdown_called_ = true;
    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;

    // Tickle the threads. This will wake up a waiting one so it will know that
    // it can exit, which in turn will wake up any other waiting ones.
    SignalHasWork();

    // If there are no pending or running tasks blocking shutdown, we're done.
    if (CanShutdown())
      return;
  }

  // If we're here, then something is blocking shutdown. So wait for
  // CanShutdown() to go to true.

  if (testing_observer_)
    testing_observer_->WillWaitForShutdown();

#if !defined(OS_NACL)
  TimeTicks shutdown_wait_begin = TimeTicks::Now();
#endif

  {
    base::ThreadRestrictions::ScopedAllowWait allow_wait;
    AutoLock lock(lock_);
    while (!CanShutdown())
      can_shutdown_cv_.Wait();
  }
#if !defined(OS_NACL)
  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
                      TimeTicks::Now() - shutdown_wait_begin);
#endif
}

bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
  AutoLock lock(lock_);
  return shutdown_called_;
}
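
// A hypothetical shutdown sequence (the call site is illustrative): the
// thread that created the pool eventually calls
//
//   pool->Shutdown(0);  // Allow no new BLOCK_SHUTDOWN tasks after this.
//
// which sets shutdown_called_, wakes the workers, and then blocks in the
// wait loop above until CanShutdown() is true, i.e. until no BLOCK_SHUTDOWN
// tasks remain pending and no BLOCK_SHUTDOWN or SKIP_ON_SHUTDOWN task is
// still running (see CanShutdown() below).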

void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    auto result = threads_.insert(
        std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::Inner::ThreadLoop",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
            TRACE_EVENT_FLAG_FLOW_IN,
            "src_file", task.posted_from.file_name(),
            "src_func", task.posted_from.function_name());
        TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event(
            task.posted_from.file_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TaskStopwatch stopwatch;
          stopwatch.Start();
          task.task.Run();
          stopwatch.Stop();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
              task, stopwatch);

          // Update the sequence token in case it has been set from within the
          // task, so it can be removed from the set of currently running
          // sequences in DidRunWorkerTask() below.
          task.sequence_token_id = this_worker->task_sequence_token().id_;

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling reset_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->reset_running_task_info();
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        switch (status) {
          case GET_WORK_WAIT: {
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can shut down;
        // other workers will complete any pending or new tasks. We can get
        // additional tasks posted after shutdown_called_ is set, but only
        // worker threads are allowed to post tasks at that time, and the
        // workers responsible for posting those tasks will be available to
        // run them. Also, there may be some tasks stuck behind running ones
        // with the same sequence token, but additional threads won't help
        // this case.
        if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();
          break;
        }

        // No work was found, but there are tasks that need deletion. The
        // deletion must happen outside of the lock.
        if (delete_these_outside_lock.size()) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();

          // Since the lock has been released, |status| may no longer be
          // accurate. It might read GET_WORK_WAIT even if there are tasks
          // ready to perform work. Jump to the top of the loop to recalculate
          // |status|.
          continue;
        }

        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}

void SequencedWorkerPool::Inner::HandleCleanup() {
  lock_.AssertAcquired();
  if (cleanup_state_ == CLEANUP_DONE)
    return;
  if (cleanup_state_ == CLEANUP_REQUESTED) {
    // We win, we get to do the cleanup as soon as the others wise up and idle.
    cleanup_state_ = CLEANUP_STARTING;
    while (thread_being_created_ ||
           cleanup_idlers_ != threads_.size() - 1) {
      has_work_cv_.Signal();
      cleanup_cv_.Wait();
    }
    cleanup_state_ = CLEANUP_RUNNING;
    return;
  }
  if (cleanup_state_ == CLEANUP_STARTING) {
    // Another worker thread is cleaning up; we idle here until that's done.
    ++cleanup_idlers_;
    cleanup_cv_.Broadcast();
    while (cleanup_state_ != CLEANUP_FINISHING) {
      cleanup_cv_.Wait();
    }
    --cleanup_idlers_;
    cleanup_cv_.Broadcast();
    return;
  }
  if (cleanup_state_ == CLEANUP_FINISHING) {
    // We wait for all idlers to wake up prior to being DONE.
    while (cleanup_idlers_ != 0) {
      cleanup_cv_.Broadcast();
      cleanup_cv_.Wait();
    }
    if (cleanup_state_ == CLEANUP_FINISHING) {
      cleanup_state_ = CLEANUP_DONE;
      cleanup_cv_.Signal();
    }
    return;
  }
}

int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
    const std::string& name) {
  lock_.AssertAcquired();
  DCHECK(!name.empty());

  std::map<std::string, int>::const_iterator found =
      named_sequence_tokens_.find(name);
  if (found != named_sequence_tokens_.end())
    return found->second;  // Got an existing one.

  // Create a new one for this name.
  SequenceToken result = GetSequenceToken();
  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
  return result.id_;
}

int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
  lock_.AssertAcquired();
  // We assume that we never create enough tasks to wrap around.
  return next_sequence_task_number_++;
}

SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

  // Find the next task with a sequence token that's not currently in use.
  // If the token is in use, that means another thread is running something
  // in that sequence, and we can't run it without going out-of-order.
  //
  // This algorithm is simple and fair, but inefficient in some cases. For
  // example, say somebody schedules 1000 slow tasks with the same sequence
  // number. We'll have to go through all those tasks each time we feel like
  // there might be work to schedule. If this proves to be a problem, we
  // should make this more efficient.
  //
  // One possible enhancement would be to keep a map from sequence ID to a
  // list of pending but currently blocked SequencedTasks for that ID.
  // When a worker finishes a task of one sequence token, it can pick up the
  // next one from that token right away.
  //
  // This may lead to starvation if there are sufficient numbers of sequences
  // in use. To alleviate this, we could add an incrementing priority counter
  // to each SequencedTask. Then maintain a priority_queue of all runnable
  // tasks, sorted by priority counter. When a sequenced task is completed
  // we would pop the head element off of that task's pending list and add it
  // to the priority queue. Then we would run the first item in the priority
  // queue.

  GetWorkStatus status = GET_WORK_NOT_FOUND;
  int unrunnable_tasks = 0;
  PendingTaskSet::iterator i = pending_tasks_.begin();
  // We assume that the loop below doesn't take too long and so we can just do
  // a single call to TimeTicks::Now().
  const TimeTicks current_time = TimeTicks::Now();
  while (i != pending_tasks_.end()) {
    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
      unrunnable_tasks++;
      ++i;
      continue;
    }

    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
      // We're shutting down and the task we just found isn't blocking
      // shutdown. Delete it and get more work.
      //
      // Note that we do not want to delete unrunnable tasks. Deleting a task
      // can have side effects (like freeing some objects) and deleting a
      // task that's supposed to run after one that's currently running could
      // cause an obscure crash.
      //
      // We really want to delete these tasks outside the lock in case the
      // closures are holding refs to objects that want to post work from
      // their destructors (which would deadlock). The closures are
      // internally refcounted, so we just need to keep a copy of them alive
      // until the lock is exited. The calling code can just clear() the
      // vector they passed to us once the lock is exited to make this
      // happen.
      delete_these_outside_lock->push_back(i->task);
      pending_tasks_.erase(i++);
      continue;
    }

    if (i->time_to_run > current_time) {
      // The time to run has not come yet.
      *wait_time = i->time_to_run - current_time;
      status = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
        delete_these_outside_lock->push_back(i->task);
        pending_tasks_.erase(i);
      }
      break;
    }

    // Found a runnable task.
    *task = *i;
    pending_tasks_.erase(i);
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      blocking_shutdown_pending_task_count_--;
    }

    status = GET_WORK_FOUND;
    break;
  }

  return status;
}

int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  // Mark the task's sequence number as in use.
  if (task.sequence_token_id)
    current_sequences_.insert(task.sequence_token_id);

  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
  // completes.
  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
    blocking_shutdown_thread_count_++;

  // We just picked up a task.
  // Since PrepareToStartAdditionalThreadIfHelpful only creates a new thread
  // if there is no free one, there is a race when posting tasks: many tasks
  // could have been posted before a thread started running them, so only one
  // thread would have been created. So we also check whether we should
  // create more threads after removing our task from the queue, which also
  // has the nice side effect of creating the workers from background threads
  // rather than the main thread of the app.
  //
  // If another thread wasn't created, we want to wake up an existing thread
  // if there is one waiting to pick up the next task.
  //
  // Note that we really need to do this *before* running the task, not
  // after. Otherwise, if more than one task is posted, the creation of the
  // second thread (since we only create one at a time) will be blocked by
  // the execution of the first task, which could be arbitrarily long.
  return PrepareToStartAdditionalThreadIfHelpful();
}

void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
    blocking_shutdown_thread_count_--;
  }

  if (task.sequence_token_id)
    current_sequences_.erase(task.sequence_token_id);
}

bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
    int sequence_token_id) const {
  lock_.AssertAcquired();
  return !sequence_token_id ||
      current_sequences_.find(sequence_token_id) ==
          current_sequences_.end();
}

int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
  lock_.AssertAcquired();
  // How thread creation works:
  //
  // We'd like to avoid creating threads with the lock held. However, we
  // need to be sure that we have an accurate accounting of the threads for
  // proper Joining and deletion on shutdown.
  //
  // We need to figure out if we need another thread with the lock held, which
  // is what this function does. It then marks us as in the process of creating
  // a thread. When we do shutdown, we wait until the thread_being_created_
  // flag is cleared, which ensures that the new thread is properly added to
  // all the data structures and we can't leak it. Once shutdown starts, we'll
  // refuse to create more threads or they would be leaked.
  //
  // Note that this creates a mostly benign race condition on shutdown that
  // will cause fewer workers to be created than one would expect. It isn't
  // much of an issue in real life, but affects some tests. Since we only spawn
  // one worker at a time, the following sequence of events can happen:
  //
  // 1. Main thread posts a bunch of unrelated tasks that would normally be
  //    run on separate threads.
  // 2. The first task post causes us to start a worker. Other tasks do not
  //    cause a worker to start since one is pending.
  // 3. Main thread initiates shutdown.
  // 4. No more threads are created since the shutdown_called_ flag is set.
  //
  // The result is that one might expect max_threads_ workers to be created
  // given the workload, but in reality fewer may be created because the
  // sequence of thread creation on the background threads is racing with the
  // shutdown call.
  if (!shutdown_called_ &&
      !thread_being_created_ &&
      cleanup_state_ == CLEANUP_DONE &&
      threads_.size() < max_threads_ &&
      waiting_thread_count_ == 0) {
    // We could use an additional thread if there's work to be done.
    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
         i != pending_tasks_.end(); ++i) {
      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
        // Found a runnable task, mark the thread as being started.
        thread_being_created_ = true;
        return static_cast<int>(threads_.size() + 1);
      }
    }
  }
  return 0;
}

void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
    int thread_number) {
  // Called outside of the lock.
  DCHECK_GT(thread_number, 0);

  // The worker is assigned to the list when the thread actually starts, which
  // will manage the memory of the pointer.
  new Worker(worker_pool_, thread_number, thread_name_prefix_);
}

void SequencedWorkerPool::Inner::SignalHasWork() {
  has_work_cv_.Signal();
  if (testing_observer_) {
    testing_observer_->OnHasWork();
  }
}

bool SequencedWorkerPool::Inner::CanShutdown() const {
  lock_.AssertAcquired();
  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
  return !thread_being_created_ &&
      blocking_shutdown_thread_count_ == 0 &&
      blocking_shutdown_pending_task_count_ == 0;
}

base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;

// SequencedWorkerPool --------------------------------------------------------

std::string SequencedWorkerPool::SequenceToken::ToString() const {
  return base::StringPrintf("[%d]", id_);
}

// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
  Worker* worker = Worker::GetForCurrentThread();
  if (!worker)
    return SequenceToken();

  return worker->task_sequence_token();
}

// static
scoped_refptr<SequencedWorkerPool>
SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
  Worker* worker = Worker::GetForCurrentThread();
  if (!worker)
    return nullptr;

  return worker->worker_pool();
}
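
// A hypothetical branching sketch (the surrounding code is illustrative):
// code that may run either on a pool worker or on some other thread can use
// the accessor above to tell the difference, e.g.
//
//   scoped_refptr<SequencedWorkerPool> pool =
//       SequencedWorkerPool::GetWorkerPoolForCurrentThread();
//   if (pool) {
//     // Running inside a worker task; |pool| may be used to post follow-ups.
//   }
//
// On threads that are not workers of a SequencedWorkerPool this returns null,
// since the thread-local Worker pointer is only set for the pool's own
// threads.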

// static
scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
  Worker* worker = Worker::GetForCurrentThread();

  // If there is no worker, this thread is not a worker thread. Otherwise, it
  // is currently running a task (sequenced or unsequenced).
  if (!worker)
    return nullptr;

  scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
  SequenceToken sequence_token = worker->task_sequence_token();
  WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
  if (!sequence_token.IsValid()) {
    // Create a new sequence token and bind this thread to it, to make sure
    // that a task posted to the SequencedTaskRunner we are going to return is
    // not immediately going to run on a different thread.
    sequence_token = Inner::GetSequenceToken();
    pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
                                                     shutdown_behavior);
  }

  DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
  return new SequencedWorkerPoolSequencedTaskRunner(
      std::move(pool), sequence_token, shutdown_behavior);
}

SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
                                         const std::string& thread_name_prefix)
    : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}

SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
                                         const std::string& thread_name_prefix,
                                         TestingObserver* observer)
    : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}

SequencedWorkerPool::~SequencedWorkerPool() {}

void SequencedWorkerPool::OnDestruct() const {
  // Avoid deleting ourselves on a worker thread (which would deadlock).
  if (RunsTasksOnCurrentThread()) {
    constructor_task_runner_->DeleteSoon(FROM_HERE, this);
  } else {
    delete this;
  }
}

// static
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
  return Inner::GetSequenceToken();
}

SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
    const std::string& name) {
  return inner_->GetNamedSequenceToken(name);
}

scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
    SequenceToken token) {
  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
}

scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
    SequenceToken token, WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolSequencedTaskRunner(
      this, token, shutdown_behavior);
}

scoped_refptr<TaskRunner>
SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
    WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
}

bool SequencedWorkerPool::PostWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
    const std::string& token_name,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  DCHECK(!token_name.empty());
  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  return PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
  return inner_->RunsTasksOnCurrentThread();
}

bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
}

bool SequencedWorkerPool::IsRunningSequence(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequence(sequence_token);
}

void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}

void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}

void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_task_runner_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}

bool SequencedWorkerPool::IsShutdownInProgress() {
  return inner_->IsShutdownInProgress();
}

}  // namespace base