Home | History | Annotate | Download | only in drive
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/sync_client.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/message_loop/message_loop_proxy.h"
     11 #include "chrome/browser/chromeos/drive/drive.pb.h"
     12 #include "chrome/browser/chromeos/drive/file_cache.h"
     13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
     14 #include "chrome/browser/chromeos/drive/file_system/update_operation.h"
     15 #include "chrome/browser/chromeos/drive/file_system_util.h"
     16 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
     17 #include "content/public/browser/browser_thread.h"
     18 #include "google_apis/drive/task_util.h"
     19 
     20 using content::BrowserThread;
     21 
     22 namespace drive {
     23 namespace internal {
     24 
     25 namespace {
     26 
     // Number of seconds a newly queued sync task waits before it is started.
     // Tasks are deliberately not run right away:
     //
     // 1) Fetching: a user may tick "Make available offline" by mistake and
     //    untick it a second later. Downloading the file in that window would
     //    be wasted work.
     //
     // 2) Uploading: a write through the HTML5 file system API happens in two
     //    steps, first truncating the file to 0 bytes and then writing the
     //    contents. The upload must not begin right after the truncation. The
     //    same file may also be edited several times in quick succession.
     //
     // TODO(satorux): We should find a way to handle the upload case more nicely,
     // and shorten the delay. crbug.com/134774
     const int kDelaySeconds = 5;

     // Number of seconds to wait before retrying a sync task that failed because
     // of a server error.
     const int kLongDelaySeconds = 600;
     45 
     46 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
     47 // fetched (not present locally), to |to_upload| if the file is dirty but not
     48 // uploaded, or to |to_remove| if the entry is in the trash.
     49 void CollectBacklog(ResourceMetadata* metadata,
     50                     std::vector<std::string>* to_fetch,
     51                     std::vector<std::string>* to_upload,
     52                     std::vector<std::string>* to_update) {
     53   DCHECK(to_fetch);
     54   DCHECK(to_upload);
     55   DCHECK(to_update);
     56 
     57   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
     58   for (; !it->IsAtEnd(); it->Advance()) {
     59     const std::string& local_id = it->GetID();
     60     const ResourceEntry& entry = it->GetValue();
     61     if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
     62       to_update->push_back(local_id);
     63       continue;
     64     }
     65 
     66     switch (entry.metadata_edit_state()) {
     67       case ResourceEntry::CLEAN:
     68         break;
     69       case ResourceEntry::SYNCING:
     70       case ResourceEntry::DIRTY:
     71         to_update->push_back(local_id);
     72         break;
     73     }
     74 
     75     FileCacheEntry cache_entry;
     76     if (it->GetCacheEntry(&cache_entry)) {
     77       if (cache_entry.is_pinned() && !cache_entry.is_present())
     78         to_fetch->push_back(local_id);
     79 
     80       if (cache_entry.is_dirty())
     81         to_upload->push_back(local_id);
     82     }
     83   }
     84   DCHECK(!it->HasError());
     85 }
     86 
     87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
     88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
     89                               FileCache* cache,
     90                               std::vector<std::string>* local_ids) {
     91   scoped_ptr<FileCache::Iterator> it = cache->GetIterator();
     92   for (; !it->IsAtEnd(); it->Advance()) {
     93     const FileCacheEntry& cache_entry = it->GetValue();
     94     const std::string& local_id = it->GetID();
     95     if (!cache_entry.is_pinned() || !cache_entry.is_present())
     96       continue;
     97 
     98     ResourceEntry entry;
     99     FileError error = metadata->GetResourceEntryById(local_id, &entry);
    100     if (error != FILE_ERROR_OK) {
    101       LOG(WARNING) << "Entry not found: " << local_id;
    102       continue;
    103     }
    104 
    105     // If MD5s don't match, it indicates the local cache file is stale, unless
    106     // the file is dirty (the MD5 is "local"). We should never re-fetch the
    107     // file when we have a locally modified version.
    108     if (entry.file_specific_info().md5() == cache_entry.md5() ||
    109         cache_entry.is_dirty())
    110       continue;
    111 
    112     error = cache->Remove(local_id);
    113     if (error != FILE_ERROR_OK) {
    114       LOG(WARNING) << "Failed to remove cache entry: " << local_id;
    115       continue;
    116     }
    117 
    118     error = cache->Pin(local_id);
    119     if (error != FILE_ERROR_OK) {
    120       LOG(WARNING) << "Failed to pin cache entry: " << local_id;
    121       continue;
    122     }
    123 
    124     local_ids->push_back(local_id);
    125   }
    126   DCHECK(!it->HasError());
    127 }
    128 
    129 }  // namespace
    130 
    131 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
    132 SyncClient::SyncTask::~SyncTask() {}
    133 
    134 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
    135                        file_system::OperationObserver* observer,
    136                        JobScheduler* scheduler,
    137                        ResourceMetadata* metadata,
    138                        FileCache* cache,
    139                        const base::FilePath& temporary_file_directory)
    140     : blocking_task_runner_(blocking_task_runner),
    141       metadata_(metadata),
    142       cache_(cache),
    143       download_operation_(new file_system::DownloadOperation(
    144           blocking_task_runner,
    145           observer,
    146           scheduler,
    147           metadata,
    148           cache,
    149           temporary_file_directory)),
    150       update_operation_(new file_system::UpdateOperation(blocking_task_runner,
    151                                                          observer,
    152                                                          scheduler,
    153                                                          metadata,
    154                                                          cache)),
    155       entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
    156                                                        observer,
    157                                                        scheduler,
    158                                                        metadata)),
    159       delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
    160       long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
    161       weak_ptr_factory_(this) {
    162   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    163 }
    164 
    165 SyncClient::~SyncClient() {
    166   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    167 }
    168 
    169 void SyncClient::StartProcessingBacklog() {
    170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    171 
    172   std::vector<std::string>* to_fetch = new std::vector<std::string>;
    173   std::vector<std::string>* to_upload = new std::vector<std::string>;
    174   std::vector<std::string>* to_remove = new std::vector<std::string>;
    175   blocking_task_runner_->PostTaskAndReply(
    176       FROM_HERE,
    177       base::Bind(&CollectBacklog, metadata_, to_fetch, to_upload, to_remove),
    178       base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
    179                  weak_ptr_factory_.GetWeakPtr(),
    180                  base::Owned(to_fetch),
    181                  base::Owned(to_upload),
    182                  base::Owned(to_remove)));
    183 }
    184 
    185 void SyncClient::StartCheckingExistingPinnedFiles() {
    186   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    187 
    188   std::vector<std::string>* local_ids = new std::vector<std::string>;
    189   blocking_task_runner_->PostTaskAndReply(
    190       FROM_HERE,
    191       base::Bind(&CheckExistingPinnedFiles,
    192                  metadata_,
    193                  cache_,
    194                  local_ids),
    195       base::Bind(&SyncClient::AddFetchTasks,
    196                  weak_ptr_factory_.GetWeakPtr(),
    197                  base::Owned(local_ids)));
    198 }
    199 
    200 void SyncClient::AddFetchTask(const std::string& local_id) {
    201   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    202   AddFetchTaskInternal(local_id, delay_);
    203 }
    204 
    205 void SyncClient::RemoveFetchTask(const std::string& local_id) {
    206   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    207 
    208   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
    209   if (it == tasks_.end())
    210     return;
    211 
    212   SyncTask* task = &it->second;
    213   switch (task->state) {
    214     case PENDING:
    215       tasks_.erase(it);
    216       break;
    217     case RUNNING:
    218       // TODO(kinaba): Cancel tasks in JobScheduler as well. crbug.com/248856
    219       break;
    220   }
    221 }
    222 
    223 void SyncClient::AddUploadTask(const ClientContext& context,
    224                                const std::string& local_id) {
    225   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    226   AddUploadTaskInternal(context, local_id,
    227                         file_system::UpdateOperation::RUN_CONTENT_CHECK,
    228                         delay_);
    229 }
    230 
    231 void SyncClient::AddUpdateTask(const std::string& local_id) {
    232   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    233   AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
    234 }
    235 
    236 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
    237                                       const base::TimeDelta& delay) {
    238   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    239 
    240   SyncTask task;
    241   task.task = base::Bind(
    242       &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
    243       base::Unretained(download_operation_.get()),
    244       local_id,
    245       ClientContext(BACKGROUND),
    246       GetFileContentInitializedCallback(),
    247       google_apis::GetContentCallback(),
    248       base::Bind(&SyncClient::OnFetchFileComplete,
    249                  weak_ptr_factory_.GetWeakPtr(),
    250                  local_id));
    251   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
    252 }
    253 
    254 void SyncClient::AddUploadTaskInternal(
    255     const ClientContext& context,
    256     const std::string& local_id,
    257     file_system::UpdateOperation::ContentCheckMode content_check_mode,
    258     const base::TimeDelta& delay) {
    259   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    260 
    261   SyncTask task;
    262   task.task = base::Bind(
    263       &file_system::UpdateOperation::UpdateFileByLocalId,
    264       base::Unretained(update_operation_.get()),
    265       local_id,
    266       context,
    267       content_check_mode,
    268       base::Bind(&SyncClient::OnUploadFileComplete,
    269                  weak_ptr_factory_.GetWeakPtr(),
    270                  local_id));
    271   AddTask(SyncTasks::key_type(UPLOAD, local_id), task, delay);
    272 }
    273 
    274 void SyncClient::AddUpdateTaskInternal(const std::string& local_id,
    275                                        const base::TimeDelta& delay) {
    276   SyncTask task;
    277   task.task = base::Bind(
    278       &EntryUpdatePerformer::UpdateEntry,
    279       base::Unretained(entry_update_performer_.get()),
    280       local_id,
    281       base::Bind(&SyncClient::OnUpdateComplete,
    282                  weak_ptr_factory_.GetWeakPtr(),
    283                  local_id));
    284   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
    285 }
    286 
    287 void SyncClient::AddTask(const SyncTasks::key_type& key,
    288                          const SyncTask& task,
    289                          const base::TimeDelta& delay) {
    290   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    291 
    292   SyncTasks::iterator it = tasks_.find(key);
    293   if (it != tasks_.end()) {
    294     switch (it->second.state) {
    295       case PENDING:
    296         // The same task will run, do nothing.
    297         break;
    298       case RUNNING:
    299         // Something has changed since the task started. Schedule rerun.
    300         it->second.should_run_again = true;
    301         break;
    302     }
    303     return;
    304   }
    305 
    306   DCHECK_EQ(PENDING, task.state);
    307   tasks_[key] = task;
    308 
    309   base::MessageLoopProxy::current()->PostDelayedTask(
    310       FROM_HERE,
    311       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
    312       delay);
    313 }
    314 
    315 void SyncClient::StartTask(const SyncTasks::key_type& key) {
    316   SyncTasks::iterator it = tasks_.find(key);
    317   if (it == tasks_.end())
    318     return;
    319 
    320   SyncTask* task = &it->second;
    321   switch (task->state) {
    322     case PENDING:
    323       task->state = RUNNING;
    324       task->task.Run();
    325       break;
    326     case RUNNING:  // Do nothing.
    327       break;
    328   }
    329 }
    330 
    331 void SyncClient::OnGetLocalIdsOfBacklog(
    332     const std::vector<std::string>* to_fetch,
    333     const std::vector<std::string>* to_upload,
    334     const std::vector<std::string>* to_update) {
    335   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    336 
    337   // Give priority to upload tasks over fetch tasks, so that dirty files are
    338   // uploaded as soon as possible.
    339   for (size_t i = 0; i < to_upload->size(); ++i) {
    340     const std::string& local_id = (*to_upload)[i];
    341     DVLOG(1) << "Queuing to upload: " << local_id;
    342     AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
    343                           file_system::UpdateOperation::NO_CONTENT_CHECK,
    344                           delay_);
    345   }
    346 
    347   for (size_t i = 0; i < to_fetch->size(); ++i) {
    348     const std::string& local_id = (*to_fetch)[i];
    349     DVLOG(1) << "Queuing to fetch: " << local_id;
    350     AddFetchTaskInternal(local_id, delay_);
    351   }
    352 
    353   for (size_t i = 0; i < to_update->size(); ++i) {
    354     const std::string& local_id = (*to_update)[i];
    355     DVLOG(1) << "Queuing to update: " << local_id;
    356     AddUpdateTask(local_id);
    357   }
    358 }
    359 
    360 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
    361   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    362 
    363   for (size_t i = 0; i < local_ids->size(); ++i)
    364     AddFetchTask((*local_ids)[i]);
    365 }
    366 
    367 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
    368   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    369 
    370   const SyncTasks::key_type key(type, local_id);
    371   SyncTasks::iterator it = tasks_.find(key);
    372   DCHECK(it != tasks_.end());
    373 
    374   if (it->second.should_run_again) {
    375     DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
    376     it->second.should_run_again = false;
    377     it->second.task.Run();
    378     return false;
    379   }
    380 
    381   tasks_.erase(it);
    382   return true;
    383 }
    384 
    385 void SyncClient::OnFetchFileComplete(const std::string& local_id,
    386                                      FileError error,
    387                                      const base::FilePath& local_path,
    388                                      scoped_ptr<ResourceEntry> entry) {
    389   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    390 
    391   if (!OnTaskComplete(FETCH, local_id))
    392     return;
    393 
    394   if (error == FILE_ERROR_OK) {
    395     DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
    396   } else {
    397     switch (error) {
    398       case FILE_ERROR_ABORT:
    399         // If user cancels download, unpin the file so that we do not sync the
    400         // file again.
    401         base::PostTaskAndReplyWithResult(
    402             blocking_task_runner_,
    403             FROM_HERE,
    404             base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
    405             base::Bind(&util::EmptyFileOperationCallback));
    406         break;
    407       case FILE_ERROR_NO_CONNECTION:
    408         // Add the task again so that we'll retry once the connection is back.
    409         AddFetchTaskInternal(local_id, delay_);
    410         break;
    411       case FILE_ERROR_SERVICE_UNAVAILABLE:
    412         // Add the task again so that we'll retry once the service is back.
    413         AddFetchTaskInternal(local_id, long_delay_);
    414         break;
    415       default:
    416         LOG(WARNING) << "Failed to fetch " << local_id
    417                      << ": " << FileErrorToString(error);
    418     }
    419   }
    420 }
    421 
    422 void SyncClient::OnUploadFileComplete(const std::string& local_id,
    423                                       FileError error) {
    424   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    425 
    426   if (!OnTaskComplete(UPLOAD, local_id))
    427     return;
    428 
    429   if (error == FILE_ERROR_OK) {
    430     DVLOG(1) << "Uploaded " << local_id;
    431   } else {
    432     switch (error) {
    433       case FILE_ERROR_NO_CONNECTION:
    434         // Add the task again so that we'll retry once the connection is back.
    435         AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
    436                               file_system::UpdateOperation::NO_CONTENT_CHECK,
    437                               delay_);
    438         break;
    439       case FILE_ERROR_SERVICE_UNAVAILABLE:
    440         // Add the task again so that we'll retry once the service is back.
    441         AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
    442                               file_system::UpdateOperation::NO_CONTENT_CHECK,
    443                               long_delay_);
    444         break;
    445       default:
    446         LOG(WARNING) << "Failed to upload " << local_id << ": "
    447                      << FileErrorToString(error);
    448     }
    449   }
    450 }
    451 
    452 void SyncClient::OnUpdateComplete(const std::string& local_id,
    453                                   FileError error) {
    454   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    455 
    456   if (!OnTaskComplete(UPDATE, local_id))
    457     return;
    458 
    459   if (error == FILE_ERROR_OK) {
    460     DVLOG(1) << "Updated " << local_id;
    461   } else {
    462     switch (error) {
    463       case FILE_ERROR_NO_CONNECTION:
    464         // Add the task again so that we'll retry once the connection is back.
    465         AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
    466         break;
    467       case FILE_ERROR_SERVICE_UNAVAILABLE:
    468         // Add the task again so that we'll retry once the service is back.
    469         AddUpdateTaskInternal(local_id, long_delay_);
    470         break;
    471       default:
    472         LOG(WARNING) << "Failed to update " << local_id << ": "
    473                      << FileErrorToString(error);
    474     }
    475   }
    476 }
    477 
    478 }  // namespace internal
    479 }  // namespace drive
    480