// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/chromeos/drive/sync_client.h"

#include <vector>

#include "base/bind.h"
#include "base/message_loop/message_loop_proxy.h"
#include "chrome/browser/chromeos/drive/drive.pb.h"
#include "chrome/browser/chromeos/drive/file_cache.h"
#include "chrome/browser/chromeos/drive/file_system/download_operation.h"
#include "chrome/browser/chromeos/drive/file_system/update_operation.h"
#include "chrome/browser/chromeos/drive/file_system_util.h"
#include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
#include "content/public/browser/browser_thread.h"
#include "google_apis/drive/task_util.h"

using content::BrowserThread;

namespace drive {
namespace internal {

namespace {

// Delay, in seconds, applied before a queued sync task is processed. We should
// not process SyncTasks immediately for the following reasons:
//
// 1) For fetching, the user may accidentally click on the "Make available
//    offline" checkbox on a file, and cancel it again within a second.
//    Fetching the file in this case is wasted work.
//
// 2) For uploading, file writing via the HTML5 file system API is performed
//    in two steps: 1) truncate the file to 0 bytes, 2) write the contents. We
//    shouldn't start uploading right after step 1). Besides, the user may
//    edit the same file repeatedly in a short period of time.
//
// TODO(satorux): We should find a way to handle the upload case more nicely,
// and shorten the delay. crbug.com/134774
const int kDelaySeconds = 5;

// Delay, in seconds, applied before retrying a sync task on server errors.
44 const int kLongDelaySeconds = 600; 45 46 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not 47 // fetched (not present locally), to |to_upload| if the file is dirty but not 48 // uploaded, or to |to_remove| if the entry is in the trash. 49 void CollectBacklog(ResourceMetadata* metadata, 50 std::vector<std::string>* to_fetch, 51 std::vector<std::string>* to_upload, 52 std::vector<std::string>* to_update) { 53 DCHECK(to_fetch); 54 DCHECK(to_upload); 55 DCHECK(to_update); 56 57 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); 58 for (; !it->IsAtEnd(); it->Advance()) { 59 const std::string& local_id = it->GetID(); 60 const ResourceEntry& entry = it->GetValue(); 61 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) { 62 to_update->push_back(local_id); 63 continue; 64 } 65 66 switch (entry.metadata_edit_state()) { 67 case ResourceEntry::CLEAN: 68 break; 69 case ResourceEntry::SYNCING: 70 case ResourceEntry::DIRTY: 71 to_update->push_back(local_id); 72 break; 73 } 74 75 FileCacheEntry cache_entry; 76 if (it->GetCacheEntry(&cache_entry)) { 77 if (cache_entry.is_pinned() && !cache_entry.is_present()) 78 to_fetch->push_back(local_id); 79 80 if (cache_entry.is_dirty()) 81 to_upload->push_back(local_id); 82 } 83 } 84 DCHECK(!it->HasError()); 85 } 86 87 // Iterates cache entries and collects IDs of ones with obsolete cache files. 
88 void CheckExistingPinnedFiles(ResourceMetadata* metadata, 89 FileCache* cache, 90 std::vector<std::string>* local_ids) { 91 scoped_ptr<FileCache::Iterator> it = cache->GetIterator(); 92 for (; !it->IsAtEnd(); it->Advance()) { 93 const FileCacheEntry& cache_entry = it->GetValue(); 94 const std::string& local_id = it->GetID(); 95 if (!cache_entry.is_pinned() || !cache_entry.is_present()) 96 continue; 97 98 ResourceEntry entry; 99 FileError error = metadata->GetResourceEntryById(local_id, &entry); 100 if (error != FILE_ERROR_OK) { 101 LOG(WARNING) << "Entry not found: " << local_id; 102 continue; 103 } 104 105 // If MD5s don't match, it indicates the local cache file is stale, unless 106 // the file is dirty (the MD5 is "local"). We should never re-fetch the 107 // file when we have a locally modified version. 108 if (entry.file_specific_info().md5() == cache_entry.md5() || 109 cache_entry.is_dirty()) 110 continue; 111 112 error = cache->Remove(local_id); 113 if (error != FILE_ERROR_OK) { 114 LOG(WARNING) << "Failed to remove cache entry: " << local_id; 115 continue; 116 } 117 118 error = cache->Pin(local_id); 119 if (error != FILE_ERROR_OK) { 120 LOG(WARNING) << "Failed to pin cache entry: " << local_id; 121 continue; 122 } 123 124 local_ids->push_back(local_id); 125 } 126 DCHECK(!it->HasError()); 127 } 128 129 } // namespace 130 131 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {} 132 SyncClient::SyncTask::~SyncTask() {} 133 134 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, 135 file_system::OperationObserver* observer, 136 JobScheduler* scheduler, 137 ResourceMetadata* metadata, 138 FileCache* cache, 139 const base::FilePath& temporary_file_directory) 140 : blocking_task_runner_(blocking_task_runner), 141 metadata_(metadata), 142 cache_(cache), 143 download_operation_(new file_system::DownloadOperation( 144 blocking_task_runner, 145 observer, 146 scheduler, 147 metadata, 148 cache, 149 
temporary_file_directory)), 150 update_operation_(new file_system::UpdateOperation(blocking_task_runner, 151 observer, 152 scheduler, 153 metadata, 154 cache)), 155 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner, 156 observer, 157 scheduler, 158 metadata)), 159 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)), 160 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)), 161 weak_ptr_factory_(this) { 162 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 163 } 164 165 SyncClient::~SyncClient() { 166 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 167 } 168 169 void SyncClient::StartProcessingBacklog() { 170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 171 172 std::vector<std::string>* to_fetch = new std::vector<std::string>; 173 std::vector<std::string>* to_upload = new std::vector<std::string>; 174 std::vector<std::string>* to_remove = new std::vector<std::string>; 175 blocking_task_runner_->PostTaskAndReply( 176 FROM_HERE, 177 base::Bind(&CollectBacklog, metadata_, to_fetch, to_upload, to_remove), 178 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog, 179 weak_ptr_factory_.GetWeakPtr(), 180 base::Owned(to_fetch), 181 base::Owned(to_upload), 182 base::Owned(to_remove))); 183 } 184 185 void SyncClient::StartCheckingExistingPinnedFiles() { 186 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 187 188 std::vector<std::string>* local_ids = new std::vector<std::string>; 189 blocking_task_runner_->PostTaskAndReply( 190 FROM_HERE, 191 base::Bind(&CheckExistingPinnedFiles, 192 metadata_, 193 cache_, 194 local_ids), 195 base::Bind(&SyncClient::AddFetchTasks, 196 weak_ptr_factory_.GetWeakPtr(), 197 base::Owned(local_ids))); 198 } 199 200 void SyncClient::AddFetchTask(const std::string& local_id) { 201 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 202 AddFetchTaskInternal(local_id, delay_); 203 } 204 205 void SyncClient::RemoveFetchTask(const std::string& local_id) { 206 
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 207 208 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); 209 if (it == tasks_.end()) 210 return; 211 212 SyncTask* task = &it->second; 213 switch (task->state) { 214 case PENDING: 215 tasks_.erase(it); 216 break; 217 case RUNNING: 218 // TODO(kinaba): Cancel tasks in JobScheduler as well. crbug.com/248856 219 break; 220 } 221 } 222 223 void SyncClient::AddUploadTask(const ClientContext& context, 224 const std::string& local_id) { 225 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 226 AddUploadTaskInternal(context, local_id, 227 file_system::UpdateOperation::RUN_CONTENT_CHECK, 228 delay_); 229 } 230 231 void SyncClient::AddUpdateTask(const std::string& local_id) { 232 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 233 AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0)); 234 } 235 236 void SyncClient::AddFetchTaskInternal(const std::string& local_id, 237 const base::TimeDelta& delay) { 238 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 239 240 SyncTask task; 241 task.task = base::Bind( 242 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId, 243 base::Unretained(download_operation_.get()), 244 local_id, 245 ClientContext(BACKGROUND), 246 GetFileContentInitializedCallback(), 247 google_apis::GetContentCallback(), 248 base::Bind(&SyncClient::OnFetchFileComplete, 249 weak_ptr_factory_.GetWeakPtr(), 250 local_id)); 251 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); 252 } 253 254 void SyncClient::AddUploadTaskInternal( 255 const ClientContext& context, 256 const std::string& local_id, 257 file_system::UpdateOperation::ContentCheckMode content_check_mode, 258 const base::TimeDelta& delay) { 259 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 260 261 SyncTask task; 262 task.task = base::Bind( 263 &file_system::UpdateOperation::UpdateFileByLocalId, 264 base::Unretained(update_operation_.get()), 265 local_id, 266 context, 267 
content_check_mode, 268 base::Bind(&SyncClient::OnUploadFileComplete, 269 weak_ptr_factory_.GetWeakPtr(), 270 local_id)); 271 AddTask(SyncTasks::key_type(UPLOAD, local_id), task, delay); 272 } 273 274 void SyncClient::AddUpdateTaskInternal(const std::string& local_id, 275 const base::TimeDelta& delay) { 276 SyncTask task; 277 task.task = base::Bind( 278 &EntryUpdatePerformer::UpdateEntry, 279 base::Unretained(entry_update_performer_.get()), 280 local_id, 281 base::Bind(&SyncClient::OnUpdateComplete, 282 weak_ptr_factory_.GetWeakPtr(), 283 local_id)); 284 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); 285 } 286 287 void SyncClient::AddTask(const SyncTasks::key_type& key, 288 const SyncTask& task, 289 const base::TimeDelta& delay) { 290 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 291 292 SyncTasks::iterator it = tasks_.find(key); 293 if (it != tasks_.end()) { 294 switch (it->second.state) { 295 case PENDING: 296 // The same task will run, do nothing. 297 break; 298 case RUNNING: 299 // Something has changed since the task started. Schedule rerun. 300 it->second.should_run_again = true; 301 break; 302 } 303 return; 304 } 305 306 DCHECK_EQ(PENDING, task.state); 307 tasks_[key] = task; 308 309 base::MessageLoopProxy::current()->PostDelayedTask( 310 FROM_HERE, 311 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), 312 delay); 313 } 314 315 void SyncClient::StartTask(const SyncTasks::key_type& key) { 316 SyncTasks::iterator it = tasks_.find(key); 317 if (it == tasks_.end()) 318 return; 319 320 SyncTask* task = &it->second; 321 switch (task->state) { 322 case PENDING: 323 task->state = RUNNING; 324 task->task.Run(); 325 break; 326 case RUNNING: // Do nothing. 
327 break; 328 } 329 } 330 331 void SyncClient::OnGetLocalIdsOfBacklog( 332 const std::vector<std::string>* to_fetch, 333 const std::vector<std::string>* to_upload, 334 const std::vector<std::string>* to_update) { 335 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 336 337 // Give priority to upload tasks over fetch tasks, so that dirty files are 338 // uploaded as soon as possible. 339 for (size_t i = 0; i < to_upload->size(); ++i) { 340 const std::string& local_id = (*to_upload)[i]; 341 DVLOG(1) << "Queuing to upload: " << local_id; 342 AddUploadTaskInternal(ClientContext(BACKGROUND), local_id, 343 file_system::UpdateOperation::NO_CONTENT_CHECK, 344 delay_); 345 } 346 347 for (size_t i = 0; i < to_fetch->size(); ++i) { 348 const std::string& local_id = (*to_fetch)[i]; 349 DVLOG(1) << "Queuing to fetch: " << local_id; 350 AddFetchTaskInternal(local_id, delay_); 351 } 352 353 for (size_t i = 0; i < to_update->size(); ++i) { 354 const std::string& local_id = (*to_update)[i]; 355 DVLOG(1) << "Queuing to update: " << local_id; 356 AddUpdateTask(local_id); 357 } 358 } 359 360 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { 361 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 362 363 for (size_t i = 0; i < local_ids->size(); ++i) 364 AddFetchTask((*local_ids)[i]); 365 } 366 367 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) { 368 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 369 370 const SyncTasks::key_type key(type, local_id); 371 SyncTasks::iterator it = tasks_.find(key); 372 DCHECK(it != tasks_.end()); 373 374 if (it->second.should_run_again) { 375 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; 376 it->second.should_run_again = false; 377 it->second.task.Run(); 378 return false; 379 } 380 381 tasks_.erase(it); 382 return true; 383 } 384 385 void SyncClient::OnFetchFileComplete(const std::string& local_id, 386 FileError error, 387 const base::FilePath& local_path, 
388 scoped_ptr<ResourceEntry> entry) { 389 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 390 391 if (!OnTaskComplete(FETCH, local_id)) 392 return; 393 394 if (error == FILE_ERROR_OK) { 395 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value(); 396 } else { 397 switch (error) { 398 case FILE_ERROR_ABORT: 399 // If user cancels download, unpin the file so that we do not sync the 400 // file again. 401 base::PostTaskAndReplyWithResult( 402 blocking_task_runner_, 403 FROM_HERE, 404 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), 405 base::Bind(&util::EmptyFileOperationCallback)); 406 break; 407 case FILE_ERROR_NO_CONNECTION: 408 // Add the task again so that we'll retry once the connection is back. 409 AddFetchTaskInternal(local_id, delay_); 410 break; 411 case FILE_ERROR_SERVICE_UNAVAILABLE: 412 // Add the task again so that we'll retry once the service is back. 413 AddFetchTaskInternal(local_id, long_delay_); 414 break; 415 default: 416 LOG(WARNING) << "Failed to fetch " << local_id 417 << ": " << FileErrorToString(error); 418 } 419 } 420 } 421 422 void SyncClient::OnUploadFileComplete(const std::string& local_id, 423 FileError error) { 424 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 425 426 if (!OnTaskComplete(UPLOAD, local_id)) 427 return; 428 429 if (error == FILE_ERROR_OK) { 430 DVLOG(1) << "Uploaded " << local_id; 431 } else { 432 switch (error) { 433 case FILE_ERROR_NO_CONNECTION: 434 // Add the task again so that we'll retry once the connection is back. 435 AddUploadTaskInternal(ClientContext(BACKGROUND), local_id, 436 file_system::UpdateOperation::NO_CONTENT_CHECK, 437 delay_); 438 break; 439 case FILE_ERROR_SERVICE_UNAVAILABLE: 440 // Add the task again so that we'll retry once the service is back. 
441 AddUploadTaskInternal(ClientContext(BACKGROUND), local_id, 442 file_system::UpdateOperation::NO_CONTENT_CHECK, 443 long_delay_); 444 break; 445 default: 446 LOG(WARNING) << "Failed to upload " << local_id << ": " 447 << FileErrorToString(error); 448 } 449 } 450 } 451 452 void SyncClient::OnUpdateComplete(const std::string& local_id, 453 FileError error) { 454 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 455 456 if (!OnTaskComplete(UPDATE, local_id)) 457 return; 458 459 if (error == FILE_ERROR_OK) { 460 DVLOG(1) << "Updated " << local_id; 461 } else { 462 switch (error) { 463 case FILE_ERROR_NO_CONNECTION: 464 // Add the task again so that we'll retry once the connection is back. 465 AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0)); 466 break; 467 case FILE_ERROR_SERVICE_UNAVAILABLE: 468 // Add the task again so that we'll retry once the service is back. 469 AddUpdateTaskInternal(local_id, long_delay_); 470 break; 471 default: 472 LOG(WARNING) << "Failed to update " << local_id << ": " 473 << FileErrorToString(error); 474 } 475 } 476 } 477 478 } // namespace internal 479 } // namespace drive 480