1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "cc/resources/worker_pool.h" 6 7 #include <algorithm> 8 #include <queue> 9 10 #include "base/bind.h" 11 #include "base/containers/hash_tables.h" 12 #include "base/debug/trace_event.h" 13 #include "base/strings/stringprintf.h" 14 #include "base/synchronization/condition_variable.h" 15 #include "base/threading/simple_thread.h" 16 #include "base/threading/thread_restrictions.h" 17 #include "cc/base/scoped_ptr_deque.h" 18 19 namespace cc { 20 21 namespace internal { 22 23 WorkerPoolTask::WorkerPoolTask() 24 : did_schedule_(false), 25 did_run_(false), 26 did_complete_(false) { 27 } 28 29 WorkerPoolTask::~WorkerPoolTask() { 30 DCHECK_EQ(did_schedule_, did_complete_); 31 DCHECK(!did_run_ || did_schedule_); 32 DCHECK(!did_run_ || did_complete_); 33 } 34 35 void WorkerPoolTask::DidSchedule() { 36 DCHECK(!did_complete_); 37 did_schedule_ = true; 38 } 39 40 void WorkerPoolTask::WillRun() { 41 DCHECK(did_schedule_); 42 DCHECK(!did_complete_); 43 DCHECK(!did_run_); 44 } 45 46 void WorkerPoolTask::DidRun() { 47 did_run_ = true; 48 } 49 50 void WorkerPoolTask::WillComplete() { 51 DCHECK(!did_complete_); 52 } 53 54 void WorkerPoolTask::DidComplete() { 55 DCHECK(did_schedule_); 56 DCHECK(!did_complete_); 57 did_complete_ = true; 58 } 59 60 bool WorkerPoolTask::HasFinishedRunning() const { 61 return did_run_; 62 } 63 64 bool WorkerPoolTask::HasCompleted() const { 65 return did_complete_; 66 } 67 68 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority) 69 : task_(task), 70 priority_(priority), 71 num_dependencies_(0) { 72 } 73 74 GraphNode::~GraphNode() { 75 } 76 77 } // namespace internal 78 79 // Internal to the worker pool. Any data or logic that needs to be 80 // shared between threads lives in this class. All members are guarded 81 // by |lock_|. 82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 83 public: 84 Inner(size_t num_threads, const std::string& thread_name_prefix); 85 virtual ~Inner(); 86 87 void Shutdown(); 88 89 // Schedule running of tasks in |graph|. Tasks previously scheduled but 90 // no longer needed will be canceled unless already running. Canceled 91 // tasks are moved to |completed_tasks_| without being run. The result 92 // is that once scheduled, a task is guaranteed to end up in the 93 // |completed_tasks_| queue even if they later get canceled by another 94 // call to SetTaskGraph(). 95 void SetTaskGraph(TaskGraph* graph); 96 97 // Collect all completed tasks in |completed_tasks|. 98 void CollectCompletedTasks(TaskVector* completed_tasks); 99 100 private: 101 class PriorityComparator { 102 public: 103 bool operator()(const internal::GraphNode* a, 104 const internal::GraphNode* b) { 105 // In this system, numerically lower priority is run first. 106 if (a->priority() != b->priority()) 107 return a->priority() > b->priority(); 108 109 // Run task with most dependents first when priority is the same. 110 return a->dependents().size() < b->dependents().size(); 111 } 112 }; 113 114 // Overridden from base::DelegateSimpleThread: 115 virtual void Run() OVERRIDE; 116 117 // This lock protects all members of this class except 118 // |worker_pool_on_origin_thread_|. Do not read or modify anything 119 // without holding this lock. Do not block while holding this lock. 120 mutable base::Lock lock_; 121 122 // Condition variable that is waited on by worker threads until new 123 // tasks are ready to run or shutdown starts. 124 base::ConditionVariable has_ready_to_run_tasks_cv_; 125 126 // Provides each running thread loop with a unique index. First thread 127 // loop index is 0. 128 unsigned next_thread_index_; 129 130 // Set during shutdown. Tells workers to exit when no more tasks 131 // are pending. 132 bool shutdown_; 133 134 // This set contains all pending tasks. 135 GraphNodeMap pending_tasks_; 136 137 // Ordered set of tasks that are ready to run. 138 typedef std::priority_queue<internal::GraphNode*, 139 std::vector<internal::GraphNode*>, 140 PriorityComparator> TaskQueue; 141 TaskQueue ready_to_run_tasks_; 142 143 // This set contains all currently running tasks. 144 GraphNodeMap running_tasks_; 145 146 // Completed tasks not yet collected by origin thread. 147 TaskVector completed_tasks_; 148 149 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 150 151 DISALLOW_COPY_AND_ASSIGN(Inner); 152 }; 153 154 WorkerPool::Inner::Inner( 155 size_t num_threads, const std::string& thread_name_prefix) 156 : lock_(), 157 has_ready_to_run_tasks_cv_(&lock_), 158 next_thread_index_(0), 159 shutdown_(false) { 160 base::AutoLock lock(lock_); 161 162 while (workers_.size() < num_threads) { 163 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 164 new base::DelegateSimpleThread( 165 this, 166 thread_name_prefix + 167 base::StringPrintf( 168 "Worker%u", 169 static_cast<unsigned>(workers_.size() + 1)).c_str())); 170 worker->Start(); 171 #if defined(OS_ANDROID) || defined(OS_LINUX) 172 worker->SetThreadPriority(base::kThreadPriority_Background); 173 #endif 174 workers_.push_back(worker.Pass()); 175 } 176 } 177 178 WorkerPool::Inner::~Inner() { 179 base::AutoLock lock(lock_); 180 181 DCHECK(shutdown_); 182 183 DCHECK_EQ(0u, pending_tasks_.size()); 184 DCHECK_EQ(0u, ready_to_run_tasks_.size()); 185 DCHECK_EQ(0u, running_tasks_.size()); 186 DCHECK_EQ(0u, completed_tasks_.size()); 187 } 188 189 void WorkerPool::Inner::Shutdown() { 190 { 191 base::AutoLock lock(lock_); 192 193 DCHECK(!shutdown_); 194 shutdown_ = true; 195 196 // Wake up a worker so it knows it should exit. This will cause all workers 197 // to exit as each will wake up another worker before exiting. 198 has_ready_to_run_tasks_cv_.Signal(); 199 } 200 201 while (workers_.size()) { 202 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 203 // http://crbug.com/240453 - Join() is considered IO and will block this 204 // thread. See also http://crbug.com/239423 for further ideas. 205 base::ThreadRestrictions::ScopedAllowIO allow_io; 206 worker->Join(); 207 } 208 } 209 210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) { 211 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty. 212 DCHECK(graph->empty() || !shutdown_); 213 214 GraphNodeMap new_pending_tasks; 215 GraphNodeMap new_running_tasks; 216 TaskQueue new_ready_to_run_tasks; 217 218 new_pending_tasks.swap(*graph); 219 220 { 221 base::AutoLock lock(lock_); 222 223 // First remove all completed tasks from |new_pending_tasks| and 224 // adjust number of dependencies. 225 for (TaskVector::iterator it = completed_tasks_.begin(); 226 it != completed_tasks_.end(); ++it) { 227 internal::WorkerPoolTask* task = it->get(); 228 229 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase( 230 task); 231 if (node) { 232 for (internal::GraphNode::Vector::const_iterator it = 233 node->dependents().begin(); 234 it != node->dependents().end(); ++it) { 235 internal::GraphNode* dependent_node = *it; 236 dependent_node->remove_dependency(); 237 } 238 } 239 } 240 241 // Build new running task set. 242 for (GraphNodeMap::iterator it = running_tasks_.begin(); 243 it != running_tasks_.end(); ++it) { 244 internal::WorkerPoolTask* task = it->first; 245 // Transfer scheduled task value from |new_pending_tasks| to 246 // |new_running_tasks| if currently running. Value must be set to 247 // NULL if |new_pending_tasks| doesn't contain task. This does 248 // the right in both cases. 249 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); 250 } 251 252 // Build new "ready to run" tasks queue. 253 // TODO(reveman): Create this queue when building the task graph instead. 254 for (GraphNodeMap::iterator it = new_pending_tasks.begin(); 255 it != new_pending_tasks.end(); ++it) { 256 internal::WorkerPoolTask* task = it->first; 257 DCHECK(task); 258 internal::GraphNode* node = it->second; 259 260 // Completed tasks should not exist in |new_pending_tasks|. 261 DCHECK(!task->HasFinishedRunning()); 262 263 // Call DidSchedule() to indicate that this task has been scheduled. 264 // Note: This is only for debugging purposes. 265 task->DidSchedule(); 266 267 if (!node->num_dependencies()) 268 new_ready_to_run_tasks.push(node); 269 270 // Erase the task from old pending tasks. 271 pending_tasks_.erase(task); 272 } 273 274 completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size()); 275 276 // The items left in |pending_tasks_| need to be canceled. 277 for (GraphNodeMap::const_iterator it = pending_tasks_.begin(); 278 it != pending_tasks_.end(); 279 ++it) { 280 completed_tasks_.push_back(it->first); 281 } 282 283 // Swap task sets. 284 // Note: old tasks are intentionally destroyed after releasing |lock_|. 285 pending_tasks_.swap(new_pending_tasks); 286 running_tasks_.swap(new_running_tasks); 287 std::swap(ready_to_run_tasks_, new_ready_to_run_tasks); 288 289 // If |ready_to_run_tasks_| is empty, it means we either have 290 // running tasks, or we have no pending tasks. 291 DCHECK(!ready_to_run_tasks_.empty() || 292 (pending_tasks_.empty() || !running_tasks_.empty())); 293 294 // If there is more work available, wake up worker thread. 295 if (!ready_to_run_tasks_.empty()) 296 has_ready_to_run_tasks_cv_.Signal(); 297 } 298 } 299 300 void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) { 301 base::AutoLock lock(lock_); 302 303 DCHECK_EQ(0u, completed_tasks->size()); 304 completed_tasks->swap(completed_tasks_); 305 } 306 307 void WorkerPool::Inner::Run() { 308 base::AutoLock lock(lock_); 309 310 // Get a unique thread index. 311 int thread_index = next_thread_index_++; 312 313 while (true) { 314 if (ready_to_run_tasks_.empty()) { 315 // Exit when shutdown is set and no more tasks are pending. 316 if (shutdown_ && pending_tasks_.empty()) 317 break; 318 319 // Wait for more tasks. 320 has_ready_to_run_tasks_cv_.Wait(); 321 continue; 322 } 323 324 // Take top priority task from |ready_to_run_tasks_|. 325 scoped_refptr<internal::WorkerPoolTask> task( 326 ready_to_run_tasks_.top()->task()); 327 ready_to_run_tasks_.pop(); 328 329 // Move task from |pending_tasks_| to |running_tasks_|. 330 DCHECK(pending_tasks_.contains(task.get())); 331 DCHECK(!running_tasks_.contains(task.get())); 332 running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get())); 333 334 // There may be more work available, so wake up another worker thread. 335 has_ready_to_run_tasks_cv_.Signal(); 336 337 // Call WillRun() before releasing |lock_| and running task. 338 task->WillRun(); 339 340 { 341 base::AutoUnlock unlock(lock_); 342 343 task->RunOnWorkerThread(thread_index); 344 } 345 346 // This will mark task as finished running. 347 task->DidRun(); 348 349 // Now iterate over all dependents to remove dependency and check 350 // if they are ready to run. 351 scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase( 352 task.get()); 353 if (node) { 354 for (internal::GraphNode::Vector::const_iterator it = 355 node->dependents().begin(); 356 it != node->dependents().end(); ++it) { 357 internal::GraphNode* dependent_node = *it; 358 359 dependent_node->remove_dependency(); 360 // Task is ready if it has no dependencies. Add it to 361 // |ready_to_run_tasks_|. 362 if (!dependent_node->num_dependencies()) 363 ready_to_run_tasks_.push(dependent_node); 364 } 365 } 366 367 // Finally add task to |completed_tasks_|. 368 completed_tasks_.push_back(task); 369 } 370 371 // We noticed we should exit. Wake up the next worker so it knows it should 372 // exit as well (because the Shutdown() code only signals once). 373 has_ready_to_run_tasks_cv_.Signal(); 374 } 375 376 WorkerPool::WorkerPool(size_t num_threads, 377 const std::string& thread_name_prefix) 378 : in_dispatch_completion_callbacks_(false), 379 inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) { 380 } 381 382 WorkerPool::~WorkerPool() { 383 } 384 385 void WorkerPool::Shutdown() { 386 TRACE_EVENT0("cc", "WorkerPool::Shutdown"); 387 388 DCHECK(!in_dispatch_completion_callbacks_); 389 390 inner_->Shutdown(); 391 } 392 393 void WorkerPool::CheckForCompletedTasks() { 394 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 395 396 DCHECK(!in_dispatch_completion_callbacks_); 397 398 TaskVector completed_tasks; 399 inner_->CollectCompletedTasks(&completed_tasks); 400 ProcessCompletedTasks(completed_tasks); 401 } 402 403 void WorkerPool::ProcessCompletedTasks( 404 const TaskVector& completed_tasks) { 405 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks", 406 "completed_task_count", completed_tasks.size()); 407 408 // Worker pool instance is not reentrant while processing completed tasks. 409 in_dispatch_completion_callbacks_ = true; 410 411 for (TaskVector::const_iterator it = completed_tasks.begin(); 412 it != completed_tasks.end(); 413 ++it) { 414 internal::WorkerPoolTask* task = it->get(); 415 416 task->WillComplete(); 417 task->CompleteOnOriginThread(); 418 task->DidComplete(); 419 } 420 421 in_dispatch_completion_callbacks_ = false; 422 } 423 424 void WorkerPool::SetTaskGraph(TaskGraph* graph) { 425 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph", 426 "num_tasks", graph->size()); 427 428 DCHECK(!in_dispatch_completion_callbacks_); 429 430 inner_->SetTaskGraph(graph); 431 } 432 433 } // namespace cc 434