1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 // Multi-threaded tests of ConditionVariable class. 6 7 #include <time.h> 8 #include <algorithm> 9 #include <vector> 10 11 #include "base/logging.h" 12 #include "base/memory/scoped_ptr.h" 13 #include "base/spin_wait.h" 14 #include "base/synchronization/condition_variable.h" 15 #include "base/synchronization/lock.h" 16 #include "base/threading/platform_thread.h" 17 #include "base/threading/thread_collision_warner.h" 18 #include "base/time.h" 19 #include "testing/gtest/include/gtest/gtest.h" 20 #include "testing/platform_test.h" 21 22 namespace base { 23 24 namespace { 25 //------------------------------------------------------------------------------ 26 // Define our test class, with several common variables. 27 //------------------------------------------------------------------------------ 28 29 class ConditionVariableTest : public PlatformTest { 30 public: 31 const TimeDelta kZeroMs; 32 const TimeDelta kTenMs; 33 const TimeDelta kThirtyMs; 34 const TimeDelta kFortyFiveMs; 35 const TimeDelta kSixtyMs; 36 const TimeDelta kOneHundredMs; 37 38 explicit ConditionVariableTest() 39 : kZeroMs(TimeDelta::FromMilliseconds(0)), 40 kTenMs(TimeDelta::FromMilliseconds(10)), 41 kThirtyMs(TimeDelta::FromMilliseconds(30)), 42 kFortyFiveMs(TimeDelta::FromMilliseconds(45)), 43 kSixtyMs(TimeDelta::FromMilliseconds(60)), 44 kOneHundredMs(TimeDelta::FromMilliseconds(100)) { 45 } 46 }; 47 48 //------------------------------------------------------------------------------ 49 // Define a class that will control activities an several multi-threaded tests. 50 // The general structure of multi-threaded tests is that a test case will 51 // construct an instance of a WorkQueue. The WorkQueue will spin up some 52 // threads and control them throughout their lifetime, as well as maintaining 53 // a central repository of the work thread's activity. Finally, the WorkQueue 54 // will command the the worker threads to terminate. At that point, the test 55 // cases will validate that the WorkQueue has records showing that the desired 56 // activities were performed. 57 //------------------------------------------------------------------------------ 58 59 // Callers are responsible for synchronizing access to the following class. 60 // The WorkQueue::lock_, as accessed via WorkQueue::lock(), should be used for 61 // all synchronized access. 62 class WorkQueue : public PlatformThread::Delegate { 63 public: 64 explicit WorkQueue(int thread_count); 65 ~WorkQueue(); 66 67 // PlatformThread::Delegate interface. 68 void ThreadMain(); 69 70 //---------------------------------------------------------------------------- 71 // Worker threads only call the following methods. 72 // They should use the lock to get exclusive access. 73 int GetThreadId(); // Get an ID assigned to a thread.. 74 bool EveryIdWasAllocated() const; // Indicates that all IDs were handed out. 75 TimeDelta GetAnAssignment(int thread_id); // Get a work task duration. 76 void WorkIsCompleted(int thread_id); 77 78 int task_count() const; 79 bool allow_help_requests() const; // Workers can signal more workers. 80 bool shutdown() const; // Check if shutdown has been requested. 81 82 void thread_shutting_down(); 83 84 85 //---------------------------------------------------------------------------- 86 // Worker threads can call them but not needed to acquire a lock. 87 Lock* lock(); 88 89 ConditionVariable* work_is_available(); 90 ConditionVariable* all_threads_have_ids(); 91 ConditionVariable* no_more_tasks(); 92 93 //---------------------------------------------------------------------------- 94 // The rest of the methods are for use by the controlling master thread (the 95 // test case code). 96 void ResetHistory(); 97 int GetMinCompletionsByWorkerThread() const; 98 int GetMaxCompletionsByWorkerThread() const; 99 int GetNumThreadsTakingAssignments() const; 100 int GetNumThreadsCompletingTasks() const; 101 int GetNumberOfCompletedTasks() const; 102 TimeDelta GetWorkTime() const; 103 104 void SetWorkTime(TimeDelta delay); 105 void SetTaskCount(int count); 106 void SetAllowHelp(bool allow); 107 108 // The following must be called without locking, and will spin wait until the 109 // threads are all in a wait state. 110 void SpinUntilAllThreadsAreWaiting(); 111 void SpinUntilTaskCountLessThan(int task_count); 112 113 // Caller must acquire lock before calling. 114 void SetShutdown(); 115 116 // Compares the |shutdown_task_count_| to the |thread_count| and returns true 117 // if they are equal. This check will acquire the |lock_| so the caller 118 // should not hold the lock when calling this method. 119 bool ThreadSafeCheckShutdown(int thread_count); 120 121 private: 122 // Both worker threads and controller use the following to synchronize. 123 Lock lock_; 124 ConditionVariable work_is_available_; // To tell threads there is work. 125 126 // Conditions to notify the controlling process (if it is interested). 127 ConditionVariable all_threads_have_ids_; // All threads are running. 128 ConditionVariable no_more_tasks_; // Task count is zero. 129 130 const int thread_count_; 131 int waiting_thread_count_; 132 scoped_array<PlatformThreadHandle> thread_handles_; 133 std::vector<int> assignment_history_; // Number of assignment per worker. 134 std::vector<int> completion_history_; // Number of completions per worker. 135 int thread_started_counter_; // Used to issue unique id to workers. 136 int shutdown_task_count_; // Number of tasks told to shutdown 137 int task_count_; // Number of assignment tasks waiting to be processed. 138 TimeDelta worker_delay_; // Time each task takes to complete. 139 bool allow_help_requests_; // Workers can signal more workers. 140 bool shutdown_; // Set when threads need to terminate. 141 142 DFAKE_MUTEX(locked_methods_); 143 }; 144 145 //------------------------------------------------------------------------------ 146 // The next section contains the actual tests. 147 //------------------------------------------------------------------------------ 148 149 TEST_F(ConditionVariableTest, StartupShutdownTest) { 150 Lock lock; 151 152 // First try trivial startup/shutdown. 153 { 154 ConditionVariable cv1(&lock); 155 } // Call for cv1 destruction. 156 157 // Exercise with at least a few waits. 158 ConditionVariable cv(&lock); 159 160 lock.Acquire(); 161 cv.TimedWait(kTenMs); // Wait for 10 ms. 162 cv.TimedWait(kTenMs); // Wait for 10 ms. 163 lock.Release(); 164 165 lock.Acquire(); 166 cv.TimedWait(kTenMs); // Wait for 10 ms. 167 cv.TimedWait(kTenMs); // Wait for 10 ms. 168 cv.TimedWait(kTenMs); // Wait for 10 ms. 169 lock.Release(); 170 } // Call for cv destruction. 171 172 TEST_F(ConditionVariableTest, TimeoutTest) { 173 Lock lock; 174 ConditionVariable cv(&lock); 175 lock.Acquire(); 176 177 TimeTicks start = TimeTicks::Now(); 178 const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300); 179 // Allow for clocking rate granularity. 180 const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50); 181 182 cv.TimedWait(WAIT_TIME + FUDGE_TIME); 183 TimeDelta duration = TimeTicks::Now() - start; 184 // We can't use EXPECT_GE here as the TimeDelta class does not support the 185 // required stream conversion. 186 EXPECT_TRUE(duration >= WAIT_TIME); 187 188 lock.Release(); 189 } 190 191 // Test serial task servicing, as well as two parallel task servicing methods. 192 TEST_F(ConditionVariableTest, MultiThreadConsumerTest) { 193 const int kThreadCount = 10; 194 WorkQueue queue(kThreadCount); // Start the threads. 195 196 const int kTaskCount = 10; // Number of tasks in each mini-test here. 197 198 Time start_time; // Used to time task processing. 199 200 { 201 base::AutoLock auto_lock(*queue.lock()); 202 while (!queue.EveryIdWasAllocated()) 203 queue.all_threads_have_ids()->Wait(); 204 } 205 206 // If threads aren't in a wait state, they may start to gobble up tasks in 207 // parallel, short-circuiting (breaking) this test. 208 queue.SpinUntilAllThreadsAreWaiting(); 209 210 { 211 // Since we have no tasks yet, all threads should be waiting by now. 212 base::AutoLock auto_lock(*queue.lock()); 213 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); 214 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); 215 EXPECT_EQ(0, queue.task_count()); 216 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); 217 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 218 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); 219 220 // Set up to make one worker do 30ms tasks sequentially. 221 queue.ResetHistory(); 222 queue.SetTaskCount(kTaskCount); 223 queue.SetWorkTime(kThirtyMs); 224 queue.SetAllowHelp(false); 225 226 start_time = Time::Now(); 227 } 228 229 queue.work_is_available()->Signal(); // Start up one thread. 230 // Wait till we at least start to handle tasks (and we're not all waiting). 231 queue.SpinUntilTaskCountLessThan(kTaskCount); 232 233 { 234 // Wait until all 10 work tasks have at least been assigned. 235 base::AutoLock auto_lock(*queue.lock()); 236 while (queue.task_count()) 237 queue.no_more_tasks()->Wait(); 238 // The last of the tasks *might* still be running, but... all but one should 239 // be done by now, since tasks are being done serially. 240 EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1), 241 (Time::Now() - start_time).InMilliseconds()); 242 243 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); 244 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); 245 EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread()); 246 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 247 EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks()); 248 } 249 250 // Wait to be sure all tasks are done. 251 queue.SpinUntilAllThreadsAreWaiting(); 252 253 { 254 // Check that all work was done by one thread id. 255 base::AutoLock auto_lock(*queue.lock()); 256 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments()); 257 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks()); 258 EXPECT_EQ(0, queue.task_count()); 259 EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread()); 260 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 261 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); 262 263 // Set up to make each task include getting help from another worker, so 264 // so that the work gets done in paralell. 265 queue.ResetHistory(); 266 queue.SetTaskCount(kTaskCount); 267 queue.SetWorkTime(kThirtyMs); 268 queue.SetAllowHelp(true); 269 270 start_time = Time::Now(); 271 } 272 273 queue.work_is_available()->Signal(); // But each worker can signal another. 274 // Wait till we at least start to handle tasks (and we're not all waiting). 275 queue.SpinUntilTaskCountLessThan(kTaskCount); 276 // Wait to allow the all workers to get done. 277 queue.SpinUntilAllThreadsAreWaiting(); 278 279 { 280 // Wait until all work tasks have at least been assigned. 281 base::AutoLock auto_lock(*queue.lock()); 282 while (queue.task_count()) 283 queue.no_more_tasks()->Wait(); 284 285 // To avoid racy assumptions, we'll just assert that at least 2 threads 286 // did work. We know that the first worker should have gone to sleep, and 287 // hence a second worker should have gotten an assignment. 288 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments()); 289 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks()); 290 291 // Try to ask all workers to help, and only a few will do the work. 292 queue.ResetHistory(); 293 queue.SetTaskCount(3); 294 queue.SetWorkTime(kThirtyMs); 295 queue.SetAllowHelp(false); 296 } 297 queue.work_is_available()->Broadcast(); // Make them all try. 298 // Wait till we at least start to handle tasks (and we're not all waiting). 299 queue.SpinUntilTaskCountLessThan(3); 300 // Wait to allow the 3 workers to get done. 301 queue.SpinUntilAllThreadsAreWaiting(); 302 303 { 304 base::AutoLock auto_lock(*queue.lock()); 305 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); 306 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); 307 EXPECT_EQ(0, queue.task_count()); 308 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); 309 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 310 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); 311 312 // Set up to make each task get help from another worker. 313 queue.ResetHistory(); 314 queue.SetTaskCount(3); 315 queue.SetWorkTime(kThirtyMs); 316 queue.SetAllowHelp(true); // Allow (unnecessary) help requests. 317 } 318 queue.work_is_available()->Broadcast(); // Signal all threads. 319 // Wait till we at least start to handle tasks (and we're not all waiting). 320 queue.SpinUntilTaskCountLessThan(3); 321 // Wait to allow the 3 workers to get done. 322 queue.SpinUntilAllThreadsAreWaiting(); 323 324 { 325 base::AutoLock auto_lock(*queue.lock()); 326 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments()); 327 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks()); 328 EXPECT_EQ(0, queue.task_count()); 329 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread()); 330 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 331 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks()); 332 333 // Set up to make each task get help from another worker. 334 queue.ResetHistory(); 335 queue.SetTaskCount(20); // 2 tasks per thread. 336 queue.SetWorkTime(kThirtyMs); 337 queue.SetAllowHelp(true); 338 } 339 queue.work_is_available()->Signal(); // But each worker can signal another. 340 // Wait till we at least start to handle tasks (and we're not all waiting). 341 queue.SpinUntilTaskCountLessThan(20); 342 // Wait to allow the 10 workers to get done. 343 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. 344 345 { 346 base::AutoLock auto_lock(*queue.lock()); 347 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); 348 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); 349 EXPECT_EQ(0, queue.task_count()); 350 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); 351 352 // Same as last test, but with Broadcast(). 353 queue.ResetHistory(); 354 queue.SetTaskCount(20); // 2 tasks per thread. 355 queue.SetWorkTime(kThirtyMs); 356 queue.SetAllowHelp(true); 357 } 358 queue.work_is_available()->Broadcast(); 359 // Wait till we at least start to handle tasks (and we're not all waiting). 360 queue.SpinUntilTaskCountLessThan(20); 361 // Wait to allow the 10 workers to get done. 362 queue.SpinUntilAllThreadsAreWaiting(); // Should take about 60 ms. 363 364 { 365 base::AutoLock auto_lock(*queue.lock()); 366 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments()); 367 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks()); 368 EXPECT_EQ(0, queue.task_count()); 369 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks()); 370 371 queue.SetShutdown(); 372 } 373 queue.work_is_available()->Broadcast(); // Force check for shutdown. 374 375 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), 376 queue.ThreadSafeCheckShutdown(kThreadCount)); 377 } 378 379 TEST_F(ConditionVariableTest, LargeFastTaskTest) { 380 const int kThreadCount = 200; 381 WorkQueue queue(kThreadCount); // Start the threads. 382 383 Lock private_lock; // Used locally for master to wait. 384 base::AutoLock private_held_lock(private_lock); 385 ConditionVariable private_cv(&private_lock); 386 387 { 388 base::AutoLock auto_lock(*queue.lock()); 389 while (!queue.EveryIdWasAllocated()) 390 queue.all_threads_have_ids()->Wait(); 391 } 392 393 // Wait a bit more to allow threads to reach their wait state. 394 queue.SpinUntilAllThreadsAreWaiting(); 395 396 { 397 // Since we have no tasks, all threads should be waiting by now. 398 base::AutoLock auto_lock(*queue.lock()); 399 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments()); 400 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks()); 401 EXPECT_EQ(0, queue.task_count()); 402 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread()); 403 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread()); 404 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks()); 405 406 // Set up to make all workers do (an average of) 20 tasks. 407 queue.ResetHistory(); 408 queue.SetTaskCount(20 * kThreadCount); 409 queue.SetWorkTime(kFortyFiveMs); 410 queue.SetAllowHelp(false); 411 } 412 queue.work_is_available()->Broadcast(); // Start up all threads. 413 // Wait until we've handed out all tasks. 414 { 415 base::AutoLock auto_lock(*queue.lock()); 416 while (queue.task_count() != 0) 417 queue.no_more_tasks()->Wait(); 418 } 419 420 // Wait till the last of the tasks complete. 421 queue.SpinUntilAllThreadsAreWaiting(); 422 423 { 424 // With Broadcast(), every thread should have participated. 425 // but with racing.. they may not all have done equal numbers of tasks. 426 base::AutoLock auto_lock(*queue.lock()); 427 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); 428 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); 429 EXPECT_EQ(0, queue.task_count()); 430 EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread()); 431 EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks()); 432 433 // Set up to make all workers do (an average of) 4 tasks. 434 queue.ResetHistory(); 435 queue.SetTaskCount(kThreadCount * 4); 436 queue.SetWorkTime(kFortyFiveMs); 437 queue.SetAllowHelp(true); // Might outperform Broadcast(). 438 } 439 queue.work_is_available()->Signal(); // Start up one thread. 440 441 // Wait until we've handed out all tasks 442 { 443 base::AutoLock auto_lock(*queue.lock()); 444 while (queue.task_count() != 0) 445 queue.no_more_tasks()->Wait(); 446 } 447 448 // Wait till the last of the tasks complete. 449 queue.SpinUntilAllThreadsAreWaiting(); 450 451 { 452 // With Signal(), every thread should have participated. 453 // but with racing.. they may not all have done four tasks. 454 base::AutoLock auto_lock(*queue.lock()); 455 EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments()); 456 EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks()); 457 EXPECT_EQ(0, queue.task_count()); 458 EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread()); 459 EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks()); 460 461 queue.SetShutdown(); 462 } 463 queue.work_is_available()->Broadcast(); // Force check for shutdown. 464 465 // Wait for shutdowns to complete. 466 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1), 467 queue.ThreadSafeCheckShutdown(kThreadCount)); 468 } 469 470 //------------------------------------------------------------------------------ 471 // Finally we provide the implementation for the methods in the WorkQueue class. 472 //------------------------------------------------------------------------------ 473 474 WorkQueue::WorkQueue(int thread_count) 475 : lock_(), 476 work_is_available_(&lock_), 477 all_threads_have_ids_(&lock_), 478 no_more_tasks_(&lock_), 479 thread_count_(thread_count), 480 waiting_thread_count_(0), 481 thread_handles_(new PlatformThreadHandle[thread_count]), 482 assignment_history_(thread_count), 483 completion_history_(thread_count), 484 thread_started_counter_(0), 485 shutdown_task_count_(0), 486 task_count_(0), 487 allow_help_requests_(false), 488 shutdown_(false) { 489 EXPECT_GE(thread_count_, 1); 490 ResetHistory(); 491 SetTaskCount(0); 492 SetWorkTime(TimeDelta::FromMilliseconds(30)); 493 494 for (int i = 0; i < thread_count_; ++i) { 495 PlatformThreadHandle pth; 496 EXPECT_TRUE(PlatformThread::Create(0, this, &pth)); 497 thread_handles_[i] = pth; 498 } 499 } 500 501 WorkQueue::~WorkQueue() { 502 { 503 base::AutoLock auto_lock(lock_); 504 SetShutdown(); 505 } 506 work_is_available_.Broadcast(); // Tell them all to terminate. 507 508 for (int i = 0; i < thread_count_; ++i) { 509 PlatformThread::Join(thread_handles_[i]); 510 } 511 EXPECT_EQ(0, waiting_thread_count_); 512 } 513 514 int WorkQueue::GetThreadId() { 515 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 516 DCHECK(!EveryIdWasAllocated()); 517 return thread_started_counter_++; // Give out Unique IDs. 518 } 519 520 bool WorkQueue::EveryIdWasAllocated() const { 521 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 522 return thread_count_ == thread_started_counter_; 523 } 524 525 TimeDelta WorkQueue::GetAnAssignment(int thread_id) { 526 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 527 DCHECK_LT(0, task_count_); 528 assignment_history_[thread_id]++; 529 if (0 == --task_count_) { 530 no_more_tasks_.Signal(); 531 } 532 return worker_delay_; 533 } 534 535 void WorkQueue::WorkIsCompleted(int thread_id) { 536 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 537 completion_history_[thread_id]++; 538 } 539 540 int WorkQueue::task_count() const { 541 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 542 return task_count_; 543 } 544 545 bool WorkQueue::allow_help_requests() const { 546 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 547 return allow_help_requests_; 548 } 549 550 bool WorkQueue::shutdown() const { 551 lock_.AssertAcquired(); 552 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 553 return shutdown_; 554 } 555 556 // Because this method is called from the test's main thread we need to actually 557 // take the lock. Threads will call the thread_shutting_down() method with the 558 // lock already acquired. 559 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) { 560 bool all_shutdown; 561 base::AutoLock auto_lock(lock_); 562 { 563 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock. 564 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 565 all_shutdown = (shutdown_task_count_ == thread_count); 566 } 567 return all_shutdown; 568 } 569 570 void WorkQueue::thread_shutting_down() { 571 lock_.AssertAcquired(); 572 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_); 573 shutdown_task_count_++; 574 } 575 576 Lock* WorkQueue::lock() { 577 return &lock_; 578 } 579 580 ConditionVariable* WorkQueue::work_is_available() { 581 return &work_is_available_; 582 } 583 584 ConditionVariable* WorkQueue::all_threads_have_ids() { 585 return &all_threads_have_ids_; 586 } 587 588 ConditionVariable* WorkQueue::no_more_tasks() { 589 return &no_more_tasks_; 590 } 591 592 void WorkQueue::ResetHistory() { 593 for (int i = 0; i < thread_count_; ++i) { 594 assignment_history_[i] = 0; 595 completion_history_[i] = 0; 596 } 597 } 598 599 int WorkQueue::GetMinCompletionsByWorkerThread() const { 600 int minumum = completion_history_[0]; 601 for (int i = 0; i < thread_count_; ++i) 602 minumum = std::min(minumum, completion_history_[i]); 603 return minumum; 604 } 605 606 int WorkQueue::GetMaxCompletionsByWorkerThread() const { 607 int maximum = completion_history_[0]; 608 for (int i = 0; i < thread_count_; ++i) 609 maximum = std::max(maximum, completion_history_[i]); 610 return maximum; 611 } 612 613 int WorkQueue::GetNumThreadsTakingAssignments() const { 614 int count = 0; 615 for (int i = 0; i < thread_count_; ++i) 616 if (assignment_history_[i]) 617 count++; 618 return count; 619 } 620 621 int WorkQueue::GetNumThreadsCompletingTasks() const { 622 int count = 0; 623 for (int i = 0; i < thread_count_; ++i) 624 if (completion_history_[i]) 625 count++; 626 return count; 627 } 628 629 int WorkQueue::GetNumberOfCompletedTasks() const { 630 int total = 0; 631 for (int i = 0; i < thread_count_; ++i) 632 total += completion_history_[i]; 633 return total; 634 } 635 636 TimeDelta WorkQueue::GetWorkTime() const { 637 return worker_delay_; 638 } 639 640 void WorkQueue::SetWorkTime(TimeDelta delay) { 641 worker_delay_ = delay; 642 } 643 644 void WorkQueue::SetTaskCount(int count) { 645 task_count_ = count; 646 } 647 648 void WorkQueue::SetAllowHelp(bool allow) { 649 allow_help_requests_ = allow; 650 } 651 652 void WorkQueue::SetShutdown() { 653 lock_.AssertAcquired(); 654 shutdown_ = true; 655 } 656 657 void WorkQueue::SpinUntilAllThreadsAreWaiting() { 658 while (true) { 659 { 660 base::AutoLock auto_lock(lock_); 661 if (waiting_thread_count_ == thread_count_) 662 break; 663 } 664 PlatformThread::Sleep(30); 665 } 666 } 667 668 void WorkQueue::SpinUntilTaskCountLessThan(int task_count) { 669 while (true) { 670 { 671 base::AutoLock auto_lock(lock_); 672 if (task_count_ < task_count) 673 break; 674 } 675 PlatformThread::Sleep(30); 676 } 677 } 678 679 680 //------------------------------------------------------------------------------ 681 // Define the standard worker task. Several tests will spin out many of these 682 // threads. 683 //------------------------------------------------------------------------------ 684 685 // The multithread tests involve several threads with a task to perform as 686 // directed by an instance of the class WorkQueue. 687 // The task is to: 688 // a) Check to see if there are more tasks (there is a task counter). 689 // a1) Wait on condition variable if there are no tasks currently. 690 // b) Call a function to see what should be done. 691 // c) Do some computation based on the number of milliseconds returned in (b). 692 // d) go back to (a). 693 694 // WorkQueue::ThreadMain() implements the above task for all threads. 695 // It calls the controlling object to tell the creator about progress, and to 696 // ask about tasks. 697 698 void WorkQueue::ThreadMain() { 699 int thread_id; 700 { 701 base::AutoLock auto_lock(lock_); 702 thread_id = GetThreadId(); 703 if (EveryIdWasAllocated()) 704 all_threads_have_ids()->Signal(); // Tell creator we're ready. 705 } 706 707 Lock private_lock; // Used to waste time on "our work". 708 while (1) { // This is the main consumer loop. 709 TimeDelta work_time; 710 bool could_use_help; 711 { 712 base::AutoLock auto_lock(lock_); 713 while (0 == task_count() && !shutdown()) { 714 ++waiting_thread_count_; 715 work_is_available()->Wait(); 716 --waiting_thread_count_; 717 } 718 if (shutdown()) { 719 // Ack the notification of a shutdown message back to the controller. 720 thread_shutting_down(); 721 return; // Terminate. 722 } 723 // Get our task duration from the queue. 724 work_time = GetAnAssignment(thread_id); 725 could_use_help = (task_count() > 0) && allow_help_requests(); 726 } // Release lock 727 728 // Do work (outside of locked region. 729 if (could_use_help) 730 work_is_available()->Signal(); // Get help from other threads. 731 732 if (work_time > TimeDelta::FromMilliseconds(0)) { 733 // We could just sleep(), but we'll instead further exercise the 734 // condition variable class, and do a timed wait. 735 base::AutoLock auto_lock(private_lock); 736 ConditionVariable private_cv(&private_lock); 737 private_cv.TimedWait(work_time); // Unsynchronized waiting. 738 } 739 740 { 741 base::AutoLock auto_lock(lock_); 742 // Send notification that we completed our "work." 743 WorkIsCompleted(thread_id); 744 } 745 } 746 } 747 748 } // namespace 749 750 } // namespace base 751