1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "remoting/base/plugin_thread_task_runner.h" 6 7 #include "base/bind.h" 8 9 namespace { 10 11 base::TimeDelta CalcTimeDelta(base::TimeTicks when) { 12 return std::max(when - base::TimeTicks::Now(), base::TimeDelta()); 13 } 14 15 } // namespace 16 17 namespace remoting { 18 19 PluginThreadTaskRunner::Delegate::~Delegate() { 20 } 21 22 PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate) 23 : plugin_thread_id_(base::PlatformThread::CurrentId()), 24 event_(false, false), 25 delegate_(delegate), 26 next_sequence_num_(0), 27 quit_received_(false), 28 stopped_(false) { 29 } 30 31 PluginThreadTaskRunner::~PluginThreadTaskRunner() { 32 DCHECK(delegate_ == NULL); 33 DCHECK(stopped_); 34 } 35 36 void PluginThreadTaskRunner::DetachAndRunShutdownLoop() { 37 DCHECK(BelongsToCurrentThread()); 38 39 // Detach from the plugin thread and redirect all tasks posted after this 40 // point to the shutdown task loop. 41 { 42 base::AutoLock auto_lock(lock_); 43 44 DCHECK(delegate_ != NULL); 45 DCHECK(!stopped_); 46 47 delegate_ = NULL; 48 stopped_ = quit_received_; 49 } 50 51 // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled 52 // timers are cancelled. It is OK to clear |scheduled_timers_| even if 53 // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is 54 // called before NPP_Destroy()). 55 scheduled_timers_.clear(); 56 57 // Run all tasks that are due. 58 ProcessIncomingTasks(); 59 RunDueTasks(base::TimeTicks::Now()); 60 61 while (!stopped_) { 62 if (delayed_queue_.empty()) { 63 event_.Wait(); 64 } else { 65 event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time)); 66 } 67 68 // Run all tasks that are due. 69 ProcessIncomingTasks(); 70 RunDueTasks(base::TimeTicks::Now()); 71 72 base::AutoLock auto_lock(lock_); 73 stopped_ = quit_received_; 74 } 75 } 76 77 void PluginThreadTaskRunner::Quit() { 78 base::AutoLock auto_lock(lock_); 79 80 if (!quit_received_) { 81 quit_received_ = true; 82 event_.Signal(); 83 } 84 } 85 86 bool PluginThreadTaskRunner::PostDelayedTask( 87 const tracked_objects::Location& from_here, 88 const base::Closure& task, 89 base::TimeDelta delay) { 90 91 // Wrap the task into |base::PendingTask|. 92 base::TimeTicks delayed_run_time; 93 if (delay > base::TimeDelta()) { 94 delayed_run_time = base::TimeTicks::Now() + delay; 95 } else { 96 DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; 97 } 98 99 base::PendingTask pending_task(from_here, task, delayed_run_time, false); 100 101 // Push the task to the incoming queue. 102 base::AutoLock locked(lock_); 103 104 // Initialize the sequence number. The sequence number provides FIFO ordering 105 // for tasks with the same |delayed_run_time|. 106 pending_task.sequence_num = next_sequence_num_++; 107 108 // Post an asynchronous call on the plugin thread to process the task. 109 if (incoming_queue_.empty()) { 110 PostRunTasks(); 111 } 112 113 incoming_queue_.push(pending_task); 114 pending_task.task.Reset(); 115 116 // No tasks should be posted after Quit() has been called. 117 DCHECK(!quit_received_); 118 return true; 119 } 120 121 bool PluginThreadTaskRunner::PostNonNestableDelayedTask( 122 const tracked_objects::Location& from_here, 123 const base::Closure& task, 124 base::TimeDelta delay) { 125 // All tasks running on this task loop are non-nestable. 126 return PostDelayedTask(from_here, task, delay); 127 } 128 129 bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const { 130 // In pepper plugins ideally we should use pp::Core::IsMainThread, 131 // but it is problematic because we would need to keep reference to 132 // Core somewhere, e.g. make the delegate ref-counted. 133 return base::PlatformThread::CurrentId() == plugin_thread_id_; 134 } 135 136 void PluginThreadTaskRunner::PostRunTasks() { 137 // Post tasks to the plugin thread when it is availabe or spin the shutdown 138 // task loop. 139 if (delegate_ != NULL) { 140 base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this); 141 delegate_->RunOnPluginThread( 142 base::TimeDelta(), 143 &PluginThreadTaskRunner::TaskSpringboard, 144 new base::Closure(closure)); 145 } else { 146 event_.Signal(); 147 } 148 } 149 150 void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) { 151 DCHECK(BelongsToCurrentThread()); 152 153 // |delegate_| is updated from the plugin thread only, so it is safe to access 154 // it here without taking the lock. 155 if (delegate_ != NULL) { 156 // Schedule RunDelayedTasks() to be called at |when| if it hasn't been 157 // scheduled already. 158 if (scheduled_timers_.insert(when).second) { 159 base::TimeDelta delay = CalcTimeDelta(when); 160 base::Closure closure = 161 base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when); 162 delegate_->RunOnPluginThread( 163 delay, 164 &PluginThreadTaskRunner::TaskSpringboard, 165 new base::Closure(closure)); 166 } 167 } else { 168 // Spin the shutdown loop if the task runner has already been detached. 169 // The shutdown loop will pick the tasks to run itself. 170 event_.Signal(); 171 } 172 } 173 174 void PluginThreadTaskRunner::ProcessIncomingTasks() { 175 DCHECK(BelongsToCurrentThread()); 176 177 // Grab all unsorted tasks accomulated so far. 178 base::TaskQueue work_queue; 179 { 180 base::AutoLock locked(lock_); 181 incoming_queue_.Swap(&work_queue); 182 } 183 184 while (!work_queue.empty()) { 185 base::PendingTask pending_task = work_queue.front(); 186 work_queue.pop(); 187 188 if (pending_task.delayed_run_time.is_null()) { 189 pending_task.task.Run(); 190 } else { 191 delayed_queue_.push(pending_task); 192 } 193 } 194 } 195 196 void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) { 197 DCHECK(BelongsToCurrentThread()); 198 199 scheduled_timers_.erase(when); 200 201 // |stopped_| is updated by the plugin thread only, so it is safe to access 202 // it here without taking the lock. 203 if (!stopped_) { 204 ProcessIncomingTasks(); 205 RunDueTasks(base::TimeTicks::Now()); 206 } 207 } 208 209 void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) { 210 DCHECK(BelongsToCurrentThread()); 211 212 // Run all due tasks. 213 while (!delayed_queue_.empty() && 214 delayed_queue_.top().delayed_run_time <= now) { 215 delayed_queue_.top().task.Run(); 216 delayed_queue_.pop(); 217 } 218 219 // Post a delayed asynchronous call to the plugin thread to process tasks from 220 // the delayed queue. 221 if (!delayed_queue_.empty()) { 222 base::TimeTicks when = delayed_queue_.top().delayed_run_time; 223 if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) { 224 PostDelayedRunTasks(when); 225 } 226 } 227 } 228 229 void PluginThreadTaskRunner::RunTasks() { 230 DCHECK(BelongsToCurrentThread()); 231 232 // |stopped_| is updated by the plugin thread only, so it is safe to access 233 // it here without taking the lock. 234 if (!stopped_) { 235 ProcessIncomingTasks(); 236 RunDueTasks(base::TimeTicks::Now()); 237 } 238 } 239 240 // static 241 void PluginThreadTaskRunner::TaskSpringboard(void* data) { 242 base::Closure* task = reinterpret_cast<base::Closure*>(data); 243 task->Run(); 244 delete task; 245 } 246 247 } // namespace remoting 248