1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // Multi-threaded tests of ConditionVariable class. 6 7 #include <time.h> 8 #include <algorithm> 9 #include <vector> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/logging.h" 14 #include "base/memory/scoped_ptr.h" 15 #include "base/single_thread_task_runner.h" 16 #include "base/synchronization/condition_variable.h" 17 #include "base/synchronization/lock.h" 18 #include "base/synchronization/spin_wait.h" 19 #include "base/threading/platform_thread.h" 20 #include "base/threading/thread.h" 21 #include "base/threading/thread_collision_warner.h" 22 #include "base/time/time.h" 23 #include "build/build_config.h" 24 #include "testing/gtest/include/gtest/gtest.h" 25 #include "testing/platform_test.h" 26 27 namespace base { 28 29 namespace { 30 //------------------------------------------------------------------------------ 31 // Define our test class, with several common variables. 32 //------------------------------------------------------------------------------ 33 34 class ConditionVariableTest : public PlatformTest { 35 public: 36 const TimeDelta kZeroMs; 37 const TimeDelta kTenMs; 38 const TimeDelta kThirtyMs; 39 const TimeDelta kFortyFiveMs; 40 const TimeDelta kSixtyMs; 41 const TimeDelta kOneHundredMs; 42 43 ConditionVariableTest() 44 : kZeroMs(TimeDelta::FromMilliseconds(0)), 45 kTenMs(TimeDelta::FromMilliseconds(10)), 46 kThirtyMs(TimeDelta::FromMilliseconds(30)), 47 kFortyFiveMs(TimeDelta::FromMilliseconds(45)), 48 kSixtyMs(TimeDelta::FromMilliseconds(60)), 49 kOneHundredMs(TimeDelta::FromMilliseconds(100)) { 50 } 51 }; 52 53 //------------------------------------------------------------------------------ 54 // Define a class that will control activities an several multi-threaded tests. 55 // The general structure of multi-threaded tests is that a test case will 56 // construct an instance of a WorkQueue. The WorkQueue will spin up some 57 // threads and control them throughout their lifetime, as well as maintaining 58 // a central repository of the work thread's activity. Finally, the WorkQueue 59 // will command the the worker threads to terminate. At that point, the test 60 // cases will validate that the WorkQueue has records showing that the desired 61 // activities were performed. 62 //------------------------------------------------------------------------------ 63 64 // Callers are responsible for synchronizing access to the following class. 65 // The WorkQueue::lock_, as accessed via WorkQueue::lock(), should be used for 66 // all synchronized access. 67 class WorkQueue : public PlatformThread::Delegate { 68 public: 69 explicit WorkQueue(int thread_count); 70 ~WorkQueue() override; 71 72 // PlatformThread::Delegate interface. 73 void ThreadMain() override; 74 75 //---------------------------------------------------------------------------- 76 // Worker threads only call the following methods. 77 // They should use the lock to get exclusive access. 78 int GetThreadId(); // Get an ID assigned to a thread.. 79 bool EveryIdWasAllocated() const; // Indicates that all IDs were handed out. 80 TimeDelta GetAnAssignment(int thread_id); // Get a work task duration. 81 void WorkIsCompleted(int thread_id); 82 83 int task_count() const; 84 bool allow_help_requests() const; // Workers can signal more workers. 85 bool shutdown() const; // Check if shutdown has been requested. 86 87 void thread_shutting_down(); 88 89 90 //---------------------------------------------------------------------------- 91 // Worker threads can call them but not needed to acquire a lock. 92 Lock* lock(); 93 94 ConditionVariable* work_is_available(); 95 ConditionVariable* all_threads_have_ids(); 96 ConditionVariable* no_more_tasks(); 97 98 //---------------------------------------------------------------------------- 99 // The rest of the methods are for use by the controlling master thread (the 100 // test case code). 101 void ResetHistory(); 102 int GetMinCompletionsByWorkerThread() const; 103 int GetMaxCompletionsByWorkerThread() const; 104 int GetNumThreadsTakingAssignments() const; 105 int GetNumThreadsCompletingTasks() const; 106 int GetNumberOfCompletedTasks() const; 107 108 void SetWorkTime(TimeDelta delay); 109 void SetTaskCount(int count); 110 void SetAllowHelp(bool allow); 111 112 // The following must be called without locking, and will spin wait until the 113 // threads are all in a wait state. 114 void SpinUntilAllThreadsAreWaiting(); 115 void SpinUntilTaskCountLessThan(int task_count); 116 117 // Caller must acquire lock before calling. 118 void SetShutdown(); 119 120 // Compares the |shutdown_task_count_| to the |thread_count| and returns true 121 // if they are equal. This check will acquire the |lock_| so the caller 122 // should not hold the lock when calling this method. 123 bool ThreadSafeCheckShutdown(int thread_count); 124 125 private: 126 // Both worker threads and controller use the following to synchronize. 127 Lock lock_; 128 ConditionVariable work_is_available_; // To tell threads there is work. 129 130 // Conditions to notify the controlling process (if it is interested). 131 ConditionVariable all_threads_have_ids_; // All threads are running. 132 ConditionVariable no_more_tasks_; // Task count is zero. 133 134 const int thread_count_; 135 int waiting_thread_count_; 136 scoped_ptr<PlatformThreadHandle[]> thread_handles_; 137 std::vector<int> assignment_history_; // Number of assignment per worker. 138 std::vector<int> completion_history_; // Number of completions per worker. 139 int thread_started_counter_; // Used to issue unique id to workers. 140 int shutdown_task_count_; // Number of tasks told to shutdown 141 int task_count_; // Number of assignment tasks waiting to be processed. 142 TimeDelta worker_delay_; // Time each task takes to complete. 143 bool allow_help_requests_; // Workers can signal more workers. 144 bool shutdown_; // Set when threads need to terminate. 145 146 DFAKE_MUTEX(locked_methods_); 147 }; 148 149 //------------------------------------------------------------------------------ 150 // The next section contains the actual tests. 151 //------------------------------------------------------------------------------ 152 153 TEST_F(ConditionVariableTest, StartupShutdownTest) { 154 Lock lock; 155 156 // First try trivial startup/shutdown. 157 { 158 ConditionVariable cv1(&lock); 159 } // Call for cv1 destruction. 160 161 // Exercise with at least a few waits. 162 ConditionVariable cv(&lock); 163 164 lock.Acquire(); 165 cv.TimedWait(kTenMs); // Wait for 10 ms. 166 cv.TimedWait(kTenMs); // Wait for 10 ms. 167 lock.Release(); 168 169 lock.Acquire(); 170 cv.TimedWait(kTenMs); // Wait for 10 ms. 171 cv.TimedWait(kTenMs); // Wait for 10 ms. 172 cv.TimedWait(kTenMs); // Wait for 10 ms. 173 lock.Release(); 174 } // Call for cv destruction. 175 176 TEST_F(ConditionVariableTest, TimeoutTest) { 177 Lock lock; 178 ConditionVariable cv(&lock); 179 lock.Acquire(); 180 181 TimeTicks start = TimeTicks::Now(); 182 const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300); 183 // Allow for clocking rate granularity. 184 const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50); 185 186 cv.TimedWait(WAIT_TIME + FUDGE_TIME); 187 TimeDelta duration = TimeTicks::Now() - start; 188 // We can't use EXPECT_GE here as the TimeDelta class does not support the 189 // required stream conversion. 190 EXPECT_TRUE(duration >= WAIT_TIME); 191 192 lock.Release(); 193 } 194 195 #if defined(OS_POSIX) 196 const int kDiscontinuitySeconds = 2; 197 198 void BackInTime(Lock* lock) { 199 AutoLock auto_lock(*lock); 200 201 timeval tv; 202 gettimeofday(&tv, NULL); 203 tv.tv_sec -= kDiscontinuitySeconds; 204 settimeofday(&tv, NULL); 205 } 206 207 // Tests that TimedWait ignores changes to the system clock. 208 // Test is disabled by default, because it needs to run as root to muck with the 209 // system clock. 210 // http://crbug.com/293736 211 TEST_F(ConditionVariableTest, DISABLED_TimeoutAcrossSetTimeOfDay) { 212 timeval tv; 213 gettimeofday(&tv, NULL); 214 tv.tv_sec += kDiscontinuitySeconds; 215 if (settimeofday(&tv, NULL) < 0) { 216 PLOG(ERROR) << "Could not set time of day. Run as root?"; 217 return; 218 } 219 220 Lock lock; 221 ConditionVariable cv(&lock); 222 lock.Acquire(); 223 224 Thread thread("Helper"); 225 thread.Start(); 226 thread.task_runner()->PostTask(FROM_HERE, base::Bind(&BackInTime, &lock)); 227 228 TimeTicks start = TimeTicks::Now(); 229 const TimeDelta kWaitTime = TimeDelta::FromMilliseconds(300); 230 // Allow for clocking rate granularity. 231 const TimeDelta kFudgeTime = TimeDelta::FromMilliseconds(50); 232 233 cv.TimedWait(kWaitTime + kFudgeTime); 234 TimeDelta duration = TimeTicks::Now() - start; 235 236 thread.Stop(); 237 // We can't use EXPECT_GE here as the TimeDelta class does not support the 238 // required stream conversion. 239 EXPECT_TRUE(duration >= kWaitTime); 240 EXPECT_TRUE(duration <= TimeDelta::FromSeconds(kDiscontinuitySeconds)); 241 242 lock.Release(); 243 } 244 #endif 245 246 247 // Suddenly got flaky on Win, see http://crbug.com/10607 (starting at 248 // comment #15). 249 #if defined(OS_WIN) 250 #define MAYBE_MultiThreadConsumerTest DISABLED_MultiThreadConsumerTest 251 #else 252 #define MAYBE_MultiThreadConsumerTest MultiThreadConsumerTest 253 #endif 254 // Test serial task servicing, as well as two parallel task servicing methods. 255 TEST_F(ConditionVariableTest, MAYBE_MultiThreadConsumerTest) { 256 const int kThreadCount = 10; 257 WorkQueue queue(kThreadCount); // Start the threads. 258 259 const int kTaskCount = 10; // Number of tasks in each mini-test here. 260 261 Time start_time; // Used to time task processing. 262 263 { 264 base::AutoLock auto_lock(*queue.lock()); 265 while (!queue.EveryIdWasAllocated()) 266 queue.all_threads_have_ids()->Wait(); 267 } 268 269 // If threads aren't in a wait state, they may start to gobble up tasks in 270 // parallel, short-circuiting (breaking) this test. 271 queue.SpinUntilAllThreadsAreWaiting(); 272 273 { 274 // Since we have no tasks yet, all threads should be waiting by now. 275 base::AutoLock auto_lock(*queue.lock()); 276 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); 277 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); 278 EXPECT_EQ(0, queue.task_count()); 279 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); 280 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 281 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); 282 283 // Set up to make each task include getting help from another worker, so 284 // so that the work gets done in paralell. 285 queue.ResetHistory(); 286 queue.SetTaskCount(kTaskCount); 287 queue.SetWorkTime(kThirtyMs); 288 queue.SetAllowHelp(true); 289 290 start_time = Time::Now(); 291 } 292 293 queue.work_is_available()->Signal(); // But each worker can signal another. 294 // Wait till we at least start to handle tasks (and we're not all waiting). 295 queue.SpinUntilTaskCountLessThan(kTaskCount); 296 // Wait to allow the all workers to get done. 297 queue.SpinUntilAllThreadsAreWaiting(); 298 299 { 300 // Wait until all work tasks have at least been assigned. 301 base::AutoLock auto_lock(*queue.lock()); 302 while (queue.task_count()) 303 queue.no_more_tasks()->Wait(); 304 305 // To avoid racy assumptions, we'll just assert that at least 2 threads 306 // did work. We know that the first worker should have gone to sleep, and 307 // hence a second worker should have gotten an assignment. 308 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); 309 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); 310 311 // Try to ask all workers to help, and only a few will do the work. 312 queue.ResetHistory(); 313 queue.SetTaskCount(3); 314 queue.SetWorkTime(kThirtyMs); 315 queue.SetAllowHelp(false); 316 } 317 queue.work_is_available()->Broadcast(); // Make them all try. 318 // Wait till we at least start to handle tasks (and we're not all waiting). 319 queue.SpinUntilTaskCountLessThan(3); 320 // Wait to allow the 3 workers to get done. 321 queue.SpinUntilAllThreadsAreWaiting(); 322 323 { 324 base::AutoLock auto_lock(*queue.lock()); 325 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); 326 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); 327 EXPECT_EQ(0, queue.task_count()); 328 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); 329 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 330 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); 331 332 // Set up to make each task get help from another worker. 333 queue.ResetHistory(); 334 queue.SetTaskCount(3); 335 queue.SetWorkTime(kThirtyMs); 336 queue.SetAllowHelp(true); // Allow (unnecessary) help requests. 337 } 338 queue.work_is_available()->Broadcast(); // Signal all threads. 339 // Wait till we at least start to handle tasks (and we're not all waiting). 340 queue.SpinUntilTaskCountLessThan(3); 341 // Wait to allow the 3 workers to get done. 342 queue.SpinUntilAllThreadsAreWaiting(); 343 344 { 345 base::AutoLock auto_lock(*queue.lock()); 346 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); 347 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); 348 EXPECT_EQ(0, queue.task_count()); 349 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); 350 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 351 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); 352 353 // Set up to make each task get help from another worker. 354 queue.ResetHistory(); 355 queue.SetTaskCount(20); // 2 tasks per thread. 356 queue.SetWorkTime(kThirtyMs); 357 queue.SetAllowHelp(true); 358 } 359 queue.work_is_available()->Signal(); // But each worker can signal another. 360 // Wait till we at least start to handle tasks (and we're not all waiting). 361 queue.SpinUntilTaskCountLessThan(20); 362 // Wait to allow the 10 workers to get done. 363 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. 364 365 { 366 base::AutoLock auto_lock(*queue.lock()); 367 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); 368 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); 369 EXPECT_EQ(0, queue.task_count()); 370 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); 371 372 // Same as last test, but with Broadcast(). 373 queue.ResetHistory(); 374 queue.SetTaskCount(20); // 2 tasks per thread. 375 queue.SetWorkTime(kThirtyMs); 376 queue.SetAllowHelp(true); 377 } 378 queue.work_is_available()->Broadcast(); 379 // Wait till we at least start to handle tasks (and we're not all waiting). 380 queue.SpinUntilTaskCountLessThan(20); 381 // Wait to allow the 10 workers to get done. 382 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. 383 384 { 385 base::AutoLock auto_lock(*queue.lock()); 386 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); 387 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); 388 EXPECT_EQ(0, queue.task_count()); 389 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); 390 391 queue.SetShutdown(); 392 } 393 queue.work_is_available()->Broadcast(); // Force check for shutdown. 394 395 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), 396 queue.ThreadSafeCheckShutdown(kThreadCount)); 397 } 398 399 TEST_F(ConditionVariableTest, LargeFastTaskTest) { 400 const int kThreadCount = 200; 401 WorkQueue queue(kThreadCount); // Start the threads. 402 403 Lock private_lock; // Used locally for master to wait. 404 base::AutoLock private_held_lock(private_lock); 405 ConditionVariable private_cv(&private_lock); 406 407 { 408 base::AutoLock auto_lock(*queue.lock()); 409 while (!queue.EveryIdWasAllocated()) 410 queue.all_threads_have_ids()->Wait(); 411 } 412 413 // Wait a bit more to allow threads to reach their wait state. 414 queue.SpinUntilAllThreadsAreWaiting(); 415 416 { 417 // Since we have no tasks, all threads should be waiting by now. 418 base::AutoLock auto_lock(*queue.lock()); 419 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); 420 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); 421 EXPECT_EQ(0, queue.task_count()); 422 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); 423 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 424 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); 425 426 // Set up to make all workers do (an average of) 20 tasks. 427 queue.ResetHistory(); 428 queue.SetTaskCount(20 * kThreadCount); 429 queue.SetWorkTime(kFortyFiveMs); 430 queue.SetAllowHelp(false); 431 } 432 queue.work_is_available()->Broadcast(); // Start up all threads. 433 // Wait until we've handed out all tasks. 434 { 435 base::AutoLock auto_lock(*queue.lock()); 436 while (queue.task_count() != 0) 437 queue.no_more_tasks()->Wait(); 438 } 439 440 // Wait till the last of the tasks complete. 441 queue.SpinUntilAllThreadsAreWaiting(); 442 443 { 444 // With Broadcast(), every thread should have participated. 445 // but with racing.. they may not all have done equal numbers of tasks. 446 base::AutoLock auto_lock(*queue.lock()); 447 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); 448 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); 449 EXPECT_EQ(0, queue.task_count()); 450 EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); 451 EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); 452 453 // Set up to make all workers do (an average of) 4 tasks. 454 queue.ResetHistory(); 455 queue.SetTaskCount(kThreadCount * 4); 456 queue.SetWorkTime(kFortyFiveMs); 457 queue.SetAllowHelp(true); // Might outperform Broadcast(). 458 } 459 queue.work_is_available()->Signal(); // Start up one thread. 460 461 // Wait until we've handed out all tasks 462 { 463 base::AutoLock auto_lock(*queue.lock()); 464 while (queue.task_count() != 0) 465 queue.no_more_tasks()->Wait(); 466 } 467 468 // Wait till the last of the tasks complete. 469 queue.SpinUntilAllThreadsAreWaiting(); 470 471 { 472 // With Signal(), every thread should have participated. 473 // but with racing.. they may not all have done four tasks. 474 base::AutoLock auto_lock(*queue.lock()); 475 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); 476 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); 477 EXPECT_EQ(0, queue.task_count()); 478 EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); 479 EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); 480 481 queue.SetShutdown(); 482 } 483 queue.work_is_available()->Broadcast(); // Force check for shutdown. 484 485 // Wait for shutdowns to complete. 486 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), 487 queue.ThreadSafeCheckShutdown(kThreadCount)); 488 } 489 490 //------------------------------------------------------------------------------ 491 // Finally we provide the implementation for the methods in the WorkQueue class. 492 //------------------------------------------------------------------------------ 493 494 WorkQueue::WorkQueue(int thread_count) 495 : lock_(), 496 work_is_available_(&lock_), 497 all_threads_have_ids_(&lock_), 498 no_more_tasks_(&lock_), 499 thread_count_(thread_count), 500 waiting_thread_count_(0), 501 thread_handles_(new PlatformThreadHandle[thread_count]), 502 assignment_history_(thread_count), 503 completion_history_(thread_count), 504 thread_started_counter_(0), 505 shutdown_task_count_(0), 506 task_count_(0), 507 allow_help_requests_(false), 508 shutdown_(false) { 509 EXPECT_GE(thread_count_, 1); 510 ResetHistory(); 511 SetTaskCount(0); 512 SetWorkTime(TimeDelta::FromMilliseconds(30)); 513 514 for (int i = 0; i < thread_count_; ++i) { 515 PlatformThreadHandle pth; 516 EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); 517 thread_handles_[i] = pth; 518 } 519 } 520 521 WorkQueue::~WorkQueue() { 522 { 523 base::AutoLock auto_lock(lock_); 524 SetShutdown(); 525 } 526 work_is_available_.Broadcast(); // Tell them all to terminate. 527 528 for (int i = 0; i < thread_count_; ++i) { 529 PlatformThread::Join(thread_handles_[i]); 530 } 531 EXPECT_EQ(0, waiting_thread_count_); 532 } 533 534 int WorkQueue::GetThreadId() { 535 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 536 DCHECK(!EveryIdWasAllocated()); 537 return thread_started_counter_++; // Give out Unique IDs. 538 } 539 540 bool WorkQueue::EveryIdWasAllocated() const { 541 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 542 return thread_count_ == thread_started_counter_; 543 } 544 545 TimeDelta WorkQueue::GetAnAssignment(int thread_id) { 546 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 547 DCHECK_LT(0, task_count_); 548 assignment_history_[thread_id]++; 549 if (0 == --task_count_) { 550 no_more_tasks_.Signal(); 551 } 552 return worker_delay_; 553 } 554 555 void WorkQueue::WorkIsCompleted(int thread_id) { 556 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 557 completion_history_[thread_id]++; 558 } 559 560 int WorkQueue::task_count() const { 561 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 562 return task_count_; 563 } 564 565 bool WorkQueue::allow_help_requests() const { 566 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 567 return allow_help_requests_; 568 } 569 570 bool WorkQueue::shutdown() const { 571 lock_.AssertAcquired(); 572 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 573 return shutdown_; 574 } 575 576 // Because this method is called from the test's main thread we need to actually 577 // take the lock. Threads will call the thread_shutting_down() method with the 578 // lock already acquired. 579 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { 580 bool all_shutdown; 581 base::AutoLock auto_lock(lock_); 582 { 583 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. 584 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 585 all_shutdown = (shutdown_task_count_ == thread_count); 586 } 587 return all_shutdown; 588 } 589 590 void WorkQueue::thread_shutting_down() { 591 lock_.AssertAcquired(); 592 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 593 shutdown_task_count_++; 594 } 595 596 Lock* WorkQueue::lock() { 597 return &lock_; 598 } 599 600 ConditionVariable* WorkQueue::work_is_available() { 601 return &work_is_available_; 602 } 603 604 ConditionVariable* WorkQueue::all_threads_have_ids() { 605 return &all_threads_have_ids_; 606 } 607 608 ConditionVariable* WorkQueue::no_more_tasks() { 609 return &no_more_tasks_; 610 } 611 612 void WorkQueue::ResetHistory() { 613 for (int i = 0; i < thread_count_; ++i) { 614 assignment_history_[i] = 0; 615 completion_history_[i] = 0; 616 } 617 } 618 619 int WorkQueue::GetMinCompletionsByWorkerThread() const { 620 int minumum = completion_history_[0]; 621 for (int i = 0; i < thread_count_; ++i) 622 minumum = std::min(minumum, completion_history_[i]); 623 return minumum; 624 } 625 626 int WorkQueue::GetMaxCompletionsByWorkerThread() const { 627 int maximum = completion_history_[0]; 628 for (int i = 0; i < thread_count_; ++i) 629 maximum = std::max(maximum, completion_history_[i]); 630 return maximum; 631 } 632 633 int WorkQueue::GetNumThreadsTakingAssignments() const { 634 int count = 0; 635 for (int i = 0; i < thread_count_; ++i) 636 if (assignment_history_[i]) 637 count++; 638 return count; 639 } 640 641 int WorkQueue::GetNumThreadsCompletingTasks() const { 642 int count = 0; 643 for (int i = 0; i < thread_count_; ++i) 644 if (completion_history_[i]) 645 count++; 646 return count; 647 } 648 649 int WorkQueue::GetNumberOfCompletedTasks() const { 650 int total = 0; 651 for (int i = 0; i < thread_count_; ++i) 652 total += completion_history_[i]; 653 return total; 654 } 655 656 void WorkQueue::SetWorkTime(TimeDelta delay) { 657 worker_delay_ = delay; 658 } 659 660 void WorkQueue::SetTaskCount(int count) { 661 task_count_ = count; 662 } 663 664 void WorkQueue::SetAllowHelp(bool allow) { 665 allow_help_requests_ = allow; 666 } 667 668 void WorkQueue::SetShutdown() { 669 lock_.AssertAcquired(); 670 shutdown_ = true; 671 } 672 673 void WorkQueue::SpinUntilAllThreadsAreWaiting() { 674 while (true) { 675 { 676 base::AutoLock auto_lock(lock_); 677 if (waiting_thread_count_ == thread_count_) 678 break; 679 } 680 PlatformThread::Sleep(TimeDelta::FromMilliseconds(30)); 681 } 682 } 683 684 void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { 685 while (true) { 686 { 687 base::AutoLock auto_lock(lock_); 688 if (task_count_ < task_count) 689 break; 690 } 691 PlatformThread::Sleep(TimeDelta::FromMilliseconds(30)); 692 } 693 } 694 695 696 //------------------------------------------------------------------------------ 697 // Define the standard worker task. Several tests will spin out many of these 698 // threads. 699 //------------------------------------------------------------------------------ 700 701 // The multithread tests involve several threads with a task to perform as 702 // directed by an instance of the class WorkQueue. 703 // The task is to: 704 // a) Check to see if there are more tasks (there is a task counter). 705 // a1) Wait on condition variable if there are no tasks currently. 706 // b) Call a function to see what should be done. 707 // c) Do some computation based on the number of milliseconds returned in (b). 708 // d) go back to (a). 709 710 // WorkQueue::ThreadMain() implements the above task for all threads. 711 // It calls the controlling object to tell the creator about progress, and to 712 // ask about tasks. 713 714 void WorkQueue::ThreadMain() { 715 int thread_id; 716 { 717 base::AutoLock auto_lock(lock_); 718 thread_id = GetThreadId(); 719 if (EveryIdWasAllocated()) 720 all_threads_have_ids()->Signal(); // Tell creator we're ready. 721 } 722 723 Lock private_lock; // Used to waste time on "our work". 724 while (1) { // This is the main consumer loop. 725 TimeDelta work_time; 726 bool could_use_help; 727 { 728 base::AutoLock auto_lock(lock_); 729 while (0 == task_count() && !shutdown()) { 730 ++waiting_thread_count_; 731 work_is_available()->Wait(); 732 --waiting_thread_count_; 733 } 734 if (shutdown()) { 735 // Ack the notification of a shutdown message back to the controller. 736 thread_shutting_down(); 737 return; // Terminate. 738 } 739 // Get our task duration from the queue. 740 work_time = GetAnAssignment(thread_id); 741 could_use_help = (task_count() > 0) && allow_help_requests(); 742 } // Release lock 743 744 // Do work (outside of locked region. 745 if (could_use_help) 746 work_is_available()->Signal(); // Get help from other threads. 747 748 if (work_time > TimeDelta::FromMilliseconds(0)) { 749 // We could just sleep(), but we'll instead further exercise the 750 // condition variable class, and do a timed wait. 751 base::AutoLock auto_lock(private_lock); 752 ConditionVariable private_cv(&private_lock); 753 private_cv.TimedWait(work_time); // Unsynchronized waiting. 754 } 755 756 { 757 base::AutoLock auto_lock(lock_); 758 // Send notification that we completed our "work." 759 WorkIsCompleted(thread_id); 760 } 761 } 762 } 763 764 } // namespace 765 766 } // namespace base 767