Home | History | Annotate | Download | only in drive_backend
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/memory/scoped_ptr.h"
     10 #include "base/sequenced_task_runner.h"
     11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
     12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
     13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
     14 
     15 using storage::FileSystemURL;
     16 
     17 namespace sync_file_system {
     18 namespace drive_backend {
     19 
     20 namespace {
     21 
     22 class SyncTaskAdapter : public ExclusiveTask {
     23  public:
     24   explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
     25   virtual ~SyncTaskAdapter() {}
     26 
     27   virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
     28     task_.Run(callback);
     29   }
     30 
     31  private:
     32   SyncTaskManager::Task task_;
     33 
     34   DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
     35 };
     36 
     37 }  // namespace
     38 
     39 SyncTaskManager::PendingTask::PendingTask() {}
     40 
     41 SyncTaskManager::PendingTask::PendingTask(
     42     const base::Closure& task, Priority pri, int seq)
     43     : task(task), priority(pri), seq(seq) {}
     44 
     45 SyncTaskManager::PendingTask::~PendingTask() {}
     46 
     47 bool SyncTaskManager::PendingTaskComparator::operator()(
     48     const PendingTask& left,
     49     const PendingTask& right) const {
     50   if (left.priority != right.priority)
     51     return left.priority < right.priority;
     52   return left.seq > right.seq;
     53 }
     54 
     55 SyncTaskManager::SyncTaskManager(
     56     base::WeakPtr<Client> client,
     57     size_t maximum_background_task,
     58     const scoped_refptr<base::SequencedTaskRunner>& task_runner)
     59     : client_(client),
     60       maximum_background_task_(maximum_background_task),
     61       pending_task_seq_(0),
     62       task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
     63       task_runner_(task_runner) {
     64 }
     65 
     66 SyncTaskManager::~SyncTaskManager() {
     67   client_.reset();
     68   token_.reset();
     69 }
     70 
     71 void SyncTaskManager::Initialize(SyncStatusCode status) {
     72   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     73   DCHECK(!token_);
     74   NotifyTaskDone(
     75       SyncTaskToken::CreateForForegroundTask(AsWeakPtr(), task_runner_.get()),
     76       status);
     77 }
     78 
     79 void SyncTaskManager::ScheduleTask(
     80     const tracked_objects::Location& from_here,
     81     const Task& task,
     82     Priority priority,
     83     const SyncStatusCallback& callback) {
     84   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     85 
     86   ScheduleSyncTask(from_here,
     87                    scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
     88                    priority,
     89                    callback);
     90 }
     91 
     92 void SyncTaskManager::ScheduleSyncTask(
     93     const tracked_objects::Location& from_here,
     94     scoped_ptr<SyncTask> task,
     95     Priority priority,
     96     const SyncStatusCallback& callback) {
     97   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     98 
     99   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
    100   if (!token) {
    101     PushPendingTask(
    102         base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
    103                    base::Passed(&task), priority, callback),
    104         priority);
    105     return;
    106   }
    107   RunTask(token.Pass(), task.Pass());
    108 }
    109 
    110 bool SyncTaskManager::ScheduleTaskIfIdle(
    111         const tracked_objects::Location& from_here,
    112         const Task& task,
    113         const SyncStatusCallback& callback) {
    114   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    115 
    116   return ScheduleSyncTaskIfIdle(
    117       from_here,
    118       scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
    119       callback);
    120 }
    121 
    122 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
    123     const tracked_objects::Location& from_here,
    124     scoped_ptr<SyncTask> task,
    125     const SyncStatusCallback& callback) {
    126   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    127 
    128   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
    129   if (!token)
    130     return false;
    131   RunTask(token.Pass(), task.Pass());
    132   return true;
    133 }
    134 
    135 // static
    136 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
    137                                      SyncStatusCode status) {
    138   DCHECK(token);
    139 
    140   SyncTaskManager* manager = token->manager();
    141   if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
    142     DCHECK(!manager);
    143     SyncStatusCallback callback = token->callback();
    144     token->clear_callback();
    145     callback.Run(status);
    146     return;
    147   }
    148 
    149   if (manager)
    150     manager->NotifyTaskDoneBody(token.Pass(), status);
    151 }
    152 
    153 // static
    154 void SyncTaskManager::UpdateTaskBlocker(
    155     scoped_ptr<SyncTaskToken> current_task_token,
    156     scoped_ptr<TaskBlocker> task_blocker,
    157     const Continuation& continuation) {
    158   DCHECK(current_task_token);
    159 
    160   SyncTaskManager* manager = current_task_token->manager();
    161   if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
    162     DCHECK(!manager);
    163     continuation.Run(current_task_token.Pass());
    164     return;
    165   }
    166 
    167   if (!manager)
    168     return;
    169 
    170   scoped_ptr<SyncTaskToken> foreground_task_token;
    171   scoped_ptr<SyncTaskToken> background_task_token;
    172   scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
    173   if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
    174     foreground_task_token = current_task_token.Pass();
    175   else
    176     background_task_token = current_task_token.Pass();
    177 
    178   manager->UpdateTaskBlockerBody(foreground_task_token.Pass(),
    179                                  background_task_token.Pass(),
    180                                  task_log.Pass(),
    181                                  task_blocker.Pass(),
    182                                  continuation);
    183 }
    184 
    185 bool SyncTaskManager::IsRunningTask(int64 token_id) const {
    186   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    187 
    188   // If the client is gone, all task should be aborted.
    189   if (!client_)
    190     return false;
    191 
    192   if (token_id == SyncTaskToken::kForegroundTaskTokenID)
    193     return true;
    194 
    195   return ContainsKey(running_background_tasks_, token_id);
    196 }
    197 
    198 void SyncTaskManager::DetachFromSequence() {
    199   sequence_checker_.DetachFromSequence();
    200 }
    201 
    202 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
    203                                          SyncStatusCode status) {
    204   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    205   DCHECK(token);
    206 
    207   DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
    208            << " (" << SyncStatusCodeToString(status) << ")"
    209            << " " << token->location().ToString();
    210 
    211   if (token->task_blocker()) {
    212     dependency_manager_.Erase(token->task_blocker());
    213     token->clear_task_blocker();
    214   }
    215 
    216   if (client_) {
    217     if (token->has_task_log()) {
    218       token->FinalizeTaskLog(SyncStatusCodeToString(status));
    219       client_->RecordTaskLog(token->PassTaskLog());
    220     }
    221   }
    222 
    223   scoped_ptr<SyncTask> task;
    224   SyncStatusCallback callback = token->callback();
    225   token->clear_callback();
    226   if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
    227     token_ = token.Pass();
    228     task = running_foreground_task_.Pass();
    229   } else {
    230     task = running_background_tasks_.take_and_erase(token->token_id());
    231   }
    232 
    233   // Acquire the token to prevent a new task to jump into the queue.
    234   token = token_.Pass();
    235 
    236   bool task_used_network = false;
    237   if (task)
    238     task_used_network = task->used_network();
    239 
    240   if (client_)
    241     client_->NotifyLastOperationStatus(status, task_used_network);
    242 
    243   if (!callback.is_null())
    244     callback.Run(status);
    245 
    246   // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
    247   // making the call-chaing longer.
    248   task_runner_->PostTask(
    249       FROM_HERE,
    250       base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
    251                  AsWeakPtr(), base::Passed(&token)));
    252 }
    253 
    254 void SyncTaskManager::UpdateTaskBlockerBody(
    255     scoped_ptr<SyncTaskToken> foreground_task_token,
    256     scoped_ptr<SyncTaskToken> background_task_token,
    257     scoped_ptr<TaskLogger::TaskLog> task_log,
    258     scoped_ptr<TaskBlocker> task_blocker,
    259     const Continuation& continuation) {
    260   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    261 
    262   // Run the task directly if the parallelization is disabled.
    263   if (!maximum_background_task_) {
    264     DCHECK(foreground_task_token);
    265     DCHECK(!background_task_token);
    266     foreground_task_token->SetTaskLog(task_log.Pass());
    267     continuation.Run(foreground_task_token.Pass());
    268     return;
    269   }
    270 
    271   // Clear existing |task_blocker| from |dependency_manager_| before
    272   // getting |foreground_task_token|, so that we can avoid dead lock.
    273   if (background_task_token && background_task_token->task_blocker()) {
    274     dependency_manager_.Erase(background_task_token->task_blocker());
    275     background_task_token->clear_task_blocker();
    276   }
    277 
    278   // Try to get |foreground_task_token|.  If it's not available, wait for
    279   // current foreground task to finish.
    280   if (!foreground_task_token) {
    281     DCHECK(background_task_token);
    282     foreground_task_token = GetToken(background_task_token->location(),
    283                                      SyncStatusCallback());
    284     if (!foreground_task_token) {
    285       PushPendingTask(
    286           base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
    287                      AsWeakPtr(),
    288                      base::Passed(&foreground_task_token),
    289                      base::Passed(&background_task_token),
    290                      base::Passed(&task_log),
    291                      base::Passed(&task_blocker),
    292                      continuation),
    293           PRIORITY_HIGH);
    294       MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
    295       return;
    296     }
    297   }
    298 
    299   // Check if the task can run as a background task now.
    300   // If there are too many task running or any other task blocks current
    301   // task, wait for any other task to finish.
    302   bool task_number_limit_exceeded =
    303       !background_task_token &&
    304       running_background_tasks_.size() >= maximum_background_task_;
    305   if (task_number_limit_exceeded ||
    306       !dependency_manager_.Insert(task_blocker.get())) {
    307     DCHECK(!running_background_tasks_.empty());
    308     DCHECK(pending_backgrounding_task_.is_null());
    309 
    310     // Wait for NotifyTaskDone to release a |task_blocker|.
    311     pending_backgrounding_task_ =
    312         base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
    313                    AsWeakPtr(),
    314                    base::Passed(&foreground_task_token),
    315                    base::Passed(&background_task_token),
    316                    base::Passed(&task_log),
    317                    base::Passed(&task_blocker),
    318                    continuation);
    319     return;
    320   }
    321 
    322   if (background_task_token) {
    323     background_task_token->set_task_blocker(task_blocker.Pass());
    324   } else {
    325     tracked_objects::Location from_here = foreground_task_token->location();
    326     SyncStatusCallback callback = foreground_task_token->callback();
    327     foreground_task_token->clear_callback();
    328 
    329     background_task_token =
    330         SyncTaskToken::CreateForBackgroundTask(AsWeakPtr(),
    331                                                task_runner_.get(),
    332                                                task_token_seq_++,
    333                                                task_blocker.Pass());
    334     background_task_token->UpdateTask(from_here, callback);
    335     running_background_tasks_.set(background_task_token->token_id(),
    336                                   running_foreground_task_.Pass());
    337   }
    338 
    339   token_ = foreground_task_token.Pass();
    340   MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
    341   background_task_token->SetTaskLog(task_log.Pass());
    342   continuation.Run(background_task_token.Pass());
    343 }
    344 
    345 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
    346     const tracked_objects::Location& from_here,
    347     const SyncStatusCallback& callback) {
    348   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    349 
    350   if (!token_)
    351     return scoped_ptr<SyncTaskToken>();
    352   token_->UpdateTask(from_here, callback);
    353   return token_.Pass();
    354 }
    355 
    356 void SyncTaskManager::PushPendingTask(
    357     const base::Closure& closure, Priority priority) {
    358   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    359 
    360   pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
    361 }
    362 
    363 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
    364                               scoped_ptr<SyncTask> task) {
    365   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    366   DCHECK(!running_foreground_task_);
    367 
    368   running_foreground_task_ = task.Pass();
    369   running_foreground_task_->RunPreflight(token.Pass());
    370 }
    371 
    372 void SyncTaskManager::MaybeStartNextForegroundTask(
    373     scoped_ptr<SyncTaskToken> token) {
    374   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    375 
    376   if (token) {
    377     DCHECK(!token_);
    378     token_ = token.Pass();
    379   }
    380 
    381   if (!pending_backgrounding_task_.is_null()) {
    382     base::Closure closure = pending_backgrounding_task_;
    383     pending_backgrounding_task_.Reset();
    384     closure.Run();
    385     return;
    386   }
    387 
    388   if (!token_)
    389     return;
    390 
    391   if (!pending_tasks_.empty()) {
    392     base::Closure closure = pending_tasks_.top().task;
    393     pending_tasks_.pop();
    394     closure.Run();
    395     return;
    396   }
    397 
    398   if (client_)
    399     client_->MaybeScheduleNextTask();
    400 }
    401 
    402 }  // namespace drive_backend
    403 }  // namespace sync_file_system
    404