Home | History | Annotate | Download | only in resources
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "cc/resources/worker_pool.h"
      6 
      7 #include <algorithm>
      8 #include <queue>
      9 
     10 #include "base/bind.h"
     11 #include "base/containers/hash_tables.h"
     12 #include "base/debug/trace_event.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "base/synchronization/condition_variable.h"
     15 #include "base/threading/simple_thread.h"
     16 #include "base/threading/thread_restrictions.h"
     17 #include "cc/base/scoped_ptr_deque.h"
     18 
     19 namespace cc {
     20 
     21 namespace internal {
     22 
// Creates a task whose lifecycle flags all start out false: it has not been
// scheduled, has not run and has not completed.
WorkerPoolTask::WorkerPoolTask()
    : did_schedule_(false),
      did_run_(false),
      did_complete_(false) {
}
     28 
// Checks that the lifecycle flags are mutually consistent when the task is
// destroyed.
WorkerPoolTask::~WorkerPoolTask() {
  // Either the task was never scheduled, or it was scheduled and completed.
  DCHECK_EQ(did_schedule_, did_complete_);
  // A task that ran must also have been scheduled and completed.
  DCHECK(!did_run_ || did_schedule_);
  DCHECK(!did_run_ || did_complete_);
}
     34 
// Records that the task has been scheduled. Invoked from
// WorkerPool::Inner::SetTaskGraph() for every task in the new pending set.
// A task that has already completed must not be scheduled again.
void WorkerPoolTask::DidSchedule() {
  DCHECK(!did_complete_);
  did_schedule_ = true;
}
     39 
// Sanity check made just before the task is run: it must have been
// scheduled and must not have run or completed yet. Changes no state.
void WorkerPoolTask::WillRun() {
  DCHECK(did_schedule_);
  DCHECK(!did_complete_);
  DCHECK(!did_run_);
}
     45 
// Marks the task as having finished running; HasFinishedRunning() returns
// true from here on.
void WorkerPoolTask::DidRun() {
  did_run_ = true;
}
     49 
// Sanity check made before the completion callback is dispatched: a task
// must not be completed twice. Changes no state.
void WorkerPoolTask::WillComplete() {
  DCHECK(!did_complete_);
}
     53 
// Marks the task as completed. The task must have been scheduled and must
// not have been completed before.
void WorkerPoolTask::DidComplete() {
  DCHECK(did_schedule_);
  DCHECK(!did_complete_);
  did_complete_ = true;
}
     59 
// Returns true once DidRun() has been called on this task.
bool WorkerPoolTask::HasFinishedRunning() const {
  return did_run_;
}
     63 
// Returns true once DidComplete() has been called on this task.
bool WorkerPoolTask::HasCompleted() const {
  return did_complete_;
}
     67 
// Creates a graph node for |task| with the given |priority| and no
// dependencies. The task is stored as a raw pointer.
GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
    : task_(task),
      priority_(priority),
      num_dependencies_(0) {
}
     73 
// Nothing to release explicitly.
GraphNode::~GraphNode() {
}
     76 
     77 }  // namespace internal
     78 
     79 // Internal to the worker pool. Any data or logic that needs to be
     80 // shared between threads lives in this class. All members are guarded
     81 // by |lock_|.
     82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
     83  public:
     84   Inner(size_t num_threads, const std::string& thread_name_prefix);
     85   virtual ~Inner();
     86 
     87   void Shutdown();
     88 
     89   // Schedule running of tasks in |graph|. Tasks previously scheduled but
     90   // no longer needed will be canceled unless already running. Canceled
     91   // tasks are moved to |completed_tasks_| without being run. The result
     92   // is that once scheduled, a task is guaranteed to end up in the
     93   // |completed_tasks_| queue even if they later get canceled by another
     94   // call to SetTaskGraph().
     95   void SetTaskGraph(TaskGraph* graph);
     96 
     97   // Collect all completed tasks in |completed_tasks|.
     98   void CollectCompletedTasks(TaskVector* completed_tasks);
     99 
    100  private:
    101   class PriorityComparator {
    102    public:
    103     bool operator()(const internal::GraphNode* a,
    104                     const internal::GraphNode* b) {
    105       // In this system, numerically lower priority is run first.
    106       if (a->priority() != b->priority())
    107         return a->priority() > b->priority();
    108 
    109       // Run task with most dependents first when priority is the same.
    110       return a->dependents().size() < b->dependents().size();
    111     }
    112   };
    113 
    114   // Overridden from base::DelegateSimpleThread:
    115   virtual void Run() OVERRIDE;
    116 
    117   // This lock protects all members of this class except
    118   // |worker_pool_on_origin_thread_|. Do not read or modify anything
    119   // without holding this lock. Do not block while holding this lock.
    120   mutable base::Lock lock_;
    121 
    122   // Condition variable that is waited on by worker threads until new
    123   // tasks are ready to run or shutdown starts.
    124   base::ConditionVariable has_ready_to_run_tasks_cv_;
    125 
    126   // Provides each running thread loop with a unique index. First thread
    127   // loop index is 0.
    128   unsigned next_thread_index_;
    129 
    130   // Set during shutdown. Tells workers to exit when no more tasks
    131   // are pending.
    132   bool shutdown_;
    133 
    134   // This set contains all pending tasks.
    135   GraphNodeMap pending_tasks_;
    136 
    137   // Ordered set of tasks that are ready to run.
    138   typedef std::priority_queue<internal::GraphNode*,
    139                               std::vector<internal::GraphNode*>,
    140                               PriorityComparator> TaskQueue;
    141   TaskQueue ready_to_run_tasks_;
    142 
    143   // This set contains all currently running tasks.
    144   GraphNodeMap running_tasks_;
    145 
    146   // Completed tasks not yet collected by origin thread.
    147   TaskVector completed_tasks_;
    148 
    149   ScopedPtrDeque<base::DelegateSimpleThread> workers_;
    150 
    151   DISALLOW_COPY_AND_ASSIGN(Inner);
    152 };
    153 
    154 WorkerPool::Inner::Inner(
    155     size_t num_threads, const std::string& thread_name_prefix)
    156     : lock_(),
    157       has_ready_to_run_tasks_cv_(&lock_),
    158       next_thread_index_(0),
    159       shutdown_(false) {
    160   base::AutoLock lock(lock_);
    161 
    162   while (workers_.size() < num_threads) {
    163     scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
    164         new base::DelegateSimpleThread(
    165             this,
    166             thread_name_prefix +
    167             base::StringPrintf(
    168                 "Worker%u",
    169                 static_cast<unsigned>(workers_.size() + 1)).c_str()));
    170     worker->Start();
    171 #if defined(OS_ANDROID) || defined(OS_LINUX)
    172     worker->SetThreadPriority(base::kThreadPriority_Background);
    173 #endif
    174     workers_.push_back(worker.Pass());
    175   }
    176 }
    177 
// Shutdown() must have been called and every task container must have been
// drained before the object is destroyed.
WorkerPool::Inner::~Inner() {
  base::AutoLock lock(lock_);

  DCHECK(shutdown_);

  // No work may be left in any state: pending, ready, running or
  // completed-but-uncollected.
  DCHECK_EQ(0u, pending_tasks_.size());
  DCHECK_EQ(0u, ready_to_run_tasks_.size());
  DCHECK_EQ(0u, running_tasks_.size());
  DCHECK_EQ(0u, completed_tasks_.size());
}
    188 
    189 void WorkerPool::Inner::Shutdown() {
    190   {
    191     base::AutoLock lock(lock_);
    192 
    193     DCHECK(!shutdown_);
    194     shutdown_ = true;
    195 
    196     // Wake up a worker so it knows it should exit. This will cause all workers
    197     // to exit as each will wake up another worker before exiting.
    198     has_ready_to_run_tasks_cv_.Signal();
    199   }
    200 
    201   while (workers_.size()) {
    202     scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
    203     // http://crbug.com/240453 - Join() is considered IO and will block this
    204     // thread. See also http://crbug.com/239423 for further ideas.
    205     base::ThreadRestrictions::ScopedAllowIO allow_io;
    206     worker->Join();
    207   }
    208 }
    209 
    210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) {
    211   // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
    212   DCHECK(graph->empty() || !shutdown_);
    213 
    214   GraphNodeMap new_pending_tasks;
    215   GraphNodeMap new_running_tasks;
    216   TaskQueue new_ready_to_run_tasks;
    217 
    218   new_pending_tasks.swap(*graph);
    219 
    220   {
    221     base::AutoLock lock(lock_);
    222 
    223     // First remove all completed tasks from |new_pending_tasks| and
    224     // adjust number of dependencies.
    225     for (TaskVector::iterator it = completed_tasks_.begin();
    226          it != completed_tasks_.end(); ++it) {
    227       internal::WorkerPoolTask* task = it->get();
    228 
    229       scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
    230           task);
    231       if (node) {
    232         for (internal::GraphNode::Vector::const_iterator it =
    233                  node->dependents().begin();
    234              it != node->dependents().end(); ++it) {
    235           internal::GraphNode* dependent_node = *it;
    236           dependent_node->remove_dependency();
    237         }
    238       }
    239     }
    240 
    241     // Build new running task set.
    242     for (GraphNodeMap::iterator it = running_tasks_.begin();
    243          it != running_tasks_.end(); ++it) {
    244       internal::WorkerPoolTask* task = it->first;
    245       // Transfer scheduled task value from |new_pending_tasks| to
    246       // |new_running_tasks| if currently running. Value must be set to
    247       // NULL if |new_pending_tasks| doesn't contain task. This does
    248       // the right in both cases.
    249       new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
    250     }
    251 
    252     // Build new "ready to run" tasks queue.
    253     // TODO(reveman): Create this queue when building the task graph instead.
    254     for (GraphNodeMap::iterator it = new_pending_tasks.begin();
    255          it != new_pending_tasks.end(); ++it) {
    256       internal::WorkerPoolTask* task = it->first;
    257       DCHECK(task);
    258       internal::GraphNode* node = it->second;
    259 
    260       // Completed tasks should not exist in |new_pending_tasks|.
    261       DCHECK(!task->HasFinishedRunning());
    262 
    263       // Call DidSchedule() to indicate that this task has been scheduled.
    264       // Note: This is only for debugging purposes.
    265       task->DidSchedule();
    266 
    267       if (!node->num_dependencies())
    268         new_ready_to_run_tasks.push(node);
    269 
    270       // Erase the task from old pending tasks.
    271       pending_tasks_.erase(task);
    272     }
    273 
    274     completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size());
    275 
    276     // The items left in |pending_tasks_| need to be canceled.
    277     for (GraphNodeMap::const_iterator it = pending_tasks_.begin();
    278          it != pending_tasks_.end();
    279          ++it) {
    280       completed_tasks_.push_back(it->first);
    281     }
    282 
    283     // Swap task sets.
    284     // Note: old tasks are intentionally destroyed after releasing |lock_|.
    285     pending_tasks_.swap(new_pending_tasks);
    286     running_tasks_.swap(new_running_tasks);
    287     std::swap(ready_to_run_tasks_, new_ready_to_run_tasks);
    288 
    289     // If |ready_to_run_tasks_| is empty, it means we either have
    290     // running tasks, or we have no pending tasks.
    291     DCHECK(!ready_to_run_tasks_.empty() ||
    292            (pending_tasks_.empty() || !running_tasks_.empty()));
    293 
    294     // If there is more work available, wake up worker thread.
    295     if (!ready_to_run_tasks_.empty())
    296       has_ready_to_run_tasks_cv_.Signal();
    297   }
    298 }
    299 
// Hands all tasks currently in |completed_tasks_| over to the caller by
// swapping them into |completed_tasks|, which must be empty on entry.
// |completed_tasks_| ends up holding the (empty) contents of the argument.
void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) {
  base::AutoLock lock(lock_);

  DCHECK_EQ(0u, completed_tasks->size());
  completed_tasks->swap(completed_tasks_);
}
    306 
    307 void WorkerPool::Inner::Run() {
    308   base::AutoLock lock(lock_);
    309 
    310   // Get a unique thread index.
    311   int thread_index = next_thread_index_++;
    312 
    313   while (true) {
    314     if (ready_to_run_tasks_.empty()) {
    315       // Exit when shutdown is set and no more tasks are pending.
    316       if (shutdown_ && pending_tasks_.empty())
    317         break;
    318 
    319       // Wait for more tasks.
    320       has_ready_to_run_tasks_cv_.Wait();
    321       continue;
    322     }
    323 
    324     // Take top priority task from |ready_to_run_tasks_|.
    325     scoped_refptr<internal::WorkerPoolTask> task(
    326         ready_to_run_tasks_.top()->task());
    327     ready_to_run_tasks_.pop();
    328 
    329     // Move task from |pending_tasks_| to |running_tasks_|.
    330     DCHECK(pending_tasks_.contains(task.get()));
    331     DCHECK(!running_tasks_.contains(task.get()));
    332     running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get()));
    333 
    334     // There may be more work available, so wake up another worker thread.
    335     has_ready_to_run_tasks_cv_.Signal();
    336 
    337     // Call WillRun() before releasing |lock_| and running task.
    338     task->WillRun();
    339 
    340     {
    341       base::AutoUnlock unlock(lock_);
    342 
    343       task->RunOnWorkerThread(thread_index);
    344     }
    345 
    346     // This will mark task as finished running.
    347     task->DidRun();
    348 
    349     // Now iterate over all dependents to remove dependency and check
    350     // if they are ready to run.
    351     scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase(
    352         task.get());
    353     if (node) {
    354       for (internal::GraphNode::Vector::const_iterator it =
    355                node->dependents().begin();
    356            it != node->dependents().end(); ++it) {
    357         internal::GraphNode* dependent_node = *it;
    358 
    359         dependent_node->remove_dependency();
    360         // Task is ready if it has no dependencies. Add it to
    361         // |ready_to_run_tasks_|.
    362         if (!dependent_node->num_dependencies())
    363           ready_to_run_tasks_.push(dependent_node);
    364       }
    365     }
    366 
    367     // Finally add task to |completed_tasks_|.
    368     completed_tasks_.push_back(task);
    369   }
    370 
    371   // We noticed we should exit. Wake up the next worker so it knows it should
    372   // exit as well (because the Shutdown() code only signals once).
    373   has_ready_to_run_tasks_cv_.Signal();
    374 }
    375 
    376 WorkerPool::WorkerPool(size_t num_threads,
    377                        const std::string& thread_name_prefix)
    378     : in_dispatch_completion_callbacks_(false),
    379       inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
    380 }
    381 
// Nothing to do explicitly here. Note that Inner's destructor DCHECKs that
// Shutdown() has been called.
WorkerPool::~WorkerPool() {
}
    384 
// Shuts the pool down by forwarding to Inner::Shutdown(), which joins all
// worker threads. Must not be called while completion callbacks are being
// dispatched.
void WorkerPool::Shutdown() {
  TRACE_EVENT0("cc", "WorkerPool::Shutdown");

  DCHECK(!in_dispatch_completion_callbacks_);

  inner_->Shutdown();
}
    392 
    393 void WorkerPool::CheckForCompletedTasks() {
    394   TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
    395 
    396   DCHECK(!in_dispatch_completion_callbacks_);
    397 
    398   TaskVector completed_tasks;
    399   inner_->CollectCompletedTasks(&completed_tasks);
    400   ProcessCompletedTasks(completed_tasks);
    401 }
    402 
    403 void WorkerPool::ProcessCompletedTasks(
    404     const TaskVector& completed_tasks) {
    405   TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
    406                "completed_task_count", completed_tasks.size());
    407 
    408   // Worker pool instance is not reentrant while processing completed tasks.
    409   in_dispatch_completion_callbacks_ = true;
    410 
    411   for (TaskVector::const_iterator it = completed_tasks.begin();
    412        it != completed_tasks.end();
    413        ++it) {
    414     internal::WorkerPoolTask* task = it->get();
    415 
    416     task->WillComplete();
    417     task->CompleteOnOriginThread();
    418     task->DidComplete();
    419   }
    420 
    421   in_dispatch_completion_callbacks_ = false;
    422 }
    423 
// Schedules the tasks in |graph| by forwarding to Inner::SetTaskGraph(),
// which swaps the contents out of |graph|. Must not be called while
// completion callbacks are being dispatched.
void WorkerPool::SetTaskGraph(TaskGraph* graph) {
  // The task count is traced before |graph| is handed to |inner_|.
  TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
               "num_tasks", graph->size());

  DCHECK(!in_dispatch_completion_callbacks_);

  inner_->SetTaskGraph(graph);
}
    432 
    433 }  // namespace cc
    434