Home | History | Annotate | Download | only in threading
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/threading/sequenced_worker_pool.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/bind.h"
     10 #include "base/compiler_specific.h"
     11 #include "base/memory/ref_counted.h"
     12 #include "base/memory/scoped_ptr.h"
     13 #include "base/message_loop/message_loop.h"
     14 #include "base/message_loop/message_loop_proxy.h"
     15 #include "base/synchronization/condition_variable.h"
     16 #include "base/synchronization/lock.h"
     17 #include "base/test/sequenced_task_runner_test_template.h"
     18 #include "base/test/sequenced_worker_pool_owner.h"
     19 #include "base/test/task_runner_test_template.h"
     20 #include "base/test/test_timeouts.h"
     21 #include "base/threading/platform_thread.h"
     22 #include "base/time/time.h"
     23 #include "base/tracked_objects.h"
     24 #include "testing/gtest/include/gtest/gtest.h"
     25 
     26 namespace base {
     27 
     28 // IMPORTANT NOTE:
     29 //
     30 // Many of these tests have failure modes where they'll hang forever. These
     31 // tests should not be flaky, and hanging indicates a type of failure. Do not
     32 // mark as flaky if they're hanging, it's likely an actual bug.
     33 
     34 namespace {
     35 
     36 const size_t kNumWorkerThreads = 3;
     37 
     38 // Allows a number of threads to all be blocked on the same event, and
     39 // provides a way to unblock a certain number of them.
     40 class ThreadBlocker {
     41  public:
     42   ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
     43 
     44   void Block() {
     45     {
     46       base::AutoLock lock(lock_);
     47       while (unblock_counter_ == 0)
     48         cond_var_.Wait();
     49       unblock_counter_--;
     50     }
     51     cond_var_.Signal();
     52   }
     53 
     54   void Unblock(size_t count) {
     55     {
     56       base::AutoLock lock(lock_);
     57       DCHECK(unblock_counter_ == 0);
     58       unblock_counter_ = count;
     59     }
     60     cond_var_.Signal();
     61   }
     62 
     63  private:
     64   base::Lock lock_;
     65   base::ConditionVariable cond_var_;
     66 
     67   size_t unblock_counter_;
     68 };
     69 
     70 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
     71  public:
     72   TestTracker()
     73       : lock_(),
     74         cond_var_(&lock_),
     75         started_events_(0) {
     76   }
     77 
     78   // Each of these tasks appends the argument to the complete sequence vector
     79   // so calling code can see what order they finished in.
     80   void FastTask(int id) {
     81     SignalWorkerDone(id);
     82   }
     83 
     84   void SlowTask(int id) {
     85     base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
     86     SignalWorkerDone(id);
     87   }
     88 
     89   void BlockTask(int id, ThreadBlocker* blocker) {
     90     // Note that this task has started and signal anybody waiting for that
     91     // to happen.
     92     {
     93       base::AutoLock lock(lock_);
     94       started_events_++;
     95     }
     96     cond_var_.Signal();
     97 
     98     blocker->Block();
     99     SignalWorkerDone(id);
    100   }
    101 
    102   void PostAdditionalTasks(
    103         int id, SequencedWorkerPool* pool,
    104         bool expected_return_value) {
    105     Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
    106     EXPECT_EQ(expected_return_value,
    107               pool->PostWorkerTaskWithShutdownBehavior(
    108                   FROM_HERE, fast_task,
    109                   SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    110     EXPECT_EQ(expected_return_value,
    111               pool->PostWorkerTaskWithShutdownBehavior(
    112                   FROM_HERE, fast_task,
    113                   SequencedWorkerPool::SKIP_ON_SHUTDOWN));
    114     pool->PostWorkerTaskWithShutdownBehavior(
    115         FROM_HERE, fast_task,
    116         SequencedWorkerPool::BLOCK_SHUTDOWN);
    117     SignalWorkerDone(id);
    118   }
    119 
    120   // Waits until the given number of tasks have started executing.
    121   void WaitUntilTasksBlocked(size_t count) {
    122     {
    123       base::AutoLock lock(lock_);
    124       while (started_events_ < count)
    125         cond_var_.Wait();
    126     }
    127     cond_var_.Signal();
    128   }
    129 
    130   // Blocks the current thread until at least the given number of tasks are in
    131   // the completed vector, and then returns a copy.
    132   std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
    133     std::vector<int> ret;
    134     {
    135       base::AutoLock lock(lock_);
    136       while (complete_sequence_.size() < num_tasks)
    137         cond_var_.Wait();
    138       ret = complete_sequence_;
    139     }
    140     cond_var_.Signal();
    141     return ret;
    142   }
    143 
    144   size_t GetTasksCompletedCount() {
    145     base::AutoLock lock(lock_);
    146     return complete_sequence_.size();
    147   }
    148 
    149   void ClearCompleteSequence() {
    150     base::AutoLock lock(lock_);
    151     complete_sequence_.clear();
    152     started_events_ = 0;
    153   }
    154 
    155  private:
    156   friend class base::RefCountedThreadSafe<TestTracker>;
    157   ~TestTracker() {}
    158 
    159   void SignalWorkerDone(int id) {
    160     {
    161       base::AutoLock lock(lock_);
    162       complete_sequence_.push_back(id);
    163     }
    164     cond_var_.Signal();
    165   }
    166 
    167   // Protects the complete_sequence.
    168   base::Lock lock_;
    169 
    170   base::ConditionVariable cond_var_;
    171 
    172   // Protected by lock_.
    173   std::vector<int> complete_sequence_;
    174 
    175   // Counter of the number of "block" workers that have started.
    176   size_t started_events_;
    177 };
    178 
    179 class SequencedWorkerPoolTest : public testing::Test {
    180  public:
    181   SequencedWorkerPoolTest()
    182       : tracker_(new TestTracker) {
    183     ResetPool();
    184   }
    185 
    186   virtual ~SequencedWorkerPoolTest() {}
    187 
    188   virtual void SetUp() OVERRIDE {}
    189 
    190   virtual void TearDown() OVERRIDE {
    191     pool()->Shutdown();
    192   }
    193 
    194   const scoped_refptr<SequencedWorkerPool>& pool() {
    195     return pool_owner_->pool();
    196   }
    197   TestTracker* tracker() { return tracker_.get(); }
    198 
    199   // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
    200   // down, and creates a new instance.
    201   void ResetPool() {
    202     pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
    203   }
    204 
    205   void SetWillWaitForShutdownCallback(const Closure& callback) {
    206     pool_owner_->SetWillWaitForShutdownCallback(callback);
    207   }
    208 
    209   // Ensures that the given number of worker threads is created by adding
    210   // tasks and waiting until they complete. Worker thread creation is
    211   // serialized, can happen on background threads asynchronously, and doesn't
    212   // happen any more at shutdown. This means that if a test posts a bunch of
    213   // tasks and calls shutdown, fewer workers will be created than the test may
    214   // expect.
    215   //
    216   // This function ensures that this condition can't happen so tests can make
    217   // assumptions about the number of workers active. See the comment in
    218   // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
    219   // details.
    220   //
    221   // It will post tasks to the queue with id -1. It also assumes this is the
    222   // first thing called in a test since it will clear the complete_sequence_.
    223   void EnsureAllWorkersCreated() {
    224     // Create a bunch of threads, all waiting. This will cause that may
    225     // workers to be created.
    226     ThreadBlocker blocker;
    227     for (size_t i = 0; i < kNumWorkerThreads; i++) {
    228       pool()->PostWorkerTask(FROM_HERE,
    229                              base::Bind(&TestTracker::BlockTask,
    230                                         tracker(), -1, &blocker));
    231     }
    232     tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
    233 
    234     // Now wake them up and wait until they're done.
    235     blocker.Unblock(kNumWorkerThreads);
    236     tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
    237 
    238     // Clean up the task IDs we added.
    239     tracker()->ClearCompleteSequence();
    240   }
    241 
    242   int has_work_call_count() const {
    243     return pool_owner_->has_work_call_count();
    244   }
    245 
    246  private:
    247   MessageLoop message_loop_;
    248   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
    249   const scoped_refptr<TestTracker> tracker_;
    250 };
    251 
    252 // Checks that the given number of entries are in the tasks to complete of
    253 // the given tracker, and then signals the given event the given number of
    254 // times. This is used to wakt up blocked background threads before blocking
    255 // on shutdown.
    256 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
    257                                           size_t expected_tasks_to_complete,
    258                                           ThreadBlocker* blocker,
    259                                           size_t threads_to_awake) {
    260   EXPECT_EQ(
    261       expected_tasks_to_complete,
    262       tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
    263 
    264   blocker->Unblock(threads_to_awake);
    265 }
    266 
    267 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
    268  public:
    269   explicit DeletionHelper(
    270       const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
    271       : deleted_flag_(deleted_flag) {
    272   }
    273 
    274  private:
    275   friend class base::RefCountedThreadSafe<DeletionHelper>;
    276   virtual ~DeletionHelper() { deleted_flag_->data = true; }
    277 
    278   const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
    279   DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
    280 };
    281 
    282 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
    283                        const scoped_refptr<DeletionHelper>& helper) {
    284   ADD_FAILURE() << "Should never run";
    285 }
    286 
    287 // Tests that delayed tasks are deleted upon shutdown of the pool.
    288 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
    289   // Post something to verify the pool is started up.
    290   EXPECT_TRUE(pool()->PostTask(
    291       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
    292 
    293   scoped_refptr<base::RefCountedData<bool> > deleted_flag(
    294       new base::RefCountedData<bool>(false));
    295 
    296   base::Time posted_at(base::Time::Now());
    297   // Post something that shouldn't run.
    298   EXPECT_TRUE(pool()->PostDelayedTask(
    299       FROM_HERE,
    300       base::Bind(&HoldPoolReference,
    301                  pool(),
    302                  make_scoped_refptr(new DeletionHelper(deleted_flag))),
    303       TestTimeouts::action_timeout()));
    304 
    305   std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
    306   ASSERT_EQ(1u, completion_sequence.size());
    307   ASSERT_EQ(1, completion_sequence[0]);
    308 
    309   pool()->Shutdown();
    310   // Shutdown is asynchronous, so use ResetPool() to block until the pool is
    311   // fully destroyed (and thus shut down).
    312   ResetPool();
    313 
    314   // Verify that we didn't block until the task was due.
    315   ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
    316 
    317   // Verify that the deferred task has not only not run, but has also been
    318   // destroyed.
    319   ASSERT_TRUE(deleted_flag->data);
    320 }
    321 
    322 // Tests that same-named tokens have the same ID.
    323 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
    324   const std::string name1("hello");
    325   SequencedWorkerPool::SequenceToken token1 =
    326       pool()->GetNamedSequenceToken(name1);
    327 
    328   SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
    329 
    330   const std::string name3("goodbye");
    331   SequencedWorkerPool::SequenceToken token3 =
    332       pool()->GetNamedSequenceToken(name3);
    333 
    334   // All 3 tokens should be different.
    335   EXPECT_FALSE(token1.Equals(token2));
    336   EXPECT_FALSE(token1.Equals(token3));
    337   EXPECT_FALSE(token2.Equals(token3));
    338 
    339   // Requesting the same name again should give the same value.
    340   SequencedWorkerPool::SequenceToken token1again =
    341       pool()->GetNamedSequenceToken(name1);
    342   EXPECT_TRUE(token1.Equals(token1again));
    343 
    344   SequencedWorkerPool::SequenceToken token3again =
    345       pool()->GetNamedSequenceToken(name3);
    346   EXPECT_TRUE(token3.Equals(token3again));
    347 }
    348 
    349 // Tests that posting a bunch of tasks (many more than the number of worker
    350 // threads) runs them all.
    351 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
    352   pool()->PostWorkerTask(FROM_HERE,
    353                          base::Bind(&TestTracker::SlowTask, tracker(), 0));
    354 
    355   const size_t kNumTasks = 20;
    356   for (size_t i = 1; i < kNumTasks; i++) {
    357     pool()->PostWorkerTask(FROM_HERE,
    358                            base::Bind(&TestTracker::FastTask, tracker(), i));
    359   }
    360 
    361   std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
    362   EXPECT_EQ(kNumTasks, result.size());
    363 }
    364 
    365 // Tests that posting a bunch of tasks (many more than the number of
    366 // worker threads) to two pools simultaneously runs them all twice.
    367 // This test is meant to shake out any concurrency issues between
    368 // pools (like histograms).
    369 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
    370   SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
    371   SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
    372 
    373   base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
    374   pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
    375   pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
    376 
    377   const size_t kNumTasks = 20;
    378   for (size_t i = 1; i < kNumTasks; i++) {
    379     base::Closure fast_task =
    380         base::Bind(&TestTracker::FastTask, tracker(), i);
    381     pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
    382     pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
    383   }
    384 
    385   std::vector<int> result =
    386       tracker()->WaitUntilTasksComplete(2*kNumTasks);
    387   EXPECT_EQ(2 * kNumTasks, result.size());
    388 
    389   pool2.pool()->Shutdown();
    390   pool1.pool()->Shutdown();
    391 }
    392 
    393 // Test that tasks with the same sequence token are executed in order but don't
    394 // affect other tasks.
    395 TEST_F(SequencedWorkerPoolTest, Sequence) {
    396   // Fill all the worker threads except one.
    397   const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
    398   ThreadBlocker background_blocker;
    399   for (size_t i = 0; i < kNumBackgroundTasks; i++) {
    400     pool()->PostWorkerTask(FROM_HERE,
    401                            base::Bind(&TestTracker::BlockTask,
    402                                       tracker(), i, &background_blocker));
    403   }
    404   tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
    405 
    406   // Create two tasks with the same sequence token, one that will block on the
    407   // event, and one which will just complete quickly when it's run. Since there
    408   // is one worker thread free, the first task will start and then block, and
    409   // the second task should be waiting.
    410   ThreadBlocker blocker;
    411   SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
    412   pool()->PostSequencedWorkerTask(
    413       token1, FROM_HERE,
    414       base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
    415   pool()->PostSequencedWorkerTask(
    416       token1, FROM_HERE,
    417       base::Bind(&TestTracker::FastTask, tracker(), 101));
    418   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
    419 
    420   // Create another two tasks as above with a different token. These will be
    421   // blocked since there are no slots to run.
    422   SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
    423   pool()->PostSequencedWorkerTask(
    424       token2, FROM_HERE,
    425       base::Bind(&TestTracker::FastTask, tracker(), 200));
    426   pool()->PostSequencedWorkerTask(
    427       token2, FROM_HERE,
    428       base::Bind(&TestTracker::FastTask, tracker(), 201));
    429   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
    430 
    431   // Let one background task complete. This should then let both tasks of
    432   // token2 run to completion in order. The second task of token1 should still
    433   // be blocked.
    434   background_blocker.Unblock(1);
    435   std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
    436   ASSERT_EQ(3u, result.size());
    437   EXPECT_EQ(200, result[1]);
    438   EXPECT_EQ(201, result[2]);
    439 
    440   // Finish the rest of the background tasks. This should leave some workers
    441   // free with the second token1 task still blocked on the first.
    442   background_blocker.Unblock(kNumBackgroundTasks - 1);
    443   EXPECT_EQ(kNumBackgroundTasks + 2,
    444             tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
    445 
    446   // Allow the first task of token1 to complete. This should run the second.
    447   blocker.Unblock(1);
    448   result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
    449   ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
    450   EXPECT_EQ(100, result[result.size() - 2]);
    451   EXPECT_EQ(101, result[result.size() - 1]);
    452 }
    453 
    454 // Tests that any tasks posted after Shutdown are ignored.
    455 // Disabled for flakiness.  See http://crbug.com/166451.
    456 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
    457   // Start tasks to take all the threads and block them.
    458   EnsureAllWorkersCreated();
    459   ThreadBlocker blocker;
    460   for (size_t i = 0; i < kNumWorkerThreads; i++) {
    461     pool()->PostWorkerTask(FROM_HERE,
    462                            base::Bind(&TestTracker::BlockTask,
    463                                       tracker(), i, &blocker));
    464   }
    465   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
    466 
    467   SetWillWaitForShutdownCallback(
    468       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
    469                  scoped_refptr<TestTracker>(tracker()), 0,
    470                  &blocker, kNumWorkerThreads));
    471 
    472   // Shutdown the worker pool. This should discard all non-blocking tasks.
    473   const int kMaxNewBlockingTasksAfterShutdown = 100;
    474   pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
    475 
    476   int old_has_work_call_count = has_work_call_count();
    477 
    478   std::vector<int> result =
    479       tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
    480 
    481   // The kNumWorkerThread items should have completed, in no particular order.
    482   ASSERT_EQ(kNumWorkerThreads, result.size());
    483   for (size_t i = 0; i < kNumWorkerThreads; i++) {
    484     EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
    485                 result.end());
    486   }
    487 
    488   // No further tasks, regardless of shutdown mode, should be allowed.
    489   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
    490       FROM_HERE,
    491       base::Bind(&TestTracker::FastTask, tracker(), 100),
    492       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    493   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
    494       FROM_HERE,
    495       base::Bind(&TestTracker::FastTask, tracker(), 101),
    496       SequencedWorkerPool::SKIP_ON_SHUTDOWN));
    497   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
    498       FROM_HERE,
    499       base::Bind(&TestTracker::FastTask, tracker(), 102),
    500       SequencedWorkerPool::BLOCK_SHUTDOWN));
    501 
    502   ASSERT_EQ(old_has_work_call_count, has_work_call_count());
    503 }
    504 
    505 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
    506   // Test that <n> new blocking tasks are allowed provided they're posted
    507   // by a running tasks.
    508   EnsureAllWorkersCreated();
    509   ThreadBlocker blocker;
    510 
    511   // Start tasks to take all the threads and block them.
    512   const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
    513   for (int i = 0; i < kNumBlockTasks; ++i) {
    514     EXPECT_TRUE(pool()->PostWorkerTask(
    515         FROM_HERE,
    516         base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
    517   }
    518   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
    519 
    520   // Queue up shutdown blocking tasks behind those which will attempt to post
    521   // additional tasks when run, PostAdditionalTasks attemtps to post 3
    522   // new FastTasks, one for each shutdown_behavior.
    523   const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
    524   for (int i = 0; i < kNumQueuedTasks; ++i) {
    525     EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
    526         FROM_HERE,
    527         base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
    528                    false),
    529         SequencedWorkerPool::BLOCK_SHUTDOWN));
    530   }
    531 
    532   // Setup to open the floodgates from within Shutdown().
    533   SetWillWaitForShutdownCallback(
    534       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
    535                  scoped_refptr<TestTracker>(tracker()),
    536                  0, &blocker, kNumBlockTasks));
    537 
    538   // Allow half of the additional blocking tasks thru.
    539   const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
    540   pool()->Shutdown(kNumNewBlockingTasksToAllow);
    541 
    542   // Ensure that the correct number of tasks actually got run.
    543   tracker()->WaitUntilTasksComplete(static_cast<size_t>(
    544       kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
    545 
    546   // Clean up the task IDs we added and go home.
    547   tracker()->ClearCompleteSequence();
    548 }
    549 
    550 // Tests that unrun tasks are discarded properly according to their shutdown
    551 // mode.
    552 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
    553   // Start tasks to take all the threads and block them.
    554   EnsureAllWorkersCreated();
    555   ThreadBlocker blocker;
    556   for (size_t i = 0; i < kNumWorkerThreads; i++) {
    557     pool()->PostWorkerTask(FROM_HERE,
    558                            base::Bind(&TestTracker::BlockTask,
    559                                       tracker(), i, &blocker));
    560   }
    561   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
    562 
    563   // Create some tasks with different shutdown modes.
    564   pool()->PostWorkerTaskWithShutdownBehavior(
    565       FROM_HERE,
    566       base::Bind(&TestTracker::FastTask, tracker(), 100),
    567       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
    568   pool()->PostWorkerTaskWithShutdownBehavior(
    569       FROM_HERE,
    570       base::Bind(&TestTracker::FastTask, tracker(), 101),
    571       SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    572   pool()->PostWorkerTaskWithShutdownBehavior(
    573       FROM_HERE,
    574       base::Bind(&TestTracker::FastTask, tracker(), 102),
    575       SequencedWorkerPool::BLOCK_SHUTDOWN);
    576 
    577   // Shutdown the worker pool. This should discard all non-blocking tasks.
    578   SetWillWaitForShutdownCallback(
    579       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
    580                  scoped_refptr<TestTracker>(tracker()), 0,
    581                  &blocker, kNumWorkerThreads));
    582   pool()->Shutdown();
    583 
    584   std::vector<int> result =
    585       tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
    586 
    587   // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
    588   // one, in no particular order.
    589   ASSERT_EQ(kNumWorkerThreads + 1, result.size());
    590   for (size_t i = 0; i < kNumWorkerThreads; i++) {
    591     EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
    592                 result.end());
    593   }
    594   EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
    595 }
    596 
    597 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
    598 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
    599   scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
    600       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    601   scoped_refptr<SequencedTaskRunner> sequenced_runner(
    602       pool()->GetSequencedTaskRunnerWithShutdownBehavior(
    603           pool()->GetSequenceToken(),
    604           SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    605   EnsureAllWorkersCreated();
    606   ThreadBlocker blocker;
    607   pool()->PostWorkerTaskWithShutdownBehavior(
    608       FROM_HERE,
    609       base::Bind(&TestTracker::BlockTask,
    610                  tracker(), 0, &blocker),
    611       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
    612   runner->PostTask(
    613       FROM_HERE,
    614       base::Bind(&TestTracker::BlockTask,
    615                  tracker(), 1, &blocker));
    616   sequenced_runner->PostTask(
    617       FROM_HERE,
    618       base::Bind(&TestTracker::BlockTask,
    619                  tracker(), 2, &blocker));
    620 
    621   tracker()->WaitUntilTasksBlocked(3);
    622 
    623   // This should not block. If this test hangs, it means it failed.
    624   pool()->Shutdown();
    625 
    626   // The task should not have completed yet.
    627   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
    628 
    629   // Posting more tasks should fail.
    630   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
    631       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
    632       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    633   EXPECT_FALSE(runner->PostTask(
    634       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
    635   EXPECT_FALSE(sequenced_runner->PostTask(
    636       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
    637 
    638   // Continue the background thread and make sure the tasks can complete.
    639   blocker.Unblock(3);
    640   std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
    641   EXPECT_EQ(3u, result.size());
    642 }
    643 
    644 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
    645 // until they stop, but tasks not yet started do not.
    646 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
    647   // Start tasks to take all the threads and block them.
    648   EnsureAllWorkersCreated();
    649   ThreadBlocker blocker;
    650 
    651   // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
    652   // return until these tasks have completed.
    653   for (size_t i = 0; i < kNumWorkerThreads; i++) {
    654     pool()->PostWorkerTaskWithShutdownBehavior(
    655         FROM_HERE,
    656         base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
    657         SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    658   }
    659   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
    660 
    661   // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
    662   // executed once Shutdown() has been called.
    663   pool()->PostWorkerTaskWithShutdownBehavior(
    664       FROM_HERE,
    665       base::Bind(&TestTracker::BlockTask,
    666                  tracker(), 0, &blocker),
    667       SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    668 
    669   // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
    670   // been started block shutdown.
    671   SetWillWaitForShutdownCallback(
    672       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
    673                  scoped_refptr<TestTracker>(tracker()), 0,
    674                  &blocker, kNumWorkerThreads));
    675 
    676   // No tasks should have completed yet.
    677   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
    678 
    679   // This should not block. If this test hangs, it means it failed.
    680   pool()->Shutdown();
    681 
    682   // Shutdown should not return until all of the tasks have completed.
    683   std::vector<int> result =
    684       tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
    685 
    686   // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
    687   // allowed to complete. No additional non-blocking tasks should have been
    688   // started.
    689   ASSERT_EQ(kNumWorkerThreads, result.size());
    690   for (size_t i = 0; i < kNumWorkerThreads; i++) {
    691     EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
    692                 result.end());
    693   }
    694 }
    695 
    696 // Ensure all worker threads are created, and then trigger a spurious
    697 // work signal. This shouldn't cause any other work signals to be
    698 // triggered. This is a regression test for http://crbug.com/117469.
    699 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
    700   EnsureAllWorkersCreated();
    701   int old_has_work_call_count = has_work_call_count();
    702   pool()->SignalHasWorkForTesting();
    703   // This is inherently racy, but can only produce false positives.
    704   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
    705   EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
    706 }
    707 
    708 void IsRunningOnCurrentThreadTask(
    709     SequencedWorkerPool::SequenceToken test_positive_token,
    710     SequencedWorkerPool::SequenceToken test_negative_token,
    711     SequencedWorkerPool* pool,
    712     SequencedWorkerPool* unused_pool) {
    713   EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
    714   EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
    715   EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
    716   EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
    717   EXPECT_FALSE(
    718       unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
    719   EXPECT_FALSE(
    720       unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
    721 }
    722 
    723 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
    724 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
    725   SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
    726   SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
    727   SequencedWorkerPool::SequenceToken unsequenced_token;
    728 
    729   scoped_refptr<SequencedWorkerPool> unused_pool =
    730       new SequencedWorkerPool(2, "unused_pool");
    731 
    732   EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
    733   EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
    734   EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
    735   EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
    736   EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
    737   EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
    738   EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
    739   EXPECT_FALSE(
    740       unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
    741 
    742   pool()->PostSequencedWorkerTask(
    743       token1, FROM_HERE,
    744       base::Bind(&IsRunningOnCurrentThreadTask,
    745                  token1, token2, pool(), unused_pool));
    746   pool()->PostSequencedWorkerTask(
    747       token2, FROM_HERE,
    748       base::Bind(&IsRunningOnCurrentThreadTask,
    749                  token2, unsequenced_token, pool(), unused_pool));
    750   pool()->PostWorkerTask(
    751       FROM_HERE,
    752       base::Bind(&IsRunningOnCurrentThreadTask,
    753                  unsequenced_token, token1, pool(), unused_pool));
    754   pool()->Shutdown();
    755   unused_pool->Shutdown();
    756 }
    757 
    758 // Verify that FlushForTesting works as intended.
    759 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
    760   // Should be fine to call on a new instance.
    761   pool()->FlushForTesting();
    762 
    763   // Queue up a bunch of work, including  a long delayed task and
    764   // a task that produces additional tasks as an artifact.
    765   pool()->PostDelayedWorkerTask(
    766       FROM_HERE,
    767       base::Bind(&TestTracker::FastTask, tracker(), 0),
    768       TimeDelta::FromMinutes(5));
    769   pool()->PostWorkerTask(FROM_HERE,
    770                          base::Bind(&TestTracker::SlowTask, tracker(), 0));
    771   const size_t kNumFastTasks = 20;
    772   for (size_t i = 0; i < kNumFastTasks; i++) {
    773     pool()->PostWorkerTask(FROM_HERE,
    774                            base::Bind(&TestTracker::FastTask, tracker(), 0));
    775   }
    776   pool()->PostWorkerTask(
    777       FROM_HERE,
    778       base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
    779                  true));
    780 
    781   // We expect all except the delayed task to have been run. We verify all
    782   // closures have been deleted by looking at the refcount of the
    783   // tracker.
    784   EXPECT_FALSE(tracker()->HasOneRef());
    785   pool()->FlushForTesting();
    786   EXPECT_TRUE(tracker()->HasOneRef());
    787   EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
    788 
    789   // Should be fine to call on an idle instance with all threads created, and
    790   // spamming the method shouldn't deadlock or confuse the class.
    791   pool()->FlushForTesting();
    792   pool()->FlushForTesting();
    793 
    794   // Should be fine to call after shutdown too.
    795   pool()->Shutdown();
    796   pool()->FlushForTesting();
    797 }
    798 
    799 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
    800   MessageLoop loop;
    801   scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
    802   scoped_refptr<SequencedTaskRunner> task_runner =
    803       pool->GetSequencedTaskRunnerWithShutdownBehavior(
    804           pool->GetSequenceToken(),
    805           base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
    806 
    807   // Upon test exit, should shut down without hanging.
    808   pool->Shutdown();
    809 }
    810 
    811 class SequencedWorkerPoolTaskRunnerTestDelegate {
    812  public:
    813   SequencedWorkerPoolTaskRunnerTestDelegate() {}
    814 
    815   ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
    816 
    817   void StartTaskRunner() {
    818     pool_owner_.reset(
    819         new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
    820   }
    821 
    822   scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
    823     return pool_owner_->pool();
    824   }
    825 
    826   void StopTaskRunner() {
    827     // Make sure all tasks are run before shutting down. Delayed tasks are
    828     // not run, they're simply deleted.
    829     pool_owner_->pool()->FlushForTesting();
    830     pool_owner_->pool()->Shutdown();
    831     // Don't reset |pool_owner_| here, as the test may still hold a
    832     // reference to the pool.
    833   }
    834 
    835   bool TaskRunnerHandlesNonZeroDelays() const {
    836     return true;
    837   }
    838 
    839  private:
    840   MessageLoop message_loop_;
    841   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
    842 };
    843 
    844 INSTANTIATE_TYPED_TEST_CASE_P(
    845     SequencedWorkerPool, TaskRunnerTest,
    846     SequencedWorkerPoolTaskRunnerTestDelegate);
    847 
    848 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
    849  public:
    850   SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
    851 
    852   ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
    853   }
    854 
    855   void StartTaskRunner() {
    856     pool_owner_.reset(
    857         new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
    858     task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
    859         SequencedWorkerPool::BLOCK_SHUTDOWN);
    860   }
    861 
    862   scoped_refptr<TaskRunner> GetTaskRunner() {
    863     return task_runner_;
    864   }
    865 
    866   void StopTaskRunner() {
    867     // Make sure all tasks are run before shutting down. Delayed tasks are
    868     // not run, they're simply deleted.
    869     pool_owner_->pool()->FlushForTesting();
    870     pool_owner_->pool()->Shutdown();
    871     // Don't reset |pool_owner_| here, as the test may still hold a
    872     // reference to the pool.
    873   }
    874 
    875   bool TaskRunnerHandlesNonZeroDelays() const {
    876     return true;
    877   }
    878 
    879  private:
    880   MessageLoop message_loop_;
    881   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
    882   scoped_refptr<TaskRunner> task_runner_;
    883 };
    884 
    885 INSTANTIATE_TYPED_TEST_CASE_P(
    886     SequencedWorkerPoolTaskRunner, TaskRunnerTest,
    887     SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
    888 
    889 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
    890  public:
    891   SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
    892 
    893   ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
    894   }
    895 
    896   void StartTaskRunner() {
    897     pool_owner_.reset(new SequencedWorkerPoolOwner(
    898         10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
    899     task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
    900         pool_owner_->pool()->GetSequenceToken());
    901   }
    902 
    903   scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
    904     return task_runner_;
    905   }
    906 
    907   void StopTaskRunner() {
    908     // Make sure all tasks are run before shutting down. Delayed tasks are
    909     // not run, they're simply deleted.
    910     pool_owner_->pool()->FlushForTesting();
    911     pool_owner_->pool()->Shutdown();
    912     // Don't reset |pool_owner_| here, as the test may still hold a
    913     // reference to the pool.
    914   }
    915 
    916   bool TaskRunnerHandlesNonZeroDelays() const {
    917     return true;
    918   }
    919 
    920  private:
    921   MessageLoop message_loop_;
    922   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
    923   scoped_refptr<SequencedTaskRunner> task_runner_;
    924 };
    925 
    926 INSTANTIATE_TYPED_TEST_CASE_P(
    927     SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
    928     SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
    929 
    930 INSTANTIATE_TYPED_TEST_CASE_P(
    931     SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
    932     SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
    933 
    934 }  // namespace
    935 
    936 }  // namespace base
    937