1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/memory/scoped_ptr.h" 10 #include "base/sequenced_task_runner.h" 11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h" 13 #include "chrome/browser/sync_file_system/sync_file_metadata.h" 14 15 using storage::FileSystemURL; 16 17 namespace sync_file_system { 18 namespace drive_backend { 19 20 namespace { 21 22 class SyncTaskAdapter : public ExclusiveTask { 23 public: 24 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {} 25 virtual ~SyncTaskAdapter() {} 26 27 virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE { 28 task_.Run(callback); 29 } 30 31 private: 32 SyncTaskManager::Task task_; 33 34 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter); 35 }; 36 37 } // namespace 38 39 SyncTaskManager::PendingTask::PendingTask() {} 40 41 SyncTaskManager::PendingTask::PendingTask( 42 const base::Closure& task, Priority pri, int seq) 43 : task(task), priority(pri), seq(seq) {} 44 45 SyncTaskManager::PendingTask::~PendingTask() {} 46 47 bool SyncTaskManager::PendingTaskComparator::operator()( 48 const PendingTask& left, 49 const PendingTask& right) const { 50 if (left.priority != right.priority) 51 return left.priority < right.priority; 52 return left.seq > right.seq; 53 } 54 55 SyncTaskManager::SyncTaskManager( 56 base::WeakPtr<Client> client, 57 size_t maximum_background_task, 58 const scoped_refptr<base::SequencedTaskRunner>& task_runner) 59 : client_(client), 60 maximum_background_task_(maximum_background_task), 61 pending_task_seq_(0), 62 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID), 63 task_runner_(task_runner) { 64 } 65 66 SyncTaskManager::~SyncTaskManager() { 67 client_.reset(); 68 token_.reset(); 69 } 70 71 void SyncTaskManager::Initialize(SyncStatusCode status) { 72 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 73 DCHECK(!token_); 74 NotifyTaskDone( 75 SyncTaskToken::CreateForForegroundTask(AsWeakPtr(), task_runner_.get()), 76 status); 77 } 78 79 void SyncTaskManager::ScheduleTask( 80 const tracked_objects::Location& from_here, 81 const Task& task, 82 Priority priority, 83 const SyncStatusCallback& callback) { 84 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 85 86 ScheduleSyncTask(from_here, 87 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 88 priority, 89 callback); 90 } 91 92 void SyncTaskManager::ScheduleSyncTask( 93 const tracked_objects::Location& from_here, 94 scoped_ptr<SyncTask> task, 95 Priority priority, 96 const SyncStatusCallback& callback) { 97 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 98 99 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 100 if (!token) { 101 PushPendingTask( 102 base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here, 103 base::Passed(&task), priority, callback), 104 priority); 105 return; 106 } 107 RunTask(token.Pass(), task.Pass()); 108 } 109 110 bool SyncTaskManager::ScheduleTaskIfIdle( 111 const tracked_objects::Location& from_here, 112 const Task& task, 113 const SyncStatusCallback& callback) { 114 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 115 116 return ScheduleSyncTaskIfIdle( 117 from_here, 118 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 119 callback); 120 } 121 122 bool SyncTaskManager::ScheduleSyncTaskIfIdle( 123 const tracked_objects::Location& from_here, 124 scoped_ptr<SyncTask> task, 125 const SyncStatusCallback& callback) { 126 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 127 128 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 129 if (!token) 130 return false; 131 RunTask(token.Pass(), task.Pass()); 132 return true; 133 } 134 135 // static 136 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token, 137 SyncStatusCode status) { 138 DCHECK(token); 139 140 SyncTaskManager* manager = token->manager(); 141 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 142 DCHECK(!manager); 143 SyncStatusCallback callback = token->callback(); 144 token->clear_callback(); 145 callback.Run(status); 146 return; 147 } 148 149 if (manager) 150 manager->NotifyTaskDoneBody(token.Pass(), status); 151 } 152 153 // static 154 void SyncTaskManager::UpdateTaskBlocker( 155 scoped_ptr<SyncTaskToken> current_task_token, 156 scoped_ptr<TaskBlocker> task_blocker, 157 const Continuation& continuation) { 158 DCHECK(current_task_token); 159 160 SyncTaskManager* manager = current_task_token->manager(); 161 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 162 DCHECK(!manager); 163 continuation.Run(current_task_token.Pass()); 164 return; 165 } 166 167 if (!manager) 168 return; 169 170 scoped_ptr<SyncTaskToken> foreground_task_token; 171 scoped_ptr<SyncTaskToken> background_task_token; 172 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog(); 173 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID) 174 foreground_task_token = current_task_token.Pass(); 175 else 176 background_task_token = current_task_token.Pass(); 177 178 manager->UpdateTaskBlockerBody(foreground_task_token.Pass(), 179 background_task_token.Pass(), 180 task_log.Pass(), 181 task_blocker.Pass(), 182 continuation); 183 } 184 185 bool SyncTaskManager::IsRunningTask(int64 token_id) const { 186 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 187 188 // If the client is gone, all task should be aborted. 189 if (!client_) 190 return false; 191 192 if (token_id == SyncTaskToken::kForegroundTaskTokenID) 193 return true; 194 195 return ContainsKey(running_background_tasks_, token_id); 196 } 197 198 void SyncTaskManager::DetachFromSequence() { 199 sequence_checker_.DetachFromSequence(); 200 } 201 202 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token, 203 SyncStatusCode status) { 204 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 205 DCHECK(token); 206 207 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status 208 << " (" << SyncStatusCodeToString(status) << ")" 209 << " " << token->location().ToString(); 210 211 if (token->task_blocker()) { 212 dependency_manager_.Erase(token->task_blocker()); 213 token->clear_task_blocker(); 214 } 215 216 if (client_) { 217 if (token->has_task_log()) { 218 token->FinalizeTaskLog(SyncStatusCodeToString(status)); 219 client_->RecordTaskLog(token->PassTaskLog()); 220 } 221 } 222 223 scoped_ptr<SyncTask> task; 224 SyncStatusCallback callback = token->callback(); 225 token->clear_callback(); 226 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) { 227 token_ = token.Pass(); 228 task = running_foreground_task_.Pass(); 229 } else { 230 task = running_background_tasks_.take_and_erase(token->token_id()); 231 } 232 233 // Acquire the token to prevent a new task to jump into the queue. 234 token = token_.Pass(); 235 236 bool task_used_network = false; 237 if (task) 238 task_used_network = task->used_network(); 239 240 if (client_) 241 client_->NotifyLastOperationStatus(status, task_used_network); 242 243 if (!callback.is_null()) 244 callback.Run(status); 245 246 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid 247 // making the call-chaing longer. 248 task_runner_->PostTask( 249 FROM_HERE, 250 base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask, 251 AsWeakPtr(), base::Passed(&token))); 252 } 253 254 void SyncTaskManager::UpdateTaskBlockerBody( 255 scoped_ptr<SyncTaskToken> foreground_task_token, 256 scoped_ptr<SyncTaskToken> background_task_token, 257 scoped_ptr<TaskLogger::TaskLog> task_log, 258 scoped_ptr<TaskBlocker> task_blocker, 259 const Continuation& continuation) { 260 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 261 262 // Run the task directly if the parallelization is disabled. 263 if (!maximum_background_task_) { 264 DCHECK(foreground_task_token); 265 DCHECK(!background_task_token); 266 foreground_task_token->SetTaskLog(task_log.Pass()); 267 continuation.Run(foreground_task_token.Pass()); 268 return; 269 } 270 271 // Clear existing |task_blocker| from |dependency_manager_| before 272 // getting |foreground_task_token|, so that we can avoid dead lock. 273 if (background_task_token && background_task_token->task_blocker()) { 274 dependency_manager_.Erase(background_task_token->task_blocker()); 275 background_task_token->clear_task_blocker(); 276 } 277 278 // Try to get |foreground_task_token|. If it's not available, wait for 279 // current foreground task to finish. 280 if (!foreground_task_token) { 281 DCHECK(background_task_token); 282 foreground_task_token = GetToken(background_task_token->location(), 283 SyncStatusCallback()); 284 if (!foreground_task_token) { 285 PushPendingTask( 286 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody, 287 AsWeakPtr(), 288 base::Passed(&foreground_task_token), 289 base::Passed(&background_task_token), 290 base::Passed(&task_log), 291 base::Passed(&task_blocker), 292 continuation), 293 PRIORITY_HIGH); 294 MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>()); 295 return; 296 } 297 } 298 299 // Check if the task can run as a background task now. 300 // If there are too many task running or any other task blocks current 301 // task, wait for any other task to finish. 302 bool task_number_limit_exceeded = 303 !background_task_token && 304 running_background_tasks_.size() >= maximum_background_task_; 305 if (task_number_limit_exceeded || 306 !dependency_manager_.Insert(task_blocker.get())) { 307 DCHECK(!running_background_tasks_.empty()); 308 DCHECK(pending_backgrounding_task_.is_null()); 309 310 // Wait for NotifyTaskDone to release a |task_blocker|. 311 pending_backgrounding_task_ = 312 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody, 313 AsWeakPtr(), 314 base::Passed(&foreground_task_token), 315 base::Passed(&background_task_token), 316 base::Passed(&task_log), 317 base::Passed(&task_blocker), 318 continuation); 319 return; 320 } 321 322 if (background_task_token) { 323 background_task_token->set_task_blocker(task_blocker.Pass()); 324 } else { 325 tracked_objects::Location from_here = foreground_task_token->location(); 326 SyncStatusCallback callback = foreground_task_token->callback(); 327 foreground_task_token->clear_callback(); 328 329 background_task_token = 330 SyncTaskToken::CreateForBackgroundTask(AsWeakPtr(), 331 task_runner_.get(), 332 task_token_seq_++, 333 task_blocker.Pass()); 334 background_task_token->UpdateTask(from_here, callback); 335 running_background_tasks_.set(background_task_token->token_id(), 336 running_foreground_task_.Pass()); 337 } 338 339 token_ = foreground_task_token.Pass(); 340 MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>()); 341 background_task_token->SetTaskLog(task_log.Pass()); 342 continuation.Run(background_task_token.Pass()); 343 } 344 345 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken( 346 const tracked_objects::Location& from_here, 347 const SyncStatusCallback& callback) { 348 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 349 350 if (!token_) 351 return scoped_ptr<SyncTaskToken>(); 352 token_->UpdateTask(from_here, callback); 353 return token_.Pass(); 354 } 355 356 void SyncTaskManager::PushPendingTask( 357 const base::Closure& closure, Priority priority) { 358 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 359 360 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++)); 361 } 362 363 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token, 364 scoped_ptr<SyncTask> task) { 365 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 366 DCHECK(!running_foreground_task_); 367 368 running_foreground_task_ = task.Pass(); 369 running_foreground_task_->RunPreflight(token.Pass()); 370 } 371 372 void SyncTaskManager::MaybeStartNextForegroundTask( 373 scoped_ptr<SyncTaskToken> token) { 374 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 375 376 if (token) { 377 DCHECK(!token_); 378 token_ = token.Pass(); 379 } 380 381 if (!pending_backgrounding_task_.is_null()) { 382 base::Closure closure = pending_backgrounding_task_; 383 pending_backgrounding_task_.Reset(); 384 closure.Run(); 385 return; 386 } 387 388 if (!token_) 389 return; 390 391 if (!pending_tasks_.empty()) { 392 base::Closure closure = pending_tasks_.top().task; 393 pending_tasks_.pop(); 394 closure.Run(); 395 return; 396 } 397 398 if (client_) 399 client_->MaybeScheduleNextTask(); 400 } 401 402 } // namespace drive_backend 403 } // namespace sync_file_system 404