1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "cc/resources/task_graph_runner.h" 6 7 #include <algorithm> 8 9 #include "base/debug/trace_event.h" 10 #include "base/strings/stringprintf.h" 11 #include "base/threading/thread_restrictions.h" 12 13 namespace cc { 14 namespace { 15 16 // Helper class for iterating over all dependents of a task. 17 class DependentIterator { 18 public: 19 DependentIterator(TaskGraph* graph, const Task* task) 20 : graph_(graph), 21 task_(task), 22 current_index_(static_cast<size_t>(-1)), 23 current_node_(NULL) { 24 ++(*this); 25 } 26 27 TaskGraph::Node& operator->() const { 28 DCHECK_LT(current_index_, graph_->edges.size()); 29 DCHECK_EQ(graph_->edges[current_index_].task, task_); 30 DCHECK(current_node_); 31 return *current_node_; 32 } 33 34 TaskGraph::Node& operator*() const { 35 DCHECK_LT(current_index_, graph_->edges.size()); 36 DCHECK_EQ(graph_->edges[current_index_].task, task_); 37 DCHECK(current_node_); 38 return *current_node_; 39 } 40 41 // Note: Performance can be improved by keeping edges sorted. 42 DependentIterator& operator++() { 43 // Find next dependency edge for |task_|. 44 do { 45 ++current_index_; 46 if (current_index_ == graph_->edges.size()) 47 return *this; 48 } while (graph_->edges[current_index_].task != task_); 49 50 // Now find the node for the dependent of this edge. 51 TaskGraph::Node::Vector::iterator it = 52 std::find_if(graph_->nodes.begin(), 53 graph_->nodes.end(), 54 TaskGraph::Node::TaskComparator( 55 graph_->edges[current_index_].dependent)); 56 DCHECK(it != graph_->nodes.end()); 57 current_node_ = &(*it); 58 59 return *this; 60 } 61 62 operator bool() const { return current_index_ < graph_->edges.size(); } 63 64 private: 65 TaskGraph* graph_; 66 const Task* task_; 67 size_t current_index_; 68 TaskGraph::Node* current_node_; 69 }; 70 71 class DependencyMismatchComparator { 72 public: 73 explicit DependencyMismatchComparator(const TaskGraph* graph) 74 : graph_(graph) {} 75 76 bool operator()(const TaskGraph::Node& node) const { 77 return static_cast<size_t>(std::count_if(graph_->edges.begin(), 78 graph_->edges.end(), 79 DependentComparator(node.task))) != 80 node.dependencies; 81 } 82 83 private: 84 class DependentComparator { 85 public: 86 explicit DependentComparator(const Task* dependent) 87 : dependent_(dependent) {} 88 89 bool operator()(const TaskGraph::Edge& edge) const { 90 return edge.dependent == dependent_; 91 } 92 93 private: 94 const Task* dependent_; 95 }; 96 97 const TaskGraph* graph_; 98 }; 99 100 } // namespace 101 102 Task::Task() : will_run_(false), did_run_(false) { 103 } 104 105 Task::~Task() { 106 DCHECK(!will_run_); 107 } 108 109 void Task::WillRun() { 110 DCHECK(!will_run_); 111 DCHECK(!did_run_); 112 will_run_ = true; 113 } 114 115 void Task::DidRun() { 116 DCHECK(will_run_); 117 will_run_ = false; 118 did_run_ = true; 119 } 120 121 bool Task::HasFinishedRunning() const { return did_run_; } 122 123 TaskGraph::TaskGraph() {} 124 125 TaskGraph::~TaskGraph() {} 126 127 void TaskGraph::Swap(TaskGraph* other) { 128 nodes.swap(other->nodes); 129 edges.swap(other->edges); 130 } 131 132 void TaskGraph::Reset() { 133 nodes.clear(); 134 edges.clear(); 135 } 136 137 TaskGraphRunner::TaskNamespace::TaskNamespace() {} 138 139 TaskGraphRunner::TaskNamespace::~TaskNamespace() {} 140 141 TaskGraphRunner::TaskGraphRunner() 142 : lock_(), 143 has_ready_to_run_tasks_cv_(&lock_), 144 has_namespaces_with_finished_running_tasks_cv_(&lock_), 145 next_namespace_id_(1), 146 shutdown_(false) {} 147 148 TaskGraphRunner::~TaskGraphRunner() { 149 { 150 base::AutoLock lock(lock_); 151 152 DCHECK_EQ(0u, ready_to_run_namespaces_.size()); 153 DCHECK_EQ(0u, namespaces_.size()); 154 } 155 } 156 157 NamespaceToken TaskGraphRunner::GetNamespaceToken() { 158 base::AutoLock lock(lock_); 159 160 NamespaceToken token(next_namespace_id_++); 161 DCHECK(namespaces_.find(token.id_) == namespaces_.end()); 162 return token; 163 } 164 165 void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) { 166 TRACE_EVENT2("cc", 167 "TaskGraphRunner::ScheduleTasks", 168 "num_nodes", 169 graph->nodes.size(), 170 "num_edges", 171 graph->edges.size()); 172 173 DCHECK(token.IsValid()); 174 DCHECK(std::find_if(graph->nodes.begin(), 175 graph->nodes.end(), 176 DependencyMismatchComparator(graph)) == 177 graph->nodes.end()); 178 179 { 180 base::AutoLock lock(lock_); 181 182 DCHECK(!shutdown_); 183 184 TaskNamespace& task_namespace = namespaces_[token.id_]; 185 186 // First adjust number of dependencies to reflect completed tasks. 187 for (Task::Vector::iterator it = task_namespace.completed_tasks.begin(); 188 it != task_namespace.completed_tasks.end(); 189 ++it) { 190 for (DependentIterator node_it(graph, it->get()); node_it; ++node_it) { 191 TaskGraph::Node& node = *node_it; 192 DCHECK_LT(0u, node.dependencies); 193 node.dependencies--; 194 } 195 } 196 197 // Build new "ready to run" queue and remove nodes from old graph. 198 task_namespace.ready_to_run_tasks.clear(); 199 for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin(); 200 it != graph->nodes.end(); 201 ++it) { 202 TaskGraph::Node& node = *it; 203 204 // Remove any old nodes that are associated with this task. The result is 205 // that the old graph is left with all nodes not present in this graph, 206 // which we use below to determine what tasks need to be canceled. 207 TaskGraph::Node::Vector::iterator old_it = 208 std::find_if(task_namespace.graph.nodes.begin(), 209 task_namespace.graph.nodes.end(), 210 TaskGraph::Node::TaskComparator(node.task)); 211 if (old_it != task_namespace.graph.nodes.end()) { 212 std::swap(*old_it, task_namespace.graph.nodes.back()); 213 task_namespace.graph.nodes.pop_back(); 214 } 215 216 // Task is not ready to run if dependencies are not yet satisfied. 217 if (node.dependencies) 218 continue; 219 220 // Skip if already finished running task. 221 if (node.task->HasFinishedRunning()) 222 continue; 223 224 // Skip if already running. 225 if (std::find(task_namespace.running_tasks.begin(), 226 task_namespace.running_tasks.end(), 227 node.task) != task_namespace.running_tasks.end()) 228 continue; 229 230 task_namespace.ready_to_run_tasks.push_back( 231 PrioritizedTask(node.task, node.priority)); 232 } 233 234 // Rearrange the elements in |ready_to_run_tasks| in such a way that they 235 // form a heap. 236 std::make_heap(task_namespace.ready_to_run_tasks.begin(), 237 task_namespace.ready_to_run_tasks.end(), 238 CompareTaskPriority); 239 240 // Swap task graph. 241 task_namespace.graph.Swap(graph); 242 243 // Determine what tasks in old graph need to be canceled. 244 for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin(); 245 it != graph->nodes.end(); 246 ++it) { 247 TaskGraph::Node& node = *it; 248 249 // Skip if already finished running task. 250 if (node.task->HasFinishedRunning()) 251 continue; 252 253 // Skip if already running. 254 if (std::find(task_namespace.running_tasks.begin(), 255 task_namespace.running_tasks.end(), 256 node.task) != task_namespace.running_tasks.end()) 257 continue; 258 259 DCHECK(std::find(task_namespace.completed_tasks.begin(), 260 task_namespace.completed_tasks.end(), 261 node.task) == task_namespace.completed_tasks.end()); 262 task_namespace.completed_tasks.push_back(node.task); 263 } 264 265 // Build new "ready to run" task namespaces queue. 266 ready_to_run_namespaces_.clear(); 267 for (TaskNamespaceMap::iterator it = namespaces_.begin(); 268 it != namespaces_.end(); 269 ++it) { 270 if (!it->second.ready_to_run_tasks.empty()) 271 ready_to_run_namespaces_.push_back(&it->second); 272 } 273 274 // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way 275 // that they form a heap. 276 std::make_heap(ready_to_run_namespaces_.begin(), 277 ready_to_run_namespaces_.end(), 278 CompareTaskNamespacePriority); 279 280 // If there is more work available, wake up worker thread. 281 if (!ready_to_run_namespaces_.empty()) 282 has_ready_to_run_tasks_cv_.Signal(); 283 } 284 } 285 286 void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) { 287 TRACE_EVENT0("cc", "TaskGraphRunner::WaitForTasksToFinishRunning"); 288 289 DCHECK(token.IsValid()); 290 291 { 292 base::AutoLock lock(lock_); 293 294 TaskNamespaceMap::const_iterator it = namespaces_.find(token.id_); 295 if (it == namespaces_.end()) 296 return; 297 298 const TaskNamespace& task_namespace = it->second; 299 300 while (!HasFinishedRunningTasksInNamespace(&task_namespace)) 301 has_namespaces_with_finished_running_tasks_cv_.Wait(); 302 303 // There may be other namespaces that have finished running tasks, so wake 304 // up another origin thread. 305 has_namespaces_with_finished_running_tasks_cv_.Signal(); 306 } 307 } 308 309 void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token, 310 Task::Vector* completed_tasks) { 311 TRACE_EVENT0("cc", "TaskGraphRunner::CollectCompletedTasks"); 312 313 DCHECK(token.IsValid()); 314 315 { 316 base::AutoLock lock(lock_); 317 318 TaskNamespaceMap::iterator it = namespaces_.find(token.id_); 319 if (it == namespaces_.end()) 320 return; 321 322 TaskNamespace& task_namespace = it->second; 323 324 DCHECK_EQ(0u, completed_tasks->size()); 325 completed_tasks->swap(task_namespace.completed_tasks); 326 if (!HasFinishedRunningTasksInNamespace(&task_namespace)) 327 return; 328 329 // Remove namespace if finished running tasks. 330 DCHECK_EQ(0u, task_namespace.completed_tasks.size()); 331 DCHECK_EQ(0u, task_namespace.ready_to_run_tasks.size()); 332 DCHECK_EQ(0u, task_namespace.running_tasks.size()); 333 namespaces_.erase(it); 334 } 335 } 336 337 void TaskGraphRunner::Shutdown() { 338 base::AutoLock lock(lock_); 339 340 DCHECK_EQ(0u, ready_to_run_namespaces_.size()); 341 DCHECK_EQ(0u, namespaces_.size()); 342 343 DCHECK(!shutdown_); 344 shutdown_ = true; 345 346 // Wake up a worker so it knows it should exit. This will cause all workers 347 // to exit as each will wake up another worker before exiting. 348 has_ready_to_run_tasks_cv_.Signal(); 349 } 350 351 void TaskGraphRunner::Run() { 352 base::AutoLock lock(lock_); 353 354 while (true) { 355 if (ready_to_run_namespaces_.empty()) { 356 // Exit when shutdown is set and no more tasks are pending. 357 if (shutdown_) 358 break; 359 360 // Wait for more tasks. 361 has_ready_to_run_tasks_cv_.Wait(); 362 continue; 363 } 364 365 RunTaskWithLockAcquired(); 366 } 367 368 // We noticed we should exit. Wake up the next worker so it knows it should 369 // exit as well (because the Shutdown() code only signals once). 370 has_ready_to_run_tasks_cv_.Signal(); 371 } 372 373 void TaskGraphRunner::RunUntilIdle() { 374 base::AutoLock lock(lock_); 375 376 while (!ready_to_run_namespaces_.empty()) 377 RunTaskWithLockAcquired(); 378 } 379 380 void TaskGraphRunner::RunTaskWithLockAcquired() { 381 TRACE_EVENT0("toplevel", "TaskGraphRunner::RunTask"); 382 383 lock_.AssertAcquired(); 384 DCHECK(!ready_to_run_namespaces_.empty()); 385 386 // Take top priority TaskNamespace from |ready_to_run_namespaces_|. 387 std::pop_heap(ready_to_run_namespaces_.begin(), 388 ready_to_run_namespaces_.end(), 389 CompareTaskNamespacePriority); 390 TaskNamespace* task_namespace = ready_to_run_namespaces_.back(); 391 ready_to_run_namespaces_.pop_back(); 392 DCHECK(!task_namespace->ready_to_run_tasks.empty()); 393 394 // Take top priority task from |ready_to_run_tasks|. 395 std::pop_heap(task_namespace->ready_to_run_tasks.begin(), 396 task_namespace->ready_to_run_tasks.end(), 397 CompareTaskPriority); 398 scoped_refptr<Task> task(task_namespace->ready_to_run_tasks.back().task); 399 task_namespace->ready_to_run_tasks.pop_back(); 400 401 // Add task namespace back to |ready_to_run_namespaces_| if not empty after 402 // taking top priority task. 403 if (!task_namespace->ready_to_run_tasks.empty()) { 404 ready_to_run_namespaces_.push_back(task_namespace); 405 std::push_heap(ready_to_run_namespaces_.begin(), 406 ready_to_run_namespaces_.end(), 407 CompareTaskNamespacePriority); 408 } 409 410 // Add task to |running_tasks|. 411 task_namespace->running_tasks.push_back(task.get()); 412 413 // There may be more work available, so wake up another worker thread. 414 has_ready_to_run_tasks_cv_.Signal(); 415 416 // Call WillRun() before releasing |lock_| and running task. 417 task->WillRun(); 418 419 { 420 base::AutoUnlock unlock(lock_); 421 422 task->RunOnWorkerThread(); 423 } 424 425 // This will mark task as finished running. 426 task->DidRun(); 427 428 // Remove task from |running_tasks|. 429 TaskVector::iterator it = std::find(task_namespace->running_tasks.begin(), 430 task_namespace->running_tasks.end(), 431 task.get()); 432 DCHECK(it != task_namespace->running_tasks.end()); 433 std::swap(*it, task_namespace->running_tasks.back()); 434 task_namespace->running_tasks.pop_back(); 435 436 // Now iterate over all dependents to decrement dependencies and check if they 437 // are ready to run. 438 bool ready_to_run_namespaces_has_heap_properties = true; 439 for (DependentIterator it(&task_namespace->graph, task.get()); it; ++it) { 440 TaskGraph::Node& dependent_node = *it; 441 442 DCHECK_LT(0u, dependent_node.dependencies); 443 dependent_node.dependencies--; 444 // Task is ready if it has no dependencies. Add it to |ready_to_run_tasks_|. 445 if (!dependent_node.dependencies) { 446 bool was_empty = task_namespace->ready_to_run_tasks.empty(); 447 task_namespace->ready_to_run_tasks.push_back( 448 PrioritizedTask(dependent_node.task, dependent_node.priority)); 449 std::push_heap(task_namespace->ready_to_run_tasks.begin(), 450 task_namespace->ready_to_run_tasks.end(), 451 CompareTaskPriority); 452 // Task namespace is ready if it has at least one ready to run task. Add 453 // it to |ready_to_run_namespaces_| if it just become ready. 454 if (was_empty) { 455 DCHECK(std::find(ready_to_run_namespaces_.begin(), 456 ready_to_run_namespaces_.end(), 457 task_namespace) == ready_to_run_namespaces_.end()); 458 ready_to_run_namespaces_.push_back(task_namespace); 459 } 460 ready_to_run_namespaces_has_heap_properties = false; 461 } 462 } 463 464 // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way 465 // that they yet again form a heap. 466 if (!ready_to_run_namespaces_has_heap_properties) { 467 std::make_heap(ready_to_run_namespaces_.begin(), 468 ready_to_run_namespaces_.end(), 469 CompareTaskNamespacePriority); 470 } 471 472 // Finally add task to |completed_tasks_|. 473 task_namespace->completed_tasks.push_back(task); 474 475 // If namespace has finished running all tasks, wake up origin thread. 476 if (HasFinishedRunningTasksInNamespace(task_namespace)) 477 has_namespaces_with_finished_running_tasks_cv_.Signal(); 478 } 479 480 } // namespace cc 481