Home | History | Annotate | Download | only in drive_backend
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/memory/scoped_ptr.h"
     10 #include "base/sequenced_task_runner.h"
     11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
     12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
     13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
     14 
     15 using fileapi::FileSystemURL;
     16 
     17 namespace sync_file_system {
     18 namespace drive_backend {
     19 
     20 namespace {
     21 
     22 class SyncTaskAdapter : public ExclusiveTask {
     23  public:
     24   explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
     25   virtual ~SyncTaskAdapter() {}
     26 
     27   virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
     28     task_.Run(callback);
     29   }
     30 
     31  private:
     32   SyncTaskManager::Task task_;
     33 
     34   DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
     35 };
     36 
     37 }  // namespace
     38 
     39 SyncTaskManager::PendingTask::PendingTask() {}
     40 
     41 SyncTaskManager::PendingTask::PendingTask(
     42     const base::Closure& task, Priority pri, int seq)
     43     : task(task), priority(pri), seq(seq) {}
     44 
     45 SyncTaskManager::PendingTask::~PendingTask() {}
     46 
     47 bool SyncTaskManager::PendingTaskComparator::operator()(
     48     const PendingTask& left,
     49     const PendingTask& right) const {
     50   if (left.priority != right.priority)
     51     return left.priority < right.priority;
     52   return left.seq > right.seq;
     53 }
     54 
     55 SyncTaskManager::SyncTaskManager(
     56     base::WeakPtr<Client> client,
     57     size_t maximum_background_task,
     58     base::SequencedTaskRunner* task_runner)
     59     : client_(client),
     60       maximum_background_task_(maximum_background_task),
     61       pending_task_seq_(0),
     62       task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
     63       task_runner_(task_runner) {
     64 }
     65 
     66 SyncTaskManager::~SyncTaskManager() {
     67   client_.reset();
     68   token_.reset();
     69 }
     70 
     71 void SyncTaskManager::Initialize(SyncStatusCode status) {
     72   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     73   DCHECK(!token_);
     74   NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
     75                  status);
     76 }
     77 
     78 void SyncTaskManager::ScheduleTask(
     79     const tracked_objects::Location& from_here,
     80     const Task& task,
     81     Priority priority,
     82     const SyncStatusCallback& callback) {
     83   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     84 
     85   ScheduleSyncTask(from_here,
     86                    scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
     87                    priority,
     88                    callback);
     89 }
     90 
     91 void SyncTaskManager::ScheduleSyncTask(
     92     const tracked_objects::Location& from_here,
     93     scoped_ptr<SyncTask> task,
     94     Priority priority,
     95     const SyncStatusCallback& callback) {
     96   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     97 
     98   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
     99   if (!token) {
    100     PushPendingTask(
    101         base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
    102                    base::Passed(&task), priority, callback),
    103         priority);
    104     return;
    105   }
    106   RunTask(token.Pass(), task.Pass());
    107 }
    108 
    109 bool SyncTaskManager::ScheduleTaskIfIdle(
    110         const tracked_objects::Location& from_here,
    111         const Task& task,
    112         const SyncStatusCallback& callback) {
    113   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    114 
    115   return ScheduleSyncTaskIfIdle(
    116       from_here,
    117       scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
    118       callback);
    119 }
    120 
    121 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
    122     const tracked_objects::Location& from_here,
    123     scoped_ptr<SyncTask> task,
    124     const SyncStatusCallback& callback) {
    125   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    126 
    127   scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
    128   if (!token)
    129     return false;
    130   RunTask(token.Pass(), task.Pass());
    131   return true;
    132 }
    133 
    134 // static
    135 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
    136                                      SyncStatusCode status) {
    137   DCHECK(token);
    138 
    139   SyncTaskManager* manager = token->manager();
    140   if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
    141     DCHECK(!manager);
    142     SyncStatusCallback callback = token->callback();
    143     token->clear_callback();
    144     callback.Run(status);
    145     return;
    146   }
    147 
    148   if (manager)
    149     manager->NotifyTaskDoneBody(token.Pass(), status);
    150 }
    151 
    152 // static
    153 void SyncTaskManager::UpdateBlockingFactor(
    154     scoped_ptr<SyncTaskToken> current_task_token,
    155     scoped_ptr<BlockingFactor> blocking_factor,
    156     const Continuation& continuation) {
    157   DCHECK(current_task_token);
    158 
    159   SyncTaskManager* manager = current_task_token->manager();
    160   if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
    161     DCHECK(!manager);
    162     continuation.Run(current_task_token.Pass());
    163     return;
    164   }
    165 
    166   if (!manager)
    167     return;
    168 
    169   scoped_ptr<SyncTaskToken> foreground_task_token;
    170   scoped_ptr<SyncTaskToken> background_task_token;
    171   scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
    172   if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
    173     foreground_task_token = current_task_token.Pass();
    174   else
    175     background_task_token = current_task_token.Pass();
    176 
    177   manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
    178                                     background_task_token.Pass(),
    179                                     task_log.Pass(),
    180                                     blocking_factor.Pass(),
    181                                     continuation);
    182 }
    183 
    184 bool SyncTaskManager::IsRunningTask(int64 token_id) const {
    185   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    186 
    187   // If the client is gone, all task should be aborted.
    188   if (!client_)
    189     return false;
    190 
    191   if (token_id == SyncTaskToken::kForegroundTaskTokenID)
    192     return true;
    193 
    194   return ContainsKey(running_background_tasks_, token_id);
    195 }
    196 
    197 void SyncTaskManager::DetachFromSequence() {
    198   sequence_checker_.DetachFromSequence();
    199 }
    200 
    201 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
    202                                          SyncStatusCode status) {
    203   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    204   DCHECK(token);
    205 
    206   DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
    207            << " (" << SyncStatusCodeToString(status) << ")"
    208            << " " << token_->location().ToString();
    209 
    210   if (token->blocking_factor()) {
    211     dependency_manager_.Erase(token->blocking_factor());
    212     token->clear_blocking_factor();
    213   }
    214 
    215   if (client_) {
    216     if (token->has_task_log()) {
    217       token->FinalizeTaskLog(SyncStatusCodeToString(status));
    218       client_->RecordTaskLog(token->PassTaskLog());
    219     }
    220   }
    221 
    222   scoped_ptr<SyncTask> task;
    223   SyncStatusCallback callback = token->callback();
    224   token->clear_callback();
    225   if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
    226     token_ = token.Pass();
    227     task = running_foreground_task_.Pass();
    228   } else {
    229     task = running_background_tasks_.take_and_erase(token->token_id());
    230   }
    231 
    232   // Acquire the token to prevent a new task to jump into the queue.
    233   token = token_.Pass();
    234 
    235   bool task_used_network = false;
    236   if (task)
    237     task_used_network = task->used_network();
    238 
    239   if (client_)
    240     client_->NotifyLastOperationStatus(status, task_used_network);
    241 
    242   if (!callback.is_null())
    243     callback.Run(status);
    244 
    245   // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
    246   // making the call-chaing longer.
    247   task_runner_->PostTask(
    248       FROM_HERE,
    249       base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
    250                  AsWeakPtr(), base::Passed(&token)));
    251 }
    252 
    253 void SyncTaskManager::UpdateBlockingFactorBody(
    254     scoped_ptr<SyncTaskToken> foreground_task_token,
    255     scoped_ptr<SyncTaskToken> background_task_token,
    256     scoped_ptr<TaskLogger::TaskLog> task_log,
    257     scoped_ptr<BlockingFactor> blocking_factor,
    258     const Continuation& continuation) {
    259   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    260 
    261   // Run the task directly if the parallelization is disabled.
    262   if (!maximum_background_task_) {
    263     DCHECK(foreground_task_token);
    264     DCHECK(!background_task_token);
    265     foreground_task_token->SetTaskLog(task_log.Pass());
    266     continuation.Run(foreground_task_token.Pass());
    267     return;
    268   }
    269 
    270   // Clear existing |blocking_factor| from |dependency_manager_| before
    271   // getting |foreground_task_token|, so that we can avoid dead lock.
    272   if (background_task_token && background_task_token->blocking_factor()) {
    273     dependency_manager_.Erase(background_task_token->blocking_factor());
    274     background_task_token->clear_blocking_factor();
    275   }
    276 
    277   // Try to get |foreground_task_token|.  If it's not available, wait for
    278   // current foreground task to finish.
    279   if (!foreground_task_token) {
    280     DCHECK(background_task_token);
    281     foreground_task_token = GetToken(background_task_token->location(),
    282                                      SyncStatusCallback());
    283     if (!foreground_task_token) {
    284       PushPendingTask(
    285           base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
    286                      AsWeakPtr(),
    287                      base::Passed(&foreground_task_token),
    288                      base::Passed(&background_task_token),
    289                      base::Passed(&task_log),
    290                      base::Passed(&blocking_factor),
    291                      continuation),
    292           PRIORITY_HIGH);
    293       MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
    294       return;
    295     }
    296   }
    297 
    298   // Check if the task can run as a background task now.
    299   // If there are too many task running or any other task blocks current
    300   // task, wait for any other task to finish.
    301   bool task_number_limit_exceeded =
    302       !background_task_token &&
    303       running_background_tasks_.size() >= maximum_background_task_;
    304   if (task_number_limit_exceeded ||
    305       !dependency_manager_.Insert(blocking_factor.get())) {
    306     DCHECK(!running_background_tasks_.empty());
    307     DCHECK(pending_backgrounding_task_.is_null());
    308 
    309     // Wait for NotifyTaskDone to release a |blocking_factor|.
    310     pending_backgrounding_task_ =
    311         base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
    312                    AsWeakPtr(),
    313                    base::Passed(&foreground_task_token),
    314                    base::Passed(&background_task_token),
    315                    base::Passed(&task_log),
    316                    base::Passed(&blocking_factor),
    317                    continuation);
    318     return;
    319   }
    320 
    321   if (background_task_token) {
    322     background_task_token->set_blocking_factor(blocking_factor.Pass());
    323   } else {
    324     tracked_objects::Location from_here = foreground_task_token->location();
    325     SyncStatusCallback callback = foreground_task_token->callback();
    326     foreground_task_token->clear_callback();
    327 
    328     background_task_token =
    329         SyncTaskToken::CreateForBackgroundTask(
    330             AsWeakPtr(),
    331             task_token_seq_++,
    332             blocking_factor.Pass());
    333     background_task_token->UpdateTask(from_here, callback);
    334     running_background_tasks_.set(background_task_token->token_id(),
    335                                   running_foreground_task_.Pass());
    336   }
    337 
    338   token_ = foreground_task_token.Pass();
    339   MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
    340   background_task_token->SetTaskLog(task_log.Pass());
    341   continuation.Run(background_task_token.Pass());
    342 }
    343 
    344 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
    345     const tracked_objects::Location& from_here,
    346     const SyncStatusCallback& callback) {
    347   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    348 
    349   if (!token_)
    350     return scoped_ptr<SyncTaskToken>();
    351   token_->UpdateTask(from_here, callback);
    352   return token_.Pass();
    353 }
    354 
    355 void SyncTaskManager::PushPendingTask(
    356     const base::Closure& closure, Priority priority) {
    357   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    358 
    359   pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
    360 }
    361 
    362 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
    363                               scoped_ptr<SyncTask> task) {
    364   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    365   DCHECK(!running_foreground_task_);
    366 
    367   running_foreground_task_ = task.Pass();
    368   running_foreground_task_->RunPreflight(token.Pass());
    369 }
    370 
    371 void SyncTaskManager::MaybeStartNextForegroundTask(
    372     scoped_ptr<SyncTaskToken> token) {
    373   DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    374 
    375   if (token) {
    376     DCHECK(!token_);
    377     token_ = token.Pass();
    378   }
    379 
    380   if (!pending_backgrounding_task_.is_null()) {
    381     base::Closure closure = pending_backgrounding_task_;
    382     pending_backgrounding_task_.Reset();
    383     closure.Run();
    384     return;
    385   }
    386 
    387   if (!token_)
    388     return;
    389 
    390   if (!pending_tasks_.empty()) {
    391     base::Closure closure = pending_tasks_.top().task;
    392     pending_tasks_.pop();
    393     closure.Run();
    394     return;
    395   }
    396 
    397   if (client_)
    398     client_->MaybeScheduleNextTask();
    399 }
    400 
    401 }  // namespace drive_backend
    402 }  // namespace sync_file_system
    403