1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/threading/worker_pool_posix.h" 6 7 #include <set> 8 9 #include "base/synchronization/condition_variable.h" 10 #include "base/synchronization/lock.h" 11 #include "base/task.h" 12 #include "base/threading/platform_thread.h" 13 #include "base/synchronization/waitable_event.h" 14 #include "testing/gtest/include/gtest/gtest.h" 15 16 namespace base { 17 18 // Peer class to provide passthrough access to PosixDynamicThreadPool internals. 19 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { 20 public: 21 explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool) 22 : pool_(pool) {} 23 24 Lock* lock() { return &pool_->lock_; } 25 ConditionVariable* tasks_available_cv() { 26 return &pool_->tasks_available_cv_; 27 } 28 const std::queue<Task*>& tasks() const { return pool_->tasks_; } 29 int num_idle_threads() const { return pool_->num_idle_threads_; } 30 ConditionVariable* num_idle_threads_cv() { 31 return pool_->num_idle_threads_cv_.get(); 32 } 33 void set_num_idle_threads_cv(ConditionVariable* cv) { 34 pool_->num_idle_threads_cv_.reset(cv); 35 } 36 37 private: 38 PosixDynamicThreadPool* pool_; 39 40 DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer); 41 }; 42 43 namespace { 44 45 // IncrementingTask's main purpose is to increment a counter. It also updates a 46 // set of unique thread ids, and signals a ConditionVariable on completion. 47 // Note that since it does not block, there is no way to control the number of 48 // threads used if more than one IncrementingTask is consecutively posted to the 49 // thread pool, since the first one might finish executing before the subsequent 50 // PostTask() calls get invoked. 51 class IncrementingTask : public Task { 52 public: 53 IncrementingTask(Lock* counter_lock, 54 int* counter, 55 Lock* unique_threads_lock, 56 std::set<PlatformThreadId>* unique_threads) 57 : counter_lock_(counter_lock), 58 unique_threads_lock_(unique_threads_lock), 59 unique_threads_(unique_threads), 60 counter_(counter) {} 61 62 virtual void Run() { 63 AddSelfToUniqueThreadSet(); 64 base::AutoLock locked(*counter_lock_); 65 (*counter_)++; 66 } 67 68 void AddSelfToUniqueThreadSet() { 69 base::AutoLock locked(*unique_threads_lock_); 70 unique_threads_->insert(PlatformThread::CurrentId()); 71 } 72 73 private: 74 Lock* counter_lock_; 75 Lock* unique_threads_lock_; 76 std::set<PlatformThreadId>* unique_threads_; 77 int* counter_; 78 79 DISALLOW_COPY_AND_ASSIGN(IncrementingTask); 80 }; 81 82 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that 83 // allows for waiting at the start of Run() for a WaitableEvent to be signalled. 84 class BlockingIncrementingTask : public Task { 85 public: 86 BlockingIncrementingTask(Lock* counter_lock, 87 int* counter, 88 Lock* unique_threads_lock, 89 std::set<PlatformThreadId>* unique_threads, 90 Lock* num_waiting_to_start_lock, 91 int* num_waiting_to_start, 92 ConditionVariable* num_waiting_to_start_cv, 93 base::WaitableEvent* start) 94 : incrementer_( 95 counter_lock, counter, unique_threads_lock, unique_threads), 96 num_waiting_to_start_lock_(num_waiting_to_start_lock), 97 num_waiting_to_start_(num_waiting_to_start), 98 num_waiting_to_start_cv_(num_waiting_to_start_cv), 99 start_(start) {} 100 101 virtual void Run() { 102 { 103 base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); 104 (*num_waiting_to_start_)++; 105 } 106 num_waiting_to_start_cv_->Signal(); 107 CHECK(start_->Wait()); 108 incrementer_.Run(); 109 } 110 111 private: 112 IncrementingTask incrementer_; 113 Lock* num_waiting_to_start_lock_; 114 int* num_waiting_to_start_; 115 ConditionVariable* num_waiting_to_start_cv_; 116 base::WaitableEvent* start_; 117 118 DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); 119 }; 120 121 class PosixDynamicThreadPoolTest : public testing::Test { 122 protected: 123 PosixDynamicThreadPoolTest() 124 : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), 125 peer_(pool_.get()), 126 counter_(0), 127 num_waiting_to_start_(0), 128 num_waiting_to_start_cv_(&num_waiting_to_start_lock_), 129 start_(true, false) {} 130 131 virtual void SetUp() { 132 peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); 133 } 134 135 virtual void TearDown() { 136 // Wake up the idle threads so they can terminate. 137 if (pool_.get()) pool_->Terminate(); 138 } 139 140 void WaitForTasksToStart(int num_tasks) { 141 base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); 142 while (num_waiting_to_start_ < num_tasks) { 143 num_waiting_to_start_cv_.Wait(); 144 } 145 } 146 147 void WaitForIdleThreads(int num_idle_threads) { 148 base::AutoLock pool_locked(*peer_.lock()); 149 while (peer_.num_idle_threads() < num_idle_threads) { 150 peer_.num_idle_threads_cv()->Wait(); 151 } 152 } 153 154 Task* CreateNewIncrementingTask() { 155 return new IncrementingTask(&counter_lock_, &counter_, 156 &unique_threads_lock_, &unique_threads_); 157 } 158 159 Task* CreateNewBlockingIncrementingTask() { 160 return new BlockingIncrementingTask( 161 &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, 162 &num_waiting_to_start_lock_, &num_waiting_to_start_, 163 &num_waiting_to_start_cv_, &start_); 164 } 165 166 scoped_refptr<base::PosixDynamicThreadPool> pool_; 167 base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; 168 Lock counter_lock_; 169 int counter_; 170 Lock unique_threads_lock_; 171 std::set<PlatformThreadId> unique_threads_; 172 Lock num_waiting_to_start_lock_; 173 int num_waiting_to_start_; 174 ConditionVariable num_waiting_to_start_cv_; 175 base::WaitableEvent start_; 176 }; 177 178 } // namespace 179 180 TEST_F(PosixDynamicThreadPoolTest, Basic) { 181 EXPECT_EQ(0, peer_.num_idle_threads()); 182 EXPECT_EQ(0U, unique_threads_.size()); 183 EXPECT_EQ(0U, peer_.tasks().size()); 184 185 // Add one task and wait for it to be completed. 186 pool_->PostTask(CreateNewIncrementingTask()); 187 188 WaitForIdleThreads(1); 189 190 EXPECT_EQ(1U, unique_threads_.size()) << 191 "There should be only one thread allocated for one task."; 192 EXPECT_EQ(1, peer_.num_idle_threads()); 193 EXPECT_EQ(1, counter_); 194 } 195 196 TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { 197 // Add one task and wait for it to be completed. 198 pool_->PostTask(CreateNewIncrementingTask()); 199 200 WaitForIdleThreads(1); 201 202 // Add another 2 tasks. One should reuse the existing worker thread. 203 pool_->PostTask(CreateNewBlockingIncrementingTask()); 204 pool_->PostTask(CreateNewBlockingIncrementingTask()); 205 206 WaitForTasksToStart(2); 207 start_.Signal(); 208 WaitForIdleThreads(2); 209 210 EXPECT_EQ(2U, unique_threads_.size()); 211 EXPECT_EQ(2, peer_.num_idle_threads()); 212 EXPECT_EQ(3, counter_); 213 } 214 215 TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { 216 // Add two blocking tasks. 217 pool_->PostTask(CreateNewBlockingIncrementingTask()); 218 pool_->PostTask(CreateNewBlockingIncrementingTask()); 219 220 EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; 221 222 WaitForTasksToStart(2); 223 start_.Signal(); 224 WaitForIdleThreads(2); 225 226 EXPECT_EQ(2U, unique_threads_.size()); 227 EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; 228 EXPECT_EQ(2, counter_); 229 } 230 231 TEST_F(PosixDynamicThreadPoolTest, Complex) { 232 // Add two non blocking tasks and wait for them to finish. 233 pool_->PostTask(CreateNewIncrementingTask()); 234 235 WaitForIdleThreads(1); 236 237 // Add two blocking tasks, start them simultaneously, and wait for them to 238 // finish. 239 pool_->PostTask(CreateNewBlockingIncrementingTask()); 240 pool_->PostTask(CreateNewBlockingIncrementingTask()); 241 242 WaitForTasksToStart(2); 243 start_.Signal(); 244 WaitForIdleThreads(2); 245 246 EXPECT_EQ(3, counter_); 247 EXPECT_EQ(2, peer_.num_idle_threads()); 248 EXPECT_EQ(2U, unique_threads_.size()); 249 250 // Wake up all idle threads so they can exit. 251 { 252 base::AutoLock locked(*peer_.lock()); 253 while (peer_.num_idle_threads() > 0) { 254 peer_.tasks_available_cv()->Signal(); 255 peer_.num_idle_threads_cv()->Wait(); 256 } 257 } 258 259 // Add another non blocking task. There are no threads to reuse. 260 pool_->PostTask(CreateNewIncrementingTask()); 261 WaitForIdleThreads(1); 262 263 EXPECT_EQ(3U, unique_threads_.size()); 264 EXPECT_EQ(1, peer_.num_idle_threads()); 265 EXPECT_EQ(4, counter_); 266 } 267 268 } // namespace base 269