Home | History | Annotate | Download | only in threading
      1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/threading/worker_pool_posix.h"
      6 
      7 #include <set>
      8 
      9 #include "base/synchronization/condition_variable.h"
     10 #include "base/synchronization/lock.h"
     11 #include "base/task.h"
     12 #include "base/threading/platform_thread.h"
     13 #include "base/synchronization/waitable_event.h"
     14 #include "testing/gtest/include/gtest/gtest.h"
     15 
     16 namespace base {
     17 
     18 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
     19 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
     20  public:
     21   explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
     22       : pool_(pool) {}
     23 
     24   Lock* lock() { return &pool_->lock_; }
     25   ConditionVariable* tasks_available_cv() {
     26     return &pool_->tasks_available_cv_;
     27   }
     28   const std::queue<Task*>& tasks() const { return pool_->tasks_; }
     29   int num_idle_threads() const { return pool_->num_idle_threads_; }
     30   ConditionVariable* num_idle_threads_cv() {
     31     return pool_->num_idle_threads_cv_.get();
     32   }
     33   void set_num_idle_threads_cv(ConditionVariable* cv) {
     34     pool_->num_idle_threads_cv_.reset(cv);
     35   }
     36 
     37  private:
     38   PosixDynamicThreadPool* pool_;
     39 
     40   DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
     41 };
     42 
     43 namespace {
     44 
// IncrementingTask's main purpose is to increment a counter.  It also records
// the id of the thread it runs on in a set of unique thread ids.  It does not
// signal anything on completion.
// Note that since it does not block, there is no way to control the number of
// threads used if more than one IncrementingTask is consecutively posted to the
// thread pool, since the first one might finish executing before the subsequent
// PostTask() calls get invoked.
     51 class IncrementingTask : public Task {
     52  public:
     53   IncrementingTask(Lock* counter_lock,
     54                    int* counter,
     55                    Lock* unique_threads_lock,
     56                    std::set<PlatformThreadId>* unique_threads)
     57       : counter_lock_(counter_lock),
     58         unique_threads_lock_(unique_threads_lock),
     59         unique_threads_(unique_threads),
     60         counter_(counter) {}
     61 
     62   virtual void Run() {
     63     AddSelfToUniqueThreadSet();
     64     base::AutoLock locked(*counter_lock_);
     65     (*counter_)++;
     66   }
     67 
     68   void AddSelfToUniqueThreadSet() {
     69     base::AutoLock locked(*unique_threads_lock_);
     70     unique_threads_->insert(PlatformThread::CurrentId());
     71   }
     72 
     73  private:
     74   Lock* counter_lock_;
     75   Lock* unique_threads_lock_;
     76   std::set<PlatformThreadId>* unique_threads_;
     77   int* counter_;
     78 
     79   DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
     80 };
     81 
     82 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
     83 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
     84 class BlockingIncrementingTask : public Task {
     85  public:
     86   BlockingIncrementingTask(Lock* counter_lock,
     87                            int* counter,
     88                            Lock* unique_threads_lock,
     89                            std::set<PlatformThreadId>* unique_threads,
     90                            Lock* num_waiting_to_start_lock,
     91                            int* num_waiting_to_start,
     92                            ConditionVariable* num_waiting_to_start_cv,
     93                            base::WaitableEvent* start)
     94       : incrementer_(
     95           counter_lock, counter, unique_threads_lock, unique_threads),
     96         num_waiting_to_start_lock_(num_waiting_to_start_lock),
     97         num_waiting_to_start_(num_waiting_to_start),
     98         num_waiting_to_start_cv_(num_waiting_to_start_cv),
     99         start_(start) {}
    100 
    101   virtual void Run() {
    102     {
    103       base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
    104       (*num_waiting_to_start_)++;
    105     }
    106     num_waiting_to_start_cv_->Signal();
    107     CHECK(start_->Wait());
    108     incrementer_.Run();
    109   }
    110 
    111  private:
    112   IncrementingTask incrementer_;
    113   Lock* num_waiting_to_start_lock_;
    114   int* num_waiting_to_start_;
    115   ConditionVariable* num_waiting_to_start_cv_;
    116   base::WaitableEvent* start_;
    117 
    118   DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
    119 };
    120 
    121 class PosixDynamicThreadPoolTest : public testing::Test {
    122  protected:
    123   PosixDynamicThreadPoolTest()
    124       : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
    125         peer_(pool_.get()),
    126         counter_(0),
    127         num_waiting_to_start_(0),
    128         num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
    129         start_(true, false) {}
    130 
    131   virtual void SetUp() {
    132     peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
    133   }
    134 
    135   virtual void TearDown() {
    136     // Wake up the idle threads so they can terminate.
    137     if (pool_.get()) pool_->Terminate();
    138   }
    139 
    140   void WaitForTasksToStart(int num_tasks) {
    141     base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
    142     while (num_waiting_to_start_ < num_tasks) {
    143       num_waiting_to_start_cv_.Wait();
    144     }
    145   }
    146 
    147   void WaitForIdleThreads(int num_idle_threads) {
    148     base::AutoLock pool_locked(*peer_.lock());
    149     while (peer_.num_idle_threads() < num_idle_threads) {
    150       peer_.num_idle_threads_cv()->Wait();
    151     }
    152   }
    153 
    154   Task* CreateNewIncrementingTask() {
    155     return new IncrementingTask(&counter_lock_, &counter_,
    156                                 &unique_threads_lock_, &unique_threads_);
    157   }
    158 
    159   Task* CreateNewBlockingIncrementingTask() {
    160     return new BlockingIncrementingTask(
    161         &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
    162         &num_waiting_to_start_lock_, &num_waiting_to_start_,
    163         &num_waiting_to_start_cv_, &start_);
    164   }
    165 
    166   scoped_refptr<base::PosixDynamicThreadPool> pool_;
    167   base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
    168   Lock counter_lock_;
    169   int counter_;
    170   Lock unique_threads_lock_;
    171   std::set<PlatformThreadId> unique_threads_;
    172   Lock num_waiting_to_start_lock_;
    173   int num_waiting_to_start_;
    174   ConditionVariable num_waiting_to_start_cv_;
    175   base::WaitableEvent start_;
    176 };
    177 
    178 }  // namespace
    179 
    180 TEST_F(PosixDynamicThreadPoolTest, Basic) {
    181   EXPECT_EQ(0, peer_.num_idle_threads());
    182   EXPECT_EQ(0U, unique_threads_.size());
    183   EXPECT_EQ(0U, peer_.tasks().size());
    184 
    185   // Add one task and wait for it to be completed.
    186   pool_->PostTask(CreateNewIncrementingTask());
    187 
    188   WaitForIdleThreads(1);
    189 
    190   EXPECT_EQ(1U, unique_threads_.size()) <<
    191       "There should be only one thread allocated for one task.";
    192   EXPECT_EQ(1, peer_.num_idle_threads());
    193   EXPECT_EQ(1, counter_);
    194 }
    195 
    196 TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
    197   // Add one task and wait for it to be completed.
    198   pool_->PostTask(CreateNewIncrementingTask());
    199 
    200   WaitForIdleThreads(1);
    201 
    202   // Add another 2 tasks.  One should reuse the existing worker thread.
    203   pool_->PostTask(CreateNewBlockingIncrementingTask());
    204   pool_->PostTask(CreateNewBlockingIncrementingTask());
    205 
    206   WaitForTasksToStart(2);
    207   start_.Signal();
    208   WaitForIdleThreads(2);
    209 
    210   EXPECT_EQ(2U, unique_threads_.size());
    211   EXPECT_EQ(2, peer_.num_idle_threads());
    212   EXPECT_EQ(3, counter_);
    213 }
    214 
    215 TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
    216   // Add two blocking tasks.
    217   pool_->PostTask(CreateNewBlockingIncrementingTask());
    218   pool_->PostTask(CreateNewBlockingIncrementingTask());
    219 
    220   EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
    221 
    222   WaitForTasksToStart(2);
    223   start_.Signal();
    224   WaitForIdleThreads(2);
    225 
    226   EXPECT_EQ(2U, unique_threads_.size());
    227   EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
    228   EXPECT_EQ(2, counter_);
    229 }
    230 
    231 TEST_F(PosixDynamicThreadPoolTest, Complex) {
    232   // Add two non blocking tasks and wait for them to finish.
    233   pool_->PostTask(CreateNewIncrementingTask());
    234 
    235   WaitForIdleThreads(1);
    236 
    237   // Add two blocking tasks, start them simultaneously, and wait for them to
    238   // finish.
    239   pool_->PostTask(CreateNewBlockingIncrementingTask());
    240   pool_->PostTask(CreateNewBlockingIncrementingTask());
    241 
    242   WaitForTasksToStart(2);
    243   start_.Signal();
    244   WaitForIdleThreads(2);
    245 
    246   EXPECT_EQ(3, counter_);
    247   EXPECT_EQ(2, peer_.num_idle_threads());
    248   EXPECT_EQ(2U, unique_threads_.size());
    249 
    250   // Wake up all idle threads so they can exit.
    251   {
    252     base::AutoLock locked(*peer_.lock());
    253     while (peer_.num_idle_threads() > 0) {
    254       peer_.tasks_available_cv()->Signal();
    255       peer_.num_idle_threads_cv()->Wait();
    256     }
    257   }
    258 
    259   // Add another non blocking task.  There are no threads to reuse.
    260   pool_->PostTask(CreateNewIncrementingTask());
    261   WaitForIdleThreads(1);
    262 
    263   EXPECT_EQ(3U, unique_threads_.size());
    264   EXPECT_EQ(1, peer_.num_idle_threads());
    265   EXPECT_EQ(4, counter_);
    266 }
    267 
    268 }  // namespace base
    269