Home | History | Annotate | Download | only in drive
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/sync_client.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/message_loop/message_loop_proxy.h"
     11 #include "chrome/browser/chromeos/drive/drive.pb.h"
     12 #include "chrome/browser/chromeos/drive/file_cache.h"
     13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
     14 #include "chrome/browser/chromeos/drive/file_system/operation_delegate.h"
     15 #include "chrome/browser/chromeos/drive/file_system_util.h"
     16 #include "chrome/browser/chromeos/drive/job_scheduler.h"
     17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
     18 #include "content/public/browser/browser_thread.h"
     19 #include "google_apis/drive/task_util.h"
     20 
     21 using content::BrowserThread;
     22 
     23 namespace drive {
     24 namespace internal {
     25 
     26 namespace {
     27 
// Delay, in seconds, applied before a newly added sync task is started. Sync
// tasks are deliberately not processed immediately, for the following reasons:
//
// 1) For fetching, the user may accidentally click on the "Make available
//    offline" checkbox on a file, and cancel it again within a second. It
//    would be a waste to fetch the file in this case.
//
// 2) For uploading, file writing via the HTML5 file system API is performed
//    in two steps: (a) truncate the file to 0 bytes, (b) write the contents.
//    We shouldn't start uploading right after step (a). Besides, the user may
//    edit the same file repeatedly in a short period of time.
//
// TODO(satorux): We should find a way to handle the upload case more nicely,
// and shorten the delay. crbug.com/134774
const int kDelaySeconds = 1;

// Delay, in seconds, applied before retrying a sync task on server errors
// (600 seconds = 10 minutes).
const int kLongDelaySeconds = 600;
     46 
     47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
     48 // fetched (not present locally), to |to_update| if the file needs update.
     49 void CollectBacklog(ResourceMetadata* metadata,
     50                     std::vector<std::string>* to_fetch,
     51                     std::vector<std::string>* to_update) {
     52   DCHECK(to_fetch);
     53   DCHECK(to_update);
     54 
     55   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
     56   for (; !it->IsAtEnd(); it->Advance()) {
     57     const std::string& local_id = it->GetID();
     58     const ResourceEntry& entry = it->GetValue();
     59     if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
     60       to_update->push_back(local_id);
     61       continue;
     62     }
     63 
     64     bool should_update = false;
     65     switch (entry.metadata_edit_state()) {
     66       case ResourceEntry::CLEAN:
     67         break;
     68       case ResourceEntry::SYNCING:
     69       case ResourceEntry::DIRTY:
     70         should_update = true;
     71         break;
     72     }
     73 
     74     if (entry.file_specific_info().cache_state().is_pinned() &&
     75         !entry.file_specific_info().cache_state().is_present())
     76       to_fetch->push_back(local_id);
     77 
     78     if (entry.file_specific_info().cache_state().is_dirty())
     79       should_update = true;
     80 
     81     if (should_update)
     82       to_update->push_back(local_id);
     83   }
     84   DCHECK(!it->HasError());
     85 }
     86 
     87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
     88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
     89                               FileCache* cache,
     90                               std::vector<std::string>* local_ids) {
     91   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
     92   for (; !it->IsAtEnd(); it->Advance()) {
     93     const ResourceEntry& entry = it->GetValue();
     94     const FileCacheEntry& cache_state =
     95         entry.file_specific_info().cache_state();
     96     const std::string& local_id = it->GetID();
     97     if (!cache_state.is_pinned() || !cache_state.is_present())
     98       continue;
     99 
    100     // If MD5s don't match, it indicates the local cache file is stale, unless
    101     // the file is dirty (the MD5 is "local"). We should never re-fetch the
    102     // file when we have a locally modified version.
    103     if (entry.file_specific_info().md5() == cache_state.md5() ||
    104         cache_state.is_dirty())
    105       continue;
    106 
    107     FileError error = cache->Remove(local_id);
    108     if (error != FILE_ERROR_OK) {
    109       LOG(WARNING) << "Failed to remove cache entry: " << local_id;
    110       continue;
    111     }
    112 
    113     error = cache->Pin(local_id);
    114     if (error != FILE_ERROR_OK) {
    115       LOG(WARNING) << "Failed to pin cache entry: " << local_id;
    116       continue;
    117     }
    118 
    119     local_ids->push_back(local_id);
    120   }
    121   DCHECK(!it->HasError());
    122 }
    123 
    124 // Gets the parent entry of the entry specified by the ID.
    125 FileError GetParentResourceEntry(ResourceMetadata* metadata,
    126                                  const std::string& local_id,
    127                                  ResourceEntry* parent) {
    128   ResourceEntry entry;
    129   FileError error = metadata->GetResourceEntryById(local_id, &entry);
    130   if (error != FILE_ERROR_OK)
    131     return error;
    132   return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
    133 }
    134 
    135 }  // namespace
    136 
// A default-constructed task starts out SUSPENDED, with a BACKGROUND context
// and no pending request to run again.
SyncClient::SyncTask::SyncTask()
    : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
SyncClient::SyncTask::~SyncTask() {}
    140 
// Builds the download operation and the entry update performer from the given
// collaborators and initializes the normal and long retry delays from
// kDelaySeconds and kLongDelaySeconds. |scheduler|, |loader_controller| and
// |temporary_file_directory| are only forwarded to those helpers here.
// Must be called on the UI thread.
SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
                       file_system::OperationDelegate* delegate,
                       JobScheduler* scheduler,
                       ResourceMetadata* metadata,
                       FileCache* cache,
                       LoaderController* loader_controller,
                       const base::FilePath& temporary_file_directory)
    : blocking_task_runner_(blocking_task_runner),
      operation_delegate_(delegate),
      metadata_(metadata),
      cache_(cache),
      download_operation_(new file_system::DownloadOperation(
          blocking_task_runner,
          delegate,
          scheduler,
          metadata,
          cache,
          temporary_file_directory)),
      entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
                                                       delegate,
                                                       scheduler,
                                                       metadata,
                                                       cache,
                                                       loader_controller)),
      delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
      long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
      weak_ptr_factory_(this) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
}
    170 
// Must be destroyed on the UI thread.
SyncClient::~SyncClient() {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
}
    174 
    175 void SyncClient::StartProcessingBacklog() {
    176   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    177 
    178   std::vector<std::string>* to_fetch = new std::vector<std::string>;
    179   std::vector<std::string>* to_update = new std::vector<std::string>;
    180   blocking_task_runner_->PostTaskAndReply(
    181       FROM_HERE,
    182       base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
    183       base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
    184                  weak_ptr_factory_.GetWeakPtr(),
    185                  base::Owned(to_fetch),
    186                  base::Owned(to_update)));
    187 }
    188 
    189 void SyncClient::StartCheckingExistingPinnedFiles() {
    190   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    191 
    192   std::vector<std::string>* local_ids = new std::vector<std::string>;
    193   blocking_task_runner_->PostTaskAndReply(
    194       FROM_HERE,
    195       base::Bind(&CheckExistingPinnedFiles,
    196                  metadata_,
    197                  cache_,
    198                  local_ids),
    199       base::Bind(&SyncClient::AddFetchTasks,
    200                  weak_ptr_factory_.GetWeakPtr(),
    201                  base::Owned(local_ids)));
    202 }
    203 
// Queues a fetch task for |local_id| that starts after the normal delay
// (|delay_|). Must be called on the UI thread.
void SyncClient::AddFetchTask(const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  AddFetchTaskInternal(local_id, delay_);
}
    208 
    209 void SyncClient::RemoveFetchTask(const std::string& local_id) {
    210   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    211 
    212   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
    213   if (it == tasks_.end())
    214     return;
    215 
    216   SyncTask* task = &it->second;
    217   switch (task->state) {
    218     case SUSPENDED:
    219     case PENDING:
    220       OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
    221       break;
    222     case RUNNING:
    223       if (!task->cancel_closure.is_null())
    224         task->cancel_closure.Run();
    225       break;
    226   }
    227 }
    228 
// Queues an update task for |local_id| with the given |context| that starts
// after the normal delay (|delay_|). Must be called on the UI thread.
void SyncClient::AddUpdateTask(const ClientContext& context,
                               const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  AddUpdateTaskInternal(context, local_id, delay_);
}
    234 
    235 bool SyncClient:: WaitForUpdateTaskToComplete(
    236     const std::string& local_id,
    237     const FileOperationCallback& callback) {
    238   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    239 
    240   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id));
    241   if (it == tasks_.end())
    242     return false;
    243 
    244   SyncTask* task = &it->second;
    245   task->waiting_callbacks.push_back(callback);
    246   return true;
    247 }
    248 
// Starts downloading the file identified by |local_id| through
// |download_operation_| and returns the closure that call hands back; the
// caller stores it as the task's cancel closure. Completion is reported to
// OnFetchFileComplete() through a weak pointer.
base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
                                           const ClientContext& context) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  return download_operation_->EnsureFileDownloadedByLocalId(
      local_id,
      context,
      // Default-constructed callbacks: presumably no notification is wanted
      // for "initialized" or for content chunks -- TODO confirm.
      GetFileContentInitializedCallback(),
      google_apis::GetContentCallback(),
      base::Bind(&SyncClient::OnFetchFileComplete,
                 weak_ptr_factory_.GetWeakPtr(),
                 local_id));
}
    261 
    262 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
    263                                       const base::TimeDelta& delay) {
    264   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    265 
    266   SyncTask task;
    267   task.state = PENDING;
    268   task.context = ClientContext(BACKGROUND);
    269   task.task = base::Bind(&SyncClient::PerformFetchTask,
    270                          base::Unretained(this),
    271                          local_id);
    272   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
    273 }
    274 
// Asks |entry_update_performer_| to update the entry identified by
// |local_id|. Completion is routed to OnTaskComplete() as an UPDATE task
// through a weak pointer. Always returns a null closure, so update tasks get
// no cancel closure.
base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
                                            const ClientContext& context) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  entry_update_performer_->UpdateEntry(
      local_id,
      context,
      base::Bind(&SyncClient::OnTaskComplete,
                 weak_ptr_factory_.GetWeakPtr(),
                 UPDATE,
                 local_id));
  return base::Closure();
}
    287 
    288 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
    289                                        const std::string& local_id,
    290                                        const base::TimeDelta& delay) {
    291   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    292 
    293   SyncTask task;
    294   task.state = PENDING;
    295   task.context = context;
    296   task.task = base::Bind(&SyncClient::PerformUpdateTask,
    297                          base::Unretained(this),
    298                          local_id);
    299   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
    300 }
    301 
    302 void SyncClient::AddTask(const SyncTasks::key_type& key,
    303                          const SyncTask& task,
    304                          const base::TimeDelta& delay) {
    305   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    306 
    307   SyncTasks::iterator it = tasks_.find(key);
    308   if (it != tasks_.end()) {
    309     switch (it->second.state) {
    310       case SUSPENDED:
    311         // Activate the task.
    312         it->second.state = PENDING;
    313         break;
    314       case PENDING:
    315         // The same task will run, do nothing.
    316         return;
    317       case RUNNING:
    318         // Something has changed since the task started. Schedule rerun.
    319         it->second.should_run_again = true;
    320         return;
    321     }
    322   } else {
    323     tasks_[key] = task;
    324   }
    325   DCHECK_EQ(PENDING, task.state);
    326   base::MessageLoopProxy::current()->PostDelayedTask(
    327       FROM_HERE,
    328       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
    329       delay);
    330 }
    331 
    332 void SyncClient::StartTask(const SyncTasks::key_type& key) {
    333   ResourceEntry* parent = new ResourceEntry;
    334   base::PostTaskAndReplyWithResult(
    335       blocking_task_runner_.get(),
    336       FROM_HERE,
    337       base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
    338       base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
    339                  weak_ptr_factory_.GetWeakPtr(),
    340                  key,
    341                  base::Owned(parent)));
    342 }
    343 
    344 void SyncClient::StartTaskAfterGetParentResourceEntry(
    345     const SyncTasks::key_type& key,
    346     const ResourceEntry* parent,
    347     FileError error) {
    348   const SyncType type = key.first;
    349   const std::string& local_id = key.second;
    350   SyncTasks::iterator it = tasks_.find(key);
    351   if (it == tasks_.end())
    352     return;
    353 
    354   SyncTask* task = &it->second;
    355   switch (task->state) {
    356     case SUSPENDED:
    357     case PENDING:
    358       break;
    359     case RUNNING:  // Do nothing.
    360       return;
    361   }
    362 
    363   if (error != FILE_ERROR_OK) {
    364     OnTaskComplete(type, local_id, error);
    365     return;
    366   }
    367 
    368   if (type == UPDATE &&
    369       parent->resource_id().empty() &&
    370       parent->local_id() != util::kDriveTrashDirLocalId) {
    371     // Parent entry needs to be synced to get a resource ID.
    372     // Suspend the task and register it as a dependent task of the parent.
    373     const SyncTasks::key_type key_parent(type, parent->local_id());
    374     SyncTasks::iterator it_parent = tasks_.find(key_parent);
    375     if (it_parent == tasks_.end()) {
    376       OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
    377       LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
    378                    << local_id << ", parent_id = " << parent->local_id();
    379       return;
    380     }
    381     task->state = SUSPENDED;
    382     it_parent->second.dependent_tasks.push_back(key);
    383     return;
    384   }
    385 
    386   // Run the task.
    387   task->state = RUNNING;
    388   task->cancel_closure = task->task.Run(task->context);
    389 }
    390 
    391 void SyncClient::OnGetLocalIdsOfBacklog(
    392     const std::vector<std::string>* to_fetch,
    393     const std::vector<std::string>* to_update) {
    394   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    395 
    396   // Give priority to upload tasks over fetch tasks, so that dirty files are
    397   // uploaded as soon as possible.
    398   for (size_t i = 0; i < to_update->size(); ++i) {
    399     const std::string& local_id = (*to_update)[i];
    400     DVLOG(1) << "Queuing to update: " << local_id;
    401     AddUpdateTask(ClientContext(BACKGROUND), local_id);
    402   }
    403 
    404   for (size_t i = 0; i < to_fetch->size(); ++i) {
    405     const std::string& local_id = (*to_fetch)[i];
    406     DVLOG(1) << "Queuing to fetch: " << local_id;
    407     AddFetchTaskInternal(local_id, delay_);
    408   }
    409 }
    410 
    411 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
    412   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    413 
    414   for (size_t i = 0; i < local_ids->size(); ++i)
    415     AddFetchTask((*local_ids)[i]);
    416 }
    417 
    418 void SyncClient::OnTaskComplete(SyncType type,
    419                                 const std::string& local_id,
    420                                 FileError error) {
    421   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    422 
    423   const SyncTasks::key_type key(type, local_id);
    424   SyncTasks::iterator it = tasks_.find(key);
    425   DCHECK(it != tasks_.end());
    426 
    427   base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
    428 
    429   switch (error) {
    430     case FILE_ERROR_OK:
    431       DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
    432       break;
    433     case FILE_ERROR_ABORT:
    434       // Ignore it because this is caused by user's cancel operations.
    435       break;
    436     case FILE_ERROR_NO_CONNECTION:
    437       // Run the task again so that we'll retry once the connection is back.
    438       it->second.should_run_again = true;
    439       it->second.context = ClientContext(BACKGROUND);
    440       break;
    441     case FILE_ERROR_SERVICE_UNAVAILABLE:
    442       // Run the task again so that we'll retry once the service is back.
    443       it->second.should_run_again = true;
    444       it->second.context = ClientContext(BACKGROUND);
    445       retry_delay = long_delay_;
    446       operation_delegate_->OnDriveSyncError(
    447           file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
    448       break;
    449     default:
    450       operation_delegate_->OnDriveSyncError(
    451           file_system::DRIVE_SYNC_ERROR_MISC, local_id);
    452       LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
    453                    << ": " << FileErrorToString(error);
    454   }
    455 
    456   for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) {
    457     base::MessageLoopProxy::current()->PostTask(
    458         FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error));
    459   }
    460   it->second.waiting_callbacks.clear();
    461 
    462   if (it->second.should_run_again) {
    463     DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
    464     it->second.state = PENDING;
    465     it->second.should_run_again = false;
    466     base::MessageLoopProxy::current()->PostDelayedTask(
    467         FROM_HERE,
    468         base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
    469         retry_delay);
    470   } else {
    471     for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
    472       StartTask(it->second.dependent_tasks[i]);
    473     tasks_.erase(it);
    474   }
    475 }
    476 
    477 void SyncClient::OnFetchFileComplete(const std::string& local_id,
    478                                      FileError error,
    479                                      const base::FilePath& local_path,
    480                                      scoped_ptr<ResourceEntry> entry) {
    481   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    482   OnTaskComplete(FETCH, local_id, error);
    483   if (error == FILE_ERROR_ABORT) {
    484     // If user cancels download, unpin the file so that we do not sync the file
    485     // again.
    486     base::PostTaskAndReplyWithResult(
    487         blocking_task_runner_.get(),
    488         FROM_HERE,
    489         base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
    490         base::Bind(&util::EmptyFileOperationCallback));
    491   }
    492 }
    493 
    494 }  // namespace internal
    495 }  // namespace drive
    496