1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/sequenced_worker_pool.h" 6 7 #include <algorithm> 8 9 #include "base/bind.h" 10 #include "base/compiler_specific.h" 11 #include "base/memory/ref_counted.h" 12 #include "base/memory/scoped_ptr.h" 13 #include "base/message_loop/message_loop.h" 14 #include "base/message_loop/message_loop_proxy.h" 15 #include "base/synchronization/condition_variable.h" 16 #include "base/synchronization/lock.h" 17 #include "base/test/sequenced_task_runner_test_template.h" 18 #include "base/test/sequenced_worker_pool_owner.h" 19 #include "base/test/task_runner_test_template.h" 20 #include "base/test/test_timeouts.h" 21 #include "base/threading/platform_thread.h" 22 #include "base/time/time.h" 23 #include "base/tracked_objects.h" 24 #include "testing/gtest/include/gtest/gtest.h" 25 26 namespace base { 27 28 // IMPORTANT NOTE: 29 // 30 // Many of these tests have failure modes where they'll hang forever. These 31 // tests should not be flaky, and hanging indicates a type of failure. Do not 32 // mark as flaky if they're hanging, it's likely an actual bug. 33 34 namespace { 35 36 const size_t kNumWorkerThreads = 3; 37 38 // Allows a number of threads to all be blocked on the same event, and 39 // provides a way to unblock a certain number of them. 40 class ThreadBlocker { 41 public: 42 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {} 43 44 void Block() { 45 { 46 base::AutoLock lock(lock_); 47 while (unblock_counter_ == 0) 48 cond_var_.Wait(); 49 unblock_counter_--; 50 } 51 cond_var_.Signal(); 52 } 53 54 void Unblock(size_t count) { 55 { 56 base::AutoLock lock(lock_); 57 DCHECK(unblock_counter_ == 0); 58 unblock_counter_ = count; 59 } 60 cond_var_.Signal(); 61 } 62 63 private: 64 base::Lock lock_; 65 base::ConditionVariable cond_var_; 66 67 size_t unblock_counter_; 68 }; 69 70 class TestTracker : public base::RefCountedThreadSafe<TestTracker> { 71 public: 72 TestTracker() 73 : lock_(), 74 cond_var_(&lock_), 75 started_events_(0) { 76 } 77 78 // Each of these tasks appends the argument to the complete sequence vector 79 // so calling code can see what order they finished in. 80 void FastTask(int id) { 81 SignalWorkerDone(id); 82 } 83 84 void SlowTask(int id) { 85 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1)); 86 SignalWorkerDone(id); 87 } 88 89 void BlockTask(int id, ThreadBlocker* blocker) { 90 // Note that this task has started and signal anybody waiting for that 91 // to happen. 92 { 93 base::AutoLock lock(lock_); 94 started_events_++; 95 } 96 cond_var_.Signal(); 97 98 blocker->Block(); 99 SignalWorkerDone(id); 100 } 101 102 void PostAdditionalTasks( 103 int id, SequencedWorkerPool* pool, 104 bool expected_return_value) { 105 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); 106 EXPECT_EQ(expected_return_value, 107 pool->PostWorkerTaskWithShutdownBehavior( 108 FROM_HERE, fast_task, 109 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); 110 EXPECT_EQ(expected_return_value, 111 pool->PostWorkerTaskWithShutdownBehavior( 112 FROM_HERE, fast_task, 113 SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 114 pool->PostWorkerTaskWithShutdownBehavior( 115 FROM_HERE, fast_task, 116 SequencedWorkerPool::BLOCK_SHUTDOWN); 117 SignalWorkerDone(id); 118 } 119 120 // Waits until the given number of tasks have started executing. 121 void WaitUntilTasksBlocked(size_t count) { 122 { 123 base::AutoLock lock(lock_); 124 while (started_events_ < count) 125 cond_var_.Wait(); 126 } 127 cond_var_.Signal(); 128 } 129 130 // Blocks the current thread until at least the given number of tasks are in 131 // the completed vector, and then returns a copy. 132 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { 133 std::vector<int> ret; 134 { 135 base::AutoLock lock(lock_); 136 while (complete_sequence_.size() < num_tasks) 137 cond_var_.Wait(); 138 ret = complete_sequence_; 139 } 140 cond_var_.Signal(); 141 return ret; 142 } 143 144 size_t GetTasksCompletedCount() { 145 base::AutoLock lock(lock_); 146 return complete_sequence_.size(); 147 } 148 149 void ClearCompleteSequence() { 150 base::AutoLock lock(lock_); 151 complete_sequence_.clear(); 152 started_events_ = 0; 153 } 154 155 private: 156 friend class base::RefCountedThreadSafe<TestTracker>; 157 ~TestTracker() {} 158 159 void SignalWorkerDone(int id) { 160 { 161 base::AutoLock lock(lock_); 162 complete_sequence_.push_back(id); 163 } 164 cond_var_.Signal(); 165 } 166 167 // Protects the complete_sequence. 168 base::Lock lock_; 169 170 base::ConditionVariable cond_var_; 171 172 // Protected by lock_. 173 std::vector<int> complete_sequence_; 174 175 // Counter of the number of "block" workers that have started. 176 size_t started_events_; 177 }; 178 179 class SequencedWorkerPoolTest : public testing::Test { 180 public: 181 SequencedWorkerPoolTest() 182 : tracker_(new TestTracker) { 183 ResetPool(); 184 } 185 186 virtual ~SequencedWorkerPoolTest() {} 187 188 virtual void SetUp() OVERRIDE {} 189 190 virtual void TearDown() OVERRIDE { 191 pool()->Shutdown(); 192 } 193 194 const scoped_refptr<SequencedWorkerPool>& pool() { 195 return pool_owner_->pool(); 196 } 197 TestTracker* tracker() { return tracker_.get(); } 198 199 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut 200 // down, and creates a new instance. 201 void ResetPool() { 202 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test")); 203 } 204 205 void SetWillWaitForShutdownCallback(const Closure& callback) { 206 pool_owner_->SetWillWaitForShutdownCallback(callback); 207 } 208 209 // Ensures that the given number of worker threads is created by adding 210 // tasks and waiting until they complete. Worker thread creation is 211 // serialized, can happen on background threads asynchronously, and doesn't 212 // happen any more at shutdown. This means that if a test posts a bunch of 213 // tasks and calls shutdown, fewer workers will be created than the test may 214 // expect. 215 // 216 // This function ensures that this condition can't happen so tests can make 217 // assumptions about the number of workers active. See the comment in 218 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more 219 // details. 220 // 221 // It will post tasks to the queue with id -1. It also assumes this is the 222 // first thing called in a test since it will clear the complete_sequence_. 223 void EnsureAllWorkersCreated() { 224 // Create a bunch of threads, all waiting. This will cause that may 225 // workers to be created. 226 ThreadBlocker blocker; 227 for (size_t i = 0; i < kNumWorkerThreads; i++) { 228 pool()->PostWorkerTask(FROM_HERE, 229 base::Bind(&TestTracker::BlockTask, 230 tracker(), -1, &blocker)); 231 } 232 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); 233 234 // Now wake them up and wait until they're done. 235 blocker.Unblock(kNumWorkerThreads); 236 tracker()->WaitUntilTasksComplete(kNumWorkerThreads); 237 238 // Clean up the task IDs we added. 239 tracker()->ClearCompleteSequence(); 240 } 241 242 int has_work_call_count() const { 243 return pool_owner_->has_work_call_count(); 244 } 245 246 private: 247 MessageLoop message_loop_; 248 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; 249 const scoped_refptr<TestTracker> tracker_; 250 }; 251 252 // Checks that the given number of entries are in the tasks to complete of 253 // the given tracker, and then signals the given event the given number of 254 // times. This is used to wakt up blocked background threads before blocking 255 // on shutdown. 256 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, 257 size_t expected_tasks_to_complete, 258 ThreadBlocker* blocker, 259 size_t threads_to_awake) { 260 EXPECT_EQ( 261 expected_tasks_to_complete, 262 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); 263 264 blocker->Unblock(threads_to_awake); 265 } 266 267 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> { 268 public: 269 explicit DeletionHelper( 270 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag) 271 : deleted_flag_(deleted_flag) { 272 } 273 274 private: 275 friend class base::RefCountedThreadSafe<DeletionHelper>; 276 virtual ~DeletionHelper() { deleted_flag_->data = true; } 277 278 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_; 279 DISALLOW_COPY_AND_ASSIGN(DeletionHelper); 280 }; 281 282 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool, 283 const scoped_refptr<DeletionHelper>& helper) { 284 ADD_FAILURE() << "Should never run"; 285 } 286 287 // Tests that delayed tasks are deleted upon shutdown of the pool. 288 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { 289 // Post something to verify the pool is started up. 290 EXPECT_TRUE(pool()->PostTask( 291 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1))); 292 293 scoped_refptr<base::RefCountedData<bool> > deleted_flag( 294 new base::RefCountedData<bool>(false)); 295 296 base::Time posted_at(base::Time::Now()); 297 // Post something that shouldn't run. 298 EXPECT_TRUE(pool()->PostDelayedTask( 299 FROM_HERE, 300 base::Bind(&HoldPoolReference, 301 pool(), 302 make_scoped_refptr(new DeletionHelper(deleted_flag))), 303 TestTimeouts::action_timeout())); 304 305 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1); 306 ASSERT_EQ(1u, completion_sequence.size()); 307 ASSERT_EQ(1, completion_sequence[0]); 308 309 pool()->Shutdown(); 310 // Shutdown is asynchronous, so use ResetPool() to block until the pool is 311 // fully destroyed (and thus shut down). 312 ResetPool(); 313 314 // Verify that we didn't block until the task was due. 315 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout()); 316 317 // Verify that the deferred task has not only not run, but has also been 318 // destroyed. 319 ASSERT_TRUE(deleted_flag->data); 320 } 321 322 // Tests that same-named tokens have the same ID. 323 TEST_F(SequencedWorkerPoolTest, NamedTokens) { 324 const std::string name1("hello"); 325 SequencedWorkerPool::SequenceToken token1 = 326 pool()->GetNamedSequenceToken(name1); 327 328 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); 329 330 const std::string name3("goodbye"); 331 SequencedWorkerPool::SequenceToken token3 = 332 pool()->GetNamedSequenceToken(name3); 333 334 // All 3 tokens should be different. 335 EXPECT_FALSE(token1.Equals(token2)); 336 EXPECT_FALSE(token1.Equals(token3)); 337 EXPECT_FALSE(token2.Equals(token3)); 338 339 // Requesting the same name again should give the same value. 340 SequencedWorkerPool::SequenceToken token1again = 341 pool()->GetNamedSequenceToken(name1); 342 EXPECT_TRUE(token1.Equals(token1again)); 343 344 SequencedWorkerPool::SequenceToken token3again = 345 pool()->GetNamedSequenceToken(name3); 346 EXPECT_TRUE(token3.Equals(token3again)); 347 } 348 349 // Tests that posting a bunch of tasks (many more than the number of worker 350 // threads) runs them all. 351 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { 352 pool()->PostWorkerTask(FROM_HERE, 353 base::Bind(&TestTracker::SlowTask, tracker(), 0)); 354 355 const size_t kNumTasks = 20; 356 for (size_t i = 1; i < kNumTasks; i++) { 357 pool()->PostWorkerTask(FROM_HERE, 358 base::Bind(&TestTracker::FastTask, tracker(), i)); 359 } 360 361 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); 362 EXPECT_EQ(kNumTasks, result.size()); 363 } 364 365 // Tests that posting a bunch of tasks (many more than the number of 366 // worker threads) to two pools simultaneously runs them all twice. 367 // This test is meant to shake out any concurrency issues between 368 // pools (like histograms). 369 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { 370 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1"); 371 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2"); 372 373 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0); 374 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task); 375 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task); 376 377 const size_t kNumTasks = 20; 378 for (size_t i = 1; i < kNumTasks; i++) { 379 base::Closure fast_task = 380 base::Bind(&TestTracker::FastTask, tracker(), i); 381 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task); 382 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task); 383 } 384 385 std::vector<int> result = 386 tracker()->WaitUntilTasksComplete(2*kNumTasks); 387 EXPECT_EQ(2 * kNumTasks, result.size()); 388 389 pool2.pool()->Shutdown(); 390 pool1.pool()->Shutdown(); 391 } 392 393 // Test that tasks with the same sequence token are executed in order but don't 394 // affect other tasks. 395 TEST_F(SequencedWorkerPoolTest, Sequence) { 396 // Fill all the worker threads except one. 397 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; 398 ThreadBlocker background_blocker; 399 for (size_t i = 0; i < kNumBackgroundTasks; i++) { 400 pool()->PostWorkerTask(FROM_HERE, 401 base::Bind(&TestTracker::BlockTask, 402 tracker(), i, &background_blocker)); 403 } 404 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); 405 406 // Create two tasks with the same sequence token, one that will block on the 407 // event, and one which will just complete quickly when it's run. Since there 408 // is one worker thread free, the first task will start and then block, and 409 // the second task should be waiting. 410 ThreadBlocker blocker; 411 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); 412 pool()->PostSequencedWorkerTask( 413 token1, FROM_HERE, 414 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); 415 pool()->PostSequencedWorkerTask( 416 token1, FROM_HERE, 417 base::Bind(&TestTracker::FastTask, tracker(), 101)); 418 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); 419 420 // Create another two tasks as above with a different token. These will be 421 // blocked since there are no slots to run. 422 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); 423 pool()->PostSequencedWorkerTask( 424 token2, FROM_HERE, 425 base::Bind(&TestTracker::FastTask, tracker(), 200)); 426 pool()->PostSequencedWorkerTask( 427 token2, FROM_HERE, 428 base::Bind(&TestTracker::FastTask, tracker(), 201)); 429 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); 430 431 // Let one background task complete. This should then let both tasks of 432 // token2 run to completion in order. The second task of token1 should still 433 // be blocked. 434 background_blocker.Unblock(1); 435 std::vector<int> result = tracker()->WaitUntilTasksComplete(3); 436 ASSERT_EQ(3u, result.size()); 437 EXPECT_EQ(200, result[1]); 438 EXPECT_EQ(201, result[2]); 439 440 // Finish the rest of the background tasks. This should leave some workers 441 // free with the second token1 task still blocked on the first. 442 background_blocker.Unblock(kNumBackgroundTasks - 1); 443 EXPECT_EQ(kNumBackgroundTasks + 2, 444 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); 445 446 // Allow the first task of token1 to complete. This should run the second. 447 blocker.Unblock(1); 448 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); 449 ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); 450 EXPECT_EQ(100, result[result.size() - 2]); 451 EXPECT_EQ(101, result[result.size() - 1]); 452 } 453 454 // Tests that any tasks posted after Shutdown are ignored. 455 // Disabled for flakiness. See http://crbug.com/166451. 456 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) { 457 // Start tasks to take all the threads and block them. 458 EnsureAllWorkersCreated(); 459 ThreadBlocker blocker; 460 for (size_t i = 0; i < kNumWorkerThreads; i++) { 461 pool()->PostWorkerTask(FROM_HERE, 462 base::Bind(&TestTracker::BlockTask, 463 tracker(), i, &blocker)); 464 } 465 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); 466 467 SetWillWaitForShutdownCallback( 468 base::Bind(&EnsureTasksToCompleteCountAndUnblock, 469 scoped_refptr<TestTracker>(tracker()), 0, 470 &blocker, kNumWorkerThreads)); 471 472 // Shutdown the worker pool. This should discard all non-blocking tasks. 473 const int kMaxNewBlockingTasksAfterShutdown = 100; 474 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown); 475 476 int old_has_work_call_count = has_work_call_count(); 477 478 std::vector<int> result = 479 tracker()->WaitUntilTasksComplete(kNumWorkerThreads); 480 481 // The kNumWorkerThread items should have completed, in no particular order. 482 ASSERT_EQ(kNumWorkerThreads, result.size()); 483 for (size_t i = 0; i < kNumWorkerThreads; i++) { 484 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != 485 result.end()); 486 } 487 488 // No further tasks, regardless of shutdown mode, should be allowed. 489 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( 490 FROM_HERE, 491 base::Bind(&TestTracker::FastTask, tracker(), 100), 492 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); 493 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( 494 FROM_HERE, 495 base::Bind(&TestTracker::FastTask, tracker(), 101), 496 SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 497 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( 498 FROM_HERE, 499 base::Bind(&TestTracker::FastTask, tracker(), 102), 500 SequencedWorkerPool::BLOCK_SHUTDOWN)); 501 502 ASSERT_EQ(old_has_work_call_count, has_work_call_count()); 503 } 504 505 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { 506 // Test that <n> new blocking tasks are allowed provided they're posted 507 // by a running tasks. 508 EnsureAllWorkersCreated(); 509 ThreadBlocker blocker; 510 511 // Start tasks to take all the threads and block them. 512 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads); 513 for (int i = 0; i < kNumBlockTasks; ++i) { 514 EXPECT_TRUE(pool()->PostWorkerTask( 515 FROM_HERE, 516 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); 517 } 518 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); 519 520 // Queue up shutdown blocking tasks behind those which will attempt to post 521 // additional tasks when run, PostAdditionalTasks attemtps to post 3 522 // new FastTasks, one for each shutdown_behavior. 523 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); 524 for (int i = 0; i < kNumQueuedTasks; ++i) { 525 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( 526 FROM_HERE, 527 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(), 528 false), 529 SequencedWorkerPool::BLOCK_SHUTDOWN)); 530 } 531 532 // Setup to open the floodgates from within Shutdown(). 533 SetWillWaitForShutdownCallback( 534 base::Bind(&EnsureTasksToCompleteCountAndUnblock, 535 scoped_refptr<TestTracker>(tracker()), 536 0, &blocker, kNumBlockTasks)); 537 538 // Allow half of the additional blocking tasks thru. 539 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2; 540 pool()->Shutdown(kNumNewBlockingTasksToAllow); 541 542 // Ensure that the correct number of tasks actually got run. 543 tracker()->WaitUntilTasksComplete(static_cast<size_t>( 544 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow)); 545 546 // Clean up the task IDs we added and go home. 547 tracker()->ClearCompleteSequence(); 548 } 549 550 // Tests that unrun tasks are discarded properly according to their shutdown 551 // mode. 552 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { 553 // Start tasks to take all the threads and block them. 554 EnsureAllWorkersCreated(); 555 ThreadBlocker blocker; 556 for (size_t i = 0; i < kNumWorkerThreads; i++) { 557 pool()->PostWorkerTask(FROM_HERE, 558 base::Bind(&TestTracker::BlockTask, 559 tracker(), i, &blocker)); 560 } 561 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); 562 563 // Create some tasks with different shutdown modes. 564 pool()->PostWorkerTaskWithShutdownBehavior( 565 FROM_HERE, 566 base::Bind(&TestTracker::FastTask, tracker(), 100), 567 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); 568 pool()->PostWorkerTaskWithShutdownBehavior( 569 FROM_HERE, 570 base::Bind(&TestTracker::FastTask, tracker(), 101), 571 SequencedWorkerPool::SKIP_ON_SHUTDOWN); 572 pool()->PostWorkerTaskWithShutdownBehavior( 573 FROM_HERE, 574 base::Bind(&TestTracker::FastTask, tracker(), 102), 575 SequencedWorkerPool::BLOCK_SHUTDOWN); 576 577 // Shutdown the worker pool. This should discard all non-blocking tasks. 578 SetWillWaitForShutdownCallback( 579 base::Bind(&EnsureTasksToCompleteCountAndUnblock, 580 scoped_refptr<TestTracker>(tracker()), 0, 581 &blocker, kNumWorkerThreads)); 582 pool()->Shutdown(); 583 584 std::vector<int> result = 585 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1); 586 587 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN 588 // one, in no particular order. 589 ASSERT_EQ(kNumWorkerThreads + 1, result.size()); 590 for (size_t i = 0; i < kNumWorkerThreads; i++) { 591 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != 592 result.end()); 593 } 594 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); 595 } 596 597 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. 598 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { 599 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior( 600 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); 601 scoped_refptr<SequencedTaskRunner> sequenced_runner( 602 pool()->GetSequencedTaskRunnerWithShutdownBehavior( 603 pool()->GetSequenceToken(), 604 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); 605 EnsureAllWorkersCreated(); 606 ThreadBlocker blocker; 607 pool()->PostWorkerTaskWithShutdownBehavior( 608 FROM_HERE, 609 base::Bind(&TestTracker::BlockTask, 610 tracker(), 0, &blocker), 611 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); 612 runner->PostTask( 613 FROM_HERE, 614 base::Bind(&TestTracker::BlockTask, 615 tracker(), 1, &blocker)); 616 sequenced_runner->PostTask( 617 FROM_HERE, 618 base::Bind(&TestTracker::BlockTask, 619 tracker(), 2, &blocker)); 620 621 tracker()->WaitUntilTasksBlocked(3); 622 623 // This should not block. If this test hangs, it means it failed. 624 pool()->Shutdown(); 625 626 // The task should not have completed yet. 627 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); 628 629 // Posting more tasks should fail. 630 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( 631 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), 632 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); 633 EXPECT_FALSE(runner->PostTask( 634 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); 635 EXPECT_FALSE(sequenced_runner->PostTask( 636 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); 637 638 // Continue the background thread and make sure the tasks can complete. 639 blocker.Unblock(3); 640 std::vector<int> result = tracker()->WaitUntilTasksComplete(3); 641 EXPECT_EQ(3u, result.size()); 642 } 643 644 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown 645 // until they stop, but tasks not yet started do not. 646 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) { 647 // Start tasks to take all the threads and block them. 648 EnsureAllWorkersCreated(); 649 ThreadBlocker blocker; 650 651 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not 652 // return until these tasks have completed. 653 for (size_t i = 0; i < kNumWorkerThreads; i++) { 654 pool()->PostWorkerTaskWithShutdownBehavior( 655 FROM_HERE, 656 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker), 657 SequencedWorkerPool::SKIP_ON_SHUTDOWN); 658 } 659 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); 660 661 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be 662 // executed once Shutdown() has been called. 663 pool()->PostWorkerTaskWithShutdownBehavior( 664 FROM_HERE, 665 base::Bind(&TestTracker::BlockTask, 666 tracker(), 0, &blocker), 667 SequencedWorkerPool::SKIP_ON_SHUTDOWN); 668 669 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have 670 // been started block shutdown. 671 SetWillWaitForShutdownCallback( 672 base::Bind(&EnsureTasksToCompleteCountAndUnblock, 673 scoped_refptr<TestTracker>(tracker()), 0, 674 &blocker, kNumWorkerThreads)); 675 676 // No tasks should have completed yet. 677 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); 678 679 // This should not block. If this test hangs, it means it failed. 680 pool()->Shutdown(); 681 682 // Shutdown should not return until all of the tasks have completed. 683 std::vector<int> result = 684 tracker()->WaitUntilTasksComplete(kNumWorkerThreads); 685 686 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be 687 // allowed to complete. No additional non-blocking tasks should have been 688 // started. 689 ASSERT_EQ(kNumWorkerThreads, result.size()); 690 for (size_t i = 0; i < kNumWorkerThreads; i++) { 691 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != 692 result.end()); 693 } 694 } 695 696 // Ensure all worker threads are created, and then trigger a spurious 697 // work signal. This shouldn't cause any other work signals to be 698 // triggered. This is a regression test for http://crbug.com/117469. 699 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { 700 EnsureAllWorkersCreated(); 701 int old_has_work_call_count = has_work_call_count(); 702 pool()->SignalHasWorkForTesting(); 703 // This is inherently racy, but can only produce false positives. 704 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); 705 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count()); 706 } 707 708 void IsRunningOnCurrentThreadTask( 709 SequencedWorkerPool::SequenceToken test_positive_token, 710 SequencedWorkerPool::SequenceToken test_negative_token, 711 SequencedWorkerPool* pool, 712 SequencedWorkerPool* unused_pool) { 713 EXPECT_TRUE(pool->RunsTasksOnCurrentThread()); 714 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token)); 715 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token)); 716 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); 717 EXPECT_FALSE( 718 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token)); 719 EXPECT_FALSE( 720 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token)); 721 } 722 723 // Verify correctness of the IsRunningSequenceOnCurrentThread method. 724 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) { 725 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); 726 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); 727 SequencedWorkerPool::SequenceToken unsequenced_token; 728 729 scoped_refptr<SequencedWorkerPool> unused_pool = 730 new SequencedWorkerPool(2, "unused_pool"); 731 732 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread()); 733 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1)); 734 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2)); 735 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token)); 736 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); 737 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1)); 738 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2)); 739 EXPECT_FALSE( 740 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token)); 741 742 pool()->PostSequencedWorkerTask( 743 token1, FROM_HERE, 744 base::Bind(&IsRunningOnCurrentThreadTask, 745 token1, token2, pool(), unused_pool)); 746 pool()->PostSequencedWorkerTask( 747 token2, FROM_HERE, 748 base::Bind(&IsRunningOnCurrentThreadTask, 749 token2, unsequenced_token, pool(), unused_pool)); 750 pool()->PostWorkerTask( 751 FROM_HERE, 752 base::Bind(&IsRunningOnCurrentThreadTask, 753 unsequenced_token, token1, pool(), unused_pool)); 754 pool()->Shutdown(); 755 unused_pool->Shutdown(); 756 } 757 758 // Verify that FlushForTesting works as intended. 759 TEST_F(SequencedWorkerPoolTest, FlushForTesting) { 760 // Should be fine to call on a new instance. 761 pool()->FlushForTesting(); 762 763 // Queue up a bunch of work, including a long delayed task and 764 // a task that produces additional tasks as an artifact. 765 pool()->PostDelayedWorkerTask( 766 FROM_HERE, 767 base::Bind(&TestTracker::FastTask, tracker(), 0), 768 TimeDelta::FromMinutes(5)); 769 pool()->PostWorkerTask(FROM_HERE, 770 base::Bind(&TestTracker::SlowTask, tracker(), 0)); 771 const size_t kNumFastTasks = 20; 772 for (size_t i = 0; i < kNumFastTasks; i++) { 773 pool()->PostWorkerTask(FROM_HERE, 774 base::Bind(&TestTracker::FastTask, tracker(), 0)); 775 } 776 pool()->PostWorkerTask( 777 FROM_HERE, 778 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(), 779 true)); 780 781 // We expect all except the delayed task to have been run. We verify all 782 // closures have been deleted by looking at the refcount of the 783 // tracker. 784 EXPECT_FALSE(tracker()->HasOneRef()); 785 pool()->FlushForTesting(); 786 EXPECT_TRUE(tracker()->HasOneRef()); 787 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount()); 788 789 // Should be fine to call on an idle instance with all threads created, and 790 // spamming the method shouldn't deadlock or confuse the class. 791 pool()->FlushForTesting(); 792 pool()->FlushForTesting(); 793 794 // Should be fine to call after shutdown too. 795 pool()->Shutdown(); 796 pool()->FlushForTesting(); 797 } 798 799 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) { 800 MessageLoop loop; 801 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool")); 802 scoped_refptr<SequencedTaskRunner> task_runner = 803 pool->GetSequencedTaskRunnerWithShutdownBehavior( 804 pool->GetSequenceToken(), 805 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); 806 807 // Upon test exit, should shut down without hanging. 808 pool->Shutdown(); 809 } 810 811 class SequencedWorkerPoolTaskRunnerTestDelegate { 812 public: 813 SequencedWorkerPoolTaskRunnerTestDelegate() {} 814 815 ~SequencedWorkerPoolTaskRunnerTestDelegate() {} 816 817 void StartTaskRunner() { 818 pool_owner_.reset( 819 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); 820 } 821 822 scoped_refptr<SequencedWorkerPool> GetTaskRunner() { 823 return pool_owner_->pool(); 824 } 825 826 void StopTaskRunner() { 827 // Make sure all tasks are run before shutting down. Delayed tasks are 828 // not run, they're simply deleted. 829 pool_owner_->pool()->FlushForTesting(); 830 pool_owner_->pool()->Shutdown(); 831 // Don't reset |pool_owner_| here, as the test may still hold a 832 // reference to the pool. 833 } 834 835 private: 836 MessageLoop message_loop_; 837 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; 838 }; 839 840 INSTANTIATE_TYPED_TEST_CASE_P( 841 SequencedWorkerPool, TaskRunnerTest, 842 SequencedWorkerPoolTaskRunnerTestDelegate); 843 844 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate { 845 public: 846 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {} 847 848 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() { 849 } 850 851 void StartTaskRunner() { 852 pool_owner_.reset( 853 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); 854 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( 855 SequencedWorkerPool::BLOCK_SHUTDOWN); 856 } 857 858 scoped_refptr<TaskRunner> GetTaskRunner() { 859 return task_runner_; 860 } 861 862 void StopTaskRunner() { 863 // Make sure all tasks are run before shutting down. Delayed tasks are 864 // not run, they're simply deleted. 865 pool_owner_->pool()->FlushForTesting(); 866 pool_owner_->pool()->Shutdown(); 867 // Don't reset |pool_owner_| here, as the test may still hold a 868 // reference to the pool. 869 } 870 871 private: 872 MessageLoop message_loop_; 873 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; 874 scoped_refptr<TaskRunner> task_runner_; 875 }; 876 877 INSTANTIATE_TYPED_TEST_CASE_P( 878 SequencedWorkerPoolTaskRunner, TaskRunnerTest, 879 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate); 880 881 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate { 882 public: 883 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {} 884 885 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() { 886 } 887 888 void StartTaskRunner() { 889 pool_owner_.reset(new SequencedWorkerPoolOwner( 890 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); 891 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( 892 pool_owner_->pool()->GetSequenceToken()); 893 } 894 895 scoped_refptr<SequencedTaskRunner> GetTaskRunner() { 896 return task_runner_; 897 } 898 899 void StopTaskRunner() { 900 // Make sure all tasks are run before shutting down. Delayed tasks are 901 // not run, they're simply deleted. 902 pool_owner_->pool()->FlushForTesting(); 903 pool_owner_->pool()->Shutdown(); 904 // Don't reset |pool_owner_| here, as the test may still hold a 905 // reference to the pool. 906 } 907 908 private: 909 MessageLoop message_loop_; 910 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; 911 scoped_refptr<SequencedTaskRunner> task_runner_; 912 }; 913 914 INSTANTIATE_TYPED_TEST_CASE_P( 915 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, 916 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); 917 918 INSTANTIATE_TYPED_TEST_CASE_P( 919 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, 920 SequencedWorkerPoolSequencedTaskRunnerTestDelegate); 921 922 } // namespace 923 924 } // namespace base 925