Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "remoting/base/plugin_thread_task_runner.h"
      6 
      7 #include "base/bind.h"
      8 
      9 namespace {
     10 
     11 base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
     12   return std::max(when - base::TimeTicks::Now(), base::TimeDelta());
     13 }
     14 
     15 }  // namespace
     16 
     17 namespace remoting {
     18 
// Out-of-line definition of the Delegate destructor. The body is empty: there
// is nothing for it to release here.
PluginThreadTaskRunner::Delegate::~Delegate() {
}
     21 
// Creates a task runner bound to |delegate|. The id of the constructing thread
// is recorded as the plugin thread id, which RunsTasksOnCurrentThread() later
// compares against. The runner starts with no quit request and not stopped.
// NOTE(review): the two |event_| arguments are presumably
// (manual_reset, initially_signaled) -- confirm against base::WaitableEvent.
PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
    : plugin_thread_id_(base::PlatformThread::CurrentId()),
      event_(false, false),
      delegate_(delegate),
      next_sequence_num_(0),
      quit_received_(false),
      stopped_(false) {
}
     30 
// By the time the runner is destroyed it must have been detached from its
// delegate (DetachAndRunShutdownLoop() clears |delegate_|) and the shutdown
// loop must have finished (|stopped_| set).
PluginThreadTaskRunner::~PluginThreadTaskRunner() {
  DCHECK(delegate_ == NULL);
  DCHECK(stopped_);
}
     35 
     36 void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
     37   DCHECK(BelongsToCurrentThread());
     38 
     39   // Detach from the plugin thread and redirect all tasks posted after this
     40   // point to the shutdown task loop.
     41   {
     42     base::AutoLock auto_lock(lock_);
     43 
     44     DCHECK(delegate_ != NULL);
     45     DCHECK(!stopped_);
     46 
     47     delegate_ = NULL;
     48     stopped_ = quit_received_;
     49   }
     50 
     51   // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
     52   // timers are cancelled. It is OK to clear |scheduled_timers_| even if
     53   // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
     54   // called before NPP_Destroy()).
     55   scheduled_timers_.clear();
     56 
     57   // Run all tasks that are due.
     58   ProcessIncomingTasks();
     59   RunDueTasks(base::TimeTicks::Now());
     60 
     61   while (!stopped_) {
     62     if (delayed_queue_.empty()) {
     63       event_.Wait();
     64     } else {
     65       event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
     66     }
     67 
     68     // Run all tasks that are due.
     69     ProcessIncomingTasks();
     70     RunDueTasks(base::TimeTicks::Now());
     71 
     72     base::AutoLock auto_lock(lock_);
     73     stopped_ = quit_received_;
     74   }
     75 }
     76 
     77 void PluginThreadTaskRunner::Quit() {
     78   base::AutoLock auto_lock(lock_);
     79 
     80   if (!quit_received_) {
     81     quit_received_ = true;
     82     event_.Signal();
     83   }
     84 }
     85 
     86 bool PluginThreadTaskRunner::PostDelayedTask(
     87     const tracked_objects::Location& from_here,
     88     const base::Closure& task,
     89     base::TimeDelta delay) {
     90 
     91   // Wrap the task into |base::PendingTask|.
     92   base::TimeTicks delayed_run_time;
     93   if (delay > base::TimeDelta()) {
     94     delayed_run_time = base::TimeTicks::Now() + delay;
     95   } else {
     96     DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
     97   }
     98 
     99   base::PendingTask pending_task(from_here, task, delayed_run_time, false);
    100 
    101   // Push the task to the incoming queue.
    102   base::AutoLock locked(lock_);
    103 
    104   // Initialize the sequence number. The sequence number provides FIFO ordering
    105   // for tasks with the same |delayed_run_time|.
    106   pending_task.sequence_num = next_sequence_num_++;
    107 
    108   // Post an asynchronous call on the plugin thread to process the task.
    109   if (incoming_queue_.empty()) {
    110     PostRunTasks();
    111   }
    112 
    113   incoming_queue_.push(pending_task);
    114   pending_task.task.Reset();
    115 
    116   // No tasks should be posted after Quit() has been called.
    117   DCHECK(!quit_received_);
    118   return true;
    119 }
    120 
    121 bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
    122     const tracked_objects::Location& from_here,
    123     const base::Closure& task,
    124     base::TimeDelta delay) {
    125   // All tasks running on this task loop are non-nestable.
    126   return PostDelayedTask(from_here, task, delay);
    127 }
    128 
    129 bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
    130   // In pepper plugins ideally we should use pp::Core::IsMainThread,
    131   // but it is problematic because we would need to keep reference to
    132   // Core somewhere, e.g. make the delegate ref-counted.
    133   return base::PlatformThread::CurrentId() == plugin_thread_id_;
    134 }
    135 
    136 void PluginThreadTaskRunner::PostRunTasks() {
    137   // Post tasks to the plugin thread when it is availabe or spin the shutdown
    138   // task loop.
    139   if (delegate_ != NULL) {
    140     base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
    141     delegate_->RunOnPluginThread(
    142         base::TimeDelta(),
    143         &PluginThreadTaskRunner::TaskSpringboard,
    144         new base::Closure(closure));
    145   } else {
    146     event_.Signal();
    147   }
    148 }
    149 
    150 void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
    151   DCHECK(BelongsToCurrentThread());
    152 
    153   // |delegate_| is updated from the plugin thread only, so it is safe to access
    154   // it here without taking the lock.
    155   if (delegate_ != NULL) {
    156     // Schedule RunDelayedTasks() to be called at |when| if it hasn't been
    157     // scheduled already.
    158     if (scheduled_timers_.insert(when).second) {
    159       base::TimeDelta delay = CalcTimeDelta(when);
    160       base::Closure closure =
    161           base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
    162       delegate_->RunOnPluginThread(
    163           delay,
    164           &PluginThreadTaskRunner::TaskSpringboard,
    165           new base::Closure(closure));
    166     }
    167   } else {
    168     // Spin the shutdown loop if the task runner has already been detached.
    169     // The shutdown loop will pick the tasks to run itself.
    170     event_.Signal();
    171   }
    172 }
    173 
    174 void PluginThreadTaskRunner::ProcessIncomingTasks() {
    175   DCHECK(BelongsToCurrentThread());
    176 
    177   // Grab all unsorted tasks accomulated so far.
    178   base::TaskQueue work_queue;
    179   {
    180     base::AutoLock locked(lock_);
    181     incoming_queue_.Swap(&work_queue);
    182   }
    183 
    184   while (!work_queue.empty()) {
    185     base::PendingTask pending_task = work_queue.front();
    186     work_queue.pop();
    187 
    188     if (pending_task.delayed_run_time.is_null()) {
    189       pending_task.task.Run();
    190     } else {
    191       delayed_queue_.push(pending_task);
    192     }
    193   }
    194 }
    195 
    196 void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
    197   DCHECK(BelongsToCurrentThread());
    198 
    199   scheduled_timers_.erase(when);
    200 
    201   // |stopped_| is updated by the plugin thread only, so it is safe to access
    202   // it here without taking the lock.
    203   if (!stopped_) {
    204     ProcessIncomingTasks();
    205     RunDueTasks(base::TimeTicks::Now());
    206   }
    207 }
    208 
    209 void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
    210   DCHECK(BelongsToCurrentThread());
    211 
    212   // Run all due tasks.
    213   while (!delayed_queue_.empty() &&
    214          delayed_queue_.top().delayed_run_time <= now) {
    215     delayed_queue_.top().task.Run();
    216     delayed_queue_.pop();
    217   }
    218 
    219   // Post a delayed asynchronous call to the plugin thread to process tasks from
    220   // the delayed queue.
    221   if (!delayed_queue_.empty()) {
    222     base::TimeTicks when = delayed_queue_.top().delayed_run_time;
    223     if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) {
    224       PostDelayedRunTasks(when);
    225     }
    226   }
    227 }
    228 
    229 void PluginThreadTaskRunner::RunTasks() {
    230   DCHECK(BelongsToCurrentThread());
    231 
    232   // |stopped_| is updated by the plugin thread only, so it is safe to access
    233   // it here without taking the lock.
    234   if (!stopped_) {
    235     ProcessIncomingTasks();
    236     RunDueTasks(base::TimeTicks::Now());
    237   }
    238 }
    239 
    240 // static
    241 void PluginThreadTaskRunner::TaskSpringboard(void* data) {
    242   base::Closure* task = reinterpret_cast<base::Closure*>(data);
    243   task->Run();
    244   delete task;
    245 }
    246 
    247 }  // namespace remoting
    248