Home | History | Annotate | Download | only in drive
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/sync_client.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/message_loop/message_loop_proxy.h"
     11 #include "chrome/browser/chromeos/drive/drive.pb.h"
     12 #include "chrome/browser/chromeos/drive/file_cache.h"
     13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
     14 #include "chrome/browser/chromeos/drive/file_system/operation_observer.h"
     15 #include "chrome/browser/chromeos/drive/file_system_util.h"
     16 #include "chrome/browser/chromeos/drive/job_scheduler.h"
     17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
     18 #include "content/public/browser/browser_thread.h"
     19 #include "google_apis/drive/task_util.h"
     20 
     21 using content::BrowserThread;
     22 
     23 namespace drive {
     24 namespace internal {
     25 
     26 namespace {
     27 
     // Delay, in seconds, applied before a newly queued sync task is started.
     // Sync tasks are deliberately not processed right away:
     //
     // 1) Fetching: a user may tick the "Make available offline" checkbox on a
     //    file by accident and untick it within a second. Fetching the file in
     //    that window would be wasted work.
     //
     // 2) Uploading: a write through the HTML5 file system API happens in two
     //    steps (truncate the file to 0 bytes, then write the contents), and an
     //    upload must not begin right after the truncation. The user may also
     //    edit the same file several times in quick succession.
     //
     // TODO(satorux): Find a nicer way to handle the upload case so that this
     // delay can be shortened. crbug.com/134774
     const int kDelaySeconds = 1;

     // Delay, in seconds, applied before retrying a sync task after a server
     // error.
     const int kLongDelaySeconds = 600;
     46 
     47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
     48 // fetched (not present locally), to |to_update| if the file needs update.
     49 void CollectBacklog(ResourceMetadata* metadata,
     50                     std::vector<std::string>* to_fetch,
     51                     std::vector<std::string>* to_update) {
     52   DCHECK(to_fetch);
     53   DCHECK(to_update);
     54 
     55   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
     56   for (; !it->IsAtEnd(); it->Advance()) {
     57     const std::string& local_id = it->GetID();
     58     const ResourceEntry& entry = it->GetValue();
     59     if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
     60       to_update->push_back(local_id);
     61       continue;
     62     }
     63 
     64     bool should_update = false;
     65     switch (entry.metadata_edit_state()) {
     66       case ResourceEntry::CLEAN:
     67         break;
     68       case ResourceEntry::SYNCING:
     69       case ResourceEntry::DIRTY:
     70         should_update = true;
     71         break;
     72     }
     73 
     74     if (entry.file_specific_info().cache_state().is_pinned() &&
     75         !entry.file_specific_info().cache_state().is_present())
     76       to_fetch->push_back(local_id);
     77 
     78     if (entry.file_specific_info().cache_state().is_dirty())
     79       should_update = true;
     80 
     81     if (should_update)
     82       to_update->push_back(local_id);
     83   }
     84   DCHECK(!it->HasError());
     85 }
     86 
     87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
     88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
     89                               FileCache* cache,
     90                               std::vector<std::string>* local_ids) {
     91   scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
     92   for (; !it->IsAtEnd(); it->Advance()) {
     93     const ResourceEntry& entry = it->GetValue();
     94     const FileCacheEntry& cache_state =
     95         entry.file_specific_info().cache_state();
     96     const std::string& local_id = it->GetID();
     97     if (!cache_state.is_pinned() || !cache_state.is_present())
     98       continue;
     99 
    100     // If MD5s don't match, it indicates the local cache file is stale, unless
    101     // the file is dirty (the MD5 is "local"). We should never re-fetch the
    102     // file when we have a locally modified version.
    103     if (entry.file_specific_info().md5() == cache_state.md5() ||
    104         cache_state.is_dirty())
    105       continue;
    106 
    107     FileError error = cache->Remove(local_id);
    108     if (error != FILE_ERROR_OK) {
    109       LOG(WARNING) << "Failed to remove cache entry: " << local_id;
    110       continue;
    111     }
    112 
    113     error = cache->Pin(local_id);
    114     if (error != FILE_ERROR_OK) {
    115       LOG(WARNING) << "Failed to pin cache entry: " << local_id;
    116       continue;
    117     }
    118 
    119     local_ids->push_back(local_id);
    120   }
    121   DCHECK(!it->HasError());
    122 }
    123 
    124 // Runs the task and returns a dummy cancel closure.
    125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) {
    126   task.Run();
    127   return base::Closure();
    128 }
    129 
    130 }  // namespace
    131 
    132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
    133 SyncClient::SyncTask::~SyncTask() {}
    134 
    135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
    136                        file_system::OperationObserver* observer,
    137                        JobScheduler* scheduler,
    138                        ResourceMetadata* metadata,
    139                        FileCache* cache,
    140                        LoaderController* loader_controller,
    141                        const base::FilePath& temporary_file_directory)
    142     : blocking_task_runner_(blocking_task_runner),
    143       operation_observer_(observer),
    144       metadata_(metadata),
    145       cache_(cache),
    146       download_operation_(new file_system::DownloadOperation(
    147           blocking_task_runner,
    148           observer,
    149           scheduler,
    150           metadata,
    151           cache,
    152           temporary_file_directory)),
    153       entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
    154                                                        observer,
    155                                                        scheduler,
    156                                                        metadata,
    157                                                        cache,
    158                                                        loader_controller)),
    159       delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
    160       long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
    161       weak_ptr_factory_(this) {
    162   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    163 }
    164 
    165 SyncClient::~SyncClient() {
    166   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    167 }
    168 
    169 void SyncClient::StartProcessingBacklog() {
    170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    171 
    172   std::vector<std::string>* to_fetch = new std::vector<std::string>;
    173   std::vector<std::string>* to_update = new std::vector<std::string>;
    174   blocking_task_runner_->PostTaskAndReply(
    175       FROM_HERE,
    176       base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
    177       base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
    178                  weak_ptr_factory_.GetWeakPtr(),
    179                  base::Owned(to_fetch),
    180                  base::Owned(to_update)));
    181 }
    182 
    183 void SyncClient::StartCheckingExistingPinnedFiles() {
    184   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    185 
    186   std::vector<std::string>* local_ids = new std::vector<std::string>;
    187   blocking_task_runner_->PostTaskAndReply(
    188       FROM_HERE,
    189       base::Bind(&CheckExistingPinnedFiles,
    190                  metadata_,
    191                  cache_,
    192                  local_ids),
    193       base::Bind(&SyncClient::AddFetchTasks,
    194                  weak_ptr_factory_.GetWeakPtr(),
    195                  base::Owned(local_ids)));
    196 }
    197 
    198 void SyncClient::AddFetchTask(const std::string& local_id) {
    199   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    200   AddFetchTaskInternal(local_id, delay_);
    201 }
    202 
    203 void SyncClient::RemoveFetchTask(const std::string& local_id) {
    204   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    205 
    206   SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
    207   if (it == tasks_.end())
    208     return;
    209 
    210   SyncTask* task = &it->second;
    211   switch (task->state) {
    212     case PENDING:
    213       tasks_.erase(it);
    214       break;
    215     case RUNNING:
    216       if (!task->cancel_closure.is_null())
    217         task->cancel_closure.Run();
    218       break;
    219   }
    220 }
    221 
    222 void SyncClient::AddUpdateTask(const ClientContext& context,
    223                                const std::string& local_id) {
    224   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    225   AddUpdateTaskInternal(context, local_id, delay_);
    226 }
    227 
    228 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
    229                                       const base::TimeDelta& delay) {
    230   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    231 
    232   SyncTask task;
    233   task.task = base::Bind(
    234       &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
    235       base::Unretained(download_operation_.get()),
    236       local_id,
    237       ClientContext(BACKGROUND),
    238       GetFileContentInitializedCallback(),
    239       google_apis::GetContentCallback(),
    240       base::Bind(&SyncClient::OnFetchFileComplete,
    241                  weak_ptr_factory_.GetWeakPtr(),
    242                  local_id));
    243   AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
    244 }
    245 
    246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
    247                                        const std::string& local_id,
    248                                        const base::TimeDelta& delay) {
    249   SyncTask task;
    250   task.task = base::Bind(
    251       &RunTaskAndReturnDummyCancelClosure,
    252       base::Bind(&EntryUpdatePerformer::UpdateEntry,
    253                  base::Unretained(entry_update_performer_.get()),
    254                  local_id,
    255                  context,
    256                  base::Bind(&SyncClient::OnUpdateComplete,
    257                             weak_ptr_factory_.GetWeakPtr(),
    258                             local_id)));
    259   AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
    260 }
    261 
    262 void SyncClient::AddTask(const SyncTasks::key_type& key,
    263                          const SyncTask& task,
    264                          const base::TimeDelta& delay) {
    265   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    266 
    267   SyncTasks::iterator it = tasks_.find(key);
    268   if (it != tasks_.end()) {
    269     switch (it->second.state) {
    270       case PENDING:
    271         // The same task will run, do nothing.
    272         break;
    273       case RUNNING:
    274         // Something has changed since the task started. Schedule rerun.
    275         it->second.should_run_again = true;
    276         break;
    277     }
    278     return;
    279   }
    280 
    281   DCHECK_EQ(PENDING, task.state);
    282   tasks_[key] = task;
    283 
    284   base::MessageLoopProxy::current()->PostDelayedTask(
    285       FROM_HERE,
    286       base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
    287       delay);
    288 }
    289 
    290 void SyncClient::StartTask(const SyncTasks::key_type& key) {
    291   SyncTasks::iterator it = tasks_.find(key);
    292   if (it == tasks_.end())
    293     return;
    294 
    295   SyncTask* task = &it->second;
    296   switch (task->state) {
    297     case PENDING:
    298       task->state = RUNNING;
    299       task->cancel_closure = task->task.Run();
    300       break;
    301     case RUNNING:  // Do nothing.
    302       break;
    303   }
    304 }
    305 
    306 void SyncClient::OnGetLocalIdsOfBacklog(
    307     const std::vector<std::string>* to_fetch,
    308     const std::vector<std::string>* to_update) {
    309   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    310 
    311   // Give priority to upload tasks over fetch tasks, so that dirty files are
    312   // uploaded as soon as possible.
    313   for (size_t i = 0; i < to_update->size(); ++i) {
    314     const std::string& local_id = (*to_update)[i];
    315     DVLOG(1) << "Queuing to update: " << local_id;
    316     AddUpdateTask(ClientContext(BACKGROUND), local_id);
    317   }
    318 
    319   for (size_t i = 0; i < to_fetch->size(); ++i) {
    320     const std::string& local_id = (*to_fetch)[i];
    321     DVLOG(1) << "Queuing to fetch: " << local_id;
    322     AddFetchTaskInternal(local_id, delay_);
    323   }
    324 }
    325 
    326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
    327   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    328 
    329   for (size_t i = 0; i < local_ids->size(); ++i)
    330     AddFetchTask((*local_ids)[i]);
    331 }
    332 
    333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
    334   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    335 
    336   const SyncTasks::key_type key(type, local_id);
    337   SyncTasks::iterator it = tasks_.find(key);
    338   DCHECK(it != tasks_.end());
    339 
    340   if (it->second.should_run_again) {
    341     DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
    342     it->second.should_run_again = false;
    343     it->second.task.Run();
    344     return false;
    345   }
    346 
    347   tasks_.erase(it);
    348   return true;
    349 }
    350 
    351 void SyncClient::OnFetchFileComplete(const std::string& local_id,
    352                                      FileError error,
    353                                      const base::FilePath& local_path,
    354                                      scoped_ptr<ResourceEntry> entry) {
    355   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    356 
    357   if (!OnTaskComplete(FETCH, local_id))
    358     return;
    359 
    360   if (error == FILE_ERROR_OK) {
    361     DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
    362   } else {
    363     switch (error) {
    364       case FILE_ERROR_ABORT:
    365         // If user cancels download, unpin the file so that we do not sync the
    366         // file again.
    367         base::PostTaskAndReplyWithResult(
    368             blocking_task_runner_,
    369             FROM_HERE,
    370             base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
    371             base::Bind(&util::EmptyFileOperationCallback));
    372         break;
    373       case FILE_ERROR_NO_CONNECTION:
    374         // Add the task again so that we'll retry once the connection is back.
    375         AddFetchTaskInternal(local_id, delay_);
    376         break;
    377       case FILE_ERROR_SERVICE_UNAVAILABLE:
    378         // Add the task again so that we'll retry once the service is back.
    379         AddFetchTaskInternal(local_id, long_delay_);
    380         operation_observer_->OnDriveSyncError(
    381             file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
    382             local_id);
    383         break;
    384       default:
    385         operation_observer_->OnDriveSyncError(
    386             file_system::DRIVE_SYNC_ERROR_MISC,
    387             local_id);
    388         LOG(WARNING) << "Failed to fetch " << local_id
    389                      << ": " << FileErrorToString(error);
    390     }
    391   }
    392 }
    393 
    394 void SyncClient::OnUpdateComplete(const std::string& local_id,
    395                                   FileError error) {
    396   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    397 
    398   if (!OnTaskComplete(UPDATE, local_id))
    399     return;
    400 
    401   if (error == FILE_ERROR_OK) {
    402     DVLOG(1) << "Updated " << local_id;
    403 
    404     // Add update tasks for child entries which may be waiting for the parent to
    405     // be updated.
    406     ResourceEntryVector* entries = new ResourceEntryVector;
    407     base::PostTaskAndReplyWithResult(
    408         blocking_task_runner_.get(),
    409         FROM_HERE,
    410         base::Bind(&ResourceMetadata::ReadDirectoryById,
    411                    base::Unretained(metadata_), local_id, entries),
    412         base::Bind(&SyncClient::AddChildUpdateTasks,
    413                    weak_ptr_factory_.GetWeakPtr(), base::Owned(entries)));
    414   } else {
    415     switch (error) {
    416       case FILE_ERROR_ABORT:
    417         // Ignore it because this is caused by user's cancel operations.
    418         break;
    419       case FILE_ERROR_NO_CONNECTION:
    420         // Add the task again so that we'll retry once the connection is back.
    421         AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id,
    422                               base::TimeDelta::FromSeconds(0));
    423         break;
    424       case FILE_ERROR_SERVICE_UNAVAILABLE:
    425         // Add the task again so that we'll retry once the service is back.
    426         AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_);
    427         operation_observer_->OnDriveSyncError(
    428             file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE,
    429             local_id);
    430         break;
    431       default:
    432         operation_observer_->OnDriveSyncError(
    433             file_system::DRIVE_SYNC_ERROR_MISC,
    434             local_id);
    435         LOG(WARNING) << "Failed to update " << local_id << ": "
    436                      << FileErrorToString(error);
    437     }
    438   }
    439 }
    440 
    441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries,
    442                                      FileError error) {
    443   if (error != FILE_ERROR_OK)
    444     return;
    445 
    446   for (size_t i = 0; i < entries->size(); ++i) {
    447     const ResourceEntry& entry = (*entries)[i];
    448     if (entry.metadata_edit_state() != ResourceEntry::CLEAN) {
    449       AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(),
    450                             base::TimeDelta::FromSeconds(0));
    451     }
    452   }
    453 }
    454 
    455 }  // namespace internal
    456 }  // namespace drive
    457