1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/chromeos/drive/sync_client.h" 6 7 #include <vector> 8 9 #include "base/bind.h" 10 #include "base/message_loop/message_loop_proxy.h" 11 #include "chrome/browser/chromeos/drive/drive.pb.h" 12 #include "chrome/browser/chromeos/drive/file_cache.h" 13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h" 14 #include "chrome/browser/chromeos/drive/file_system/operation_delegate.h" 15 #include "chrome/browser/chromeos/drive/file_system_util.h" 16 #include "chrome/browser/chromeos/drive/job_scheduler.h" 17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h" 18 #include "content/public/browser/browser_thread.h" 19 #include "google_apis/drive/task_util.h" 20 21 using content::BrowserThread; 22 23 namespace drive { 24 namespace internal { 25 26 namespace { 27 28 // The delay constant is used to delay processing a sync task. We should not 29 // process SyncTasks immediately for the following reasons: 30 // 31 // 1) For fetching, the user may accidentally click on "Make available 32 // offline" checkbox on a file, and immediately cancel it in a second. 33 // It's a waste to fetch the file in this case. 34 // 35 // 2) For uploading, file writing via HTML5 file system API is performed in 36 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We 37 // shouldn't start uploading right after the step 1). Besides, the user 38 // may edit the same file repeatedly in a short period of time. 39 // 40 // TODO(satorux): We should find a way to handle the upload case more nicely, 41 // and shorten the delay. crbug.com/134774 42 const int kDelaySeconds = 1; 43 44 // The delay constant is used to delay retrying a sync task on server errors. 45 const int kLongDelaySeconds = 600; 46 47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not 48 // fetched (not present locally), to |to_update| if the file needs update. 49 void CollectBacklog(ResourceMetadata* metadata, 50 std::vector<std::string>* to_fetch, 51 std::vector<std::string>* to_update) { 52 DCHECK(to_fetch); 53 DCHECK(to_update); 54 55 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); 56 for (; !it->IsAtEnd(); it->Advance()) { 57 const std::string& local_id = it->GetID(); 58 const ResourceEntry& entry = it->GetValue(); 59 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) { 60 to_update->push_back(local_id); 61 continue; 62 } 63 64 bool should_update = false; 65 switch (entry.metadata_edit_state()) { 66 case ResourceEntry::CLEAN: 67 break; 68 case ResourceEntry::SYNCING: 69 case ResourceEntry::DIRTY: 70 should_update = true; 71 break; 72 } 73 74 if (entry.file_specific_info().cache_state().is_pinned() && 75 !entry.file_specific_info().cache_state().is_present()) 76 to_fetch->push_back(local_id); 77 78 if (entry.file_specific_info().cache_state().is_dirty()) 79 should_update = true; 80 81 if (should_update) 82 to_update->push_back(local_id); 83 } 84 DCHECK(!it->HasError()); 85 } 86 87 // Iterates cache entries and collects IDs of ones with obsolete cache files. 88 void CheckExistingPinnedFiles(ResourceMetadata* metadata, 89 FileCache* cache, 90 std::vector<std::string>* local_ids) { 91 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); 92 for (; !it->IsAtEnd(); it->Advance()) { 93 const ResourceEntry& entry = it->GetValue(); 94 const FileCacheEntry& cache_state = 95 entry.file_specific_info().cache_state(); 96 const std::string& local_id = it->GetID(); 97 if (!cache_state.is_pinned() || !cache_state.is_present()) 98 continue; 99 100 // If MD5s don't match, it indicates the local cache file is stale, unless 101 // the file is dirty (the MD5 is "local"). We should never re-fetch the 102 // file when we have a locally modified version. 103 if (entry.file_specific_info().md5() == cache_state.md5() || 104 cache_state.is_dirty()) 105 continue; 106 107 FileError error = cache->Remove(local_id); 108 if (error != FILE_ERROR_OK) { 109 LOG(WARNING) << "Failed to remove cache entry: " << local_id; 110 continue; 111 } 112 113 error = cache->Pin(local_id); 114 if (error != FILE_ERROR_OK) { 115 LOG(WARNING) << "Failed to pin cache entry: " << local_id; 116 continue; 117 } 118 119 local_ids->push_back(local_id); 120 } 121 DCHECK(!it->HasError()); 122 } 123 124 // Gets the parent entry of the entry specified by the ID. 125 FileError GetParentResourceEntry(ResourceMetadata* metadata, 126 const std::string& local_id, 127 ResourceEntry* parent) { 128 ResourceEntry entry; 129 FileError error = metadata->GetResourceEntryById(local_id, &entry); 130 if (error != FILE_ERROR_OK) 131 return error; 132 return metadata->GetResourceEntryById(entry.parent_local_id(), parent); 133 } 134 135 } // namespace 136 137 SyncClient::SyncTask::SyncTask() 138 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {} 139 SyncClient::SyncTask::~SyncTask() {} 140 141 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, 142 file_system::OperationDelegate* delegate, 143 JobScheduler* scheduler, 144 ResourceMetadata* metadata, 145 FileCache* cache, 146 LoaderController* loader_controller, 147 const base::FilePath& temporary_file_directory) 148 : blocking_task_runner_(blocking_task_runner), 149 operation_delegate_(delegate), 150 metadata_(metadata), 151 cache_(cache), 152 download_operation_(new file_system::DownloadOperation( 153 blocking_task_runner, 154 delegate, 155 scheduler, 156 metadata, 157 cache, 158 temporary_file_directory)), 159 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner, 160 delegate, 161 scheduler, 162 metadata, 163 cache, 164 loader_controller)), 165 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)), 166 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)), 167 weak_ptr_factory_(this) { 168 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 169 } 170 171 SyncClient::~SyncClient() { 172 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 173 } 174 175 void SyncClient::StartProcessingBacklog() { 176 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 177 178 std::vector<std::string>* to_fetch = new std::vector<std::string>; 179 std::vector<std::string>* to_update = new std::vector<std::string>; 180 blocking_task_runner_->PostTaskAndReply( 181 FROM_HERE, 182 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update), 183 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog, 184 weak_ptr_factory_.GetWeakPtr(), 185 base::Owned(to_fetch), 186 base::Owned(to_update))); 187 } 188 189 void SyncClient::StartCheckingExistingPinnedFiles() { 190 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 191 192 std::vector<std::string>* local_ids = new std::vector<std::string>; 193 blocking_task_runner_->PostTaskAndReply( 194 FROM_HERE, 195 base::Bind(&CheckExistingPinnedFiles, 196 metadata_, 197 cache_, 198 local_ids), 199 base::Bind(&SyncClient::AddFetchTasks, 200 weak_ptr_factory_.GetWeakPtr(), 201 base::Owned(local_ids))); 202 } 203 204 void SyncClient::AddFetchTask(const std::string& local_id) { 205 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 206 AddFetchTaskInternal(local_id, delay_); 207 } 208 209 void SyncClient::RemoveFetchTask(const std::string& local_id) { 210 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 211 212 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); 213 if (it == tasks_.end()) 214 return; 215 216 SyncTask* task = &it->second; 217 switch (task->state) { 218 case SUSPENDED: 219 case PENDING: 220 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT); 221 break; 222 case RUNNING: 223 if (!task->cancel_closure.is_null()) 224 task->cancel_closure.Run(); 225 break; 226 } 227 } 228 229 void SyncClient::AddUpdateTask(const ClientContext& context, 230 const std::string& local_id) { 231 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 232 AddUpdateTaskInternal(context, local_id, delay_); 233 } 234 235 bool SyncClient:: WaitForUpdateTaskToComplete( 236 const std::string& local_id, 237 const FileOperationCallback& callback) { 238 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 239 240 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id)); 241 if (it == tasks_.end()) 242 return false; 243 244 SyncTask* task = &it->second; 245 task->waiting_callbacks.push_back(callback); 246 return true; 247 } 248 249 base::Closure SyncClient::PerformFetchTask(const std::string& local_id, 250 const ClientContext& context) { 251 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 252 return download_operation_->EnsureFileDownloadedByLocalId( 253 local_id, 254 context, 255 GetFileContentInitializedCallback(), 256 google_apis::GetContentCallback(), 257 base::Bind(&SyncClient::OnFetchFileComplete, 258 weak_ptr_factory_.GetWeakPtr(), 259 local_id)); 260 } 261 262 void SyncClient::AddFetchTaskInternal(const std::string& local_id, 263 const base::TimeDelta& delay) { 264 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 265 266 SyncTask task; 267 task.state = PENDING; 268 task.context = ClientContext(BACKGROUND); 269 task.task = base::Bind(&SyncClient::PerformFetchTask, 270 base::Unretained(this), 271 local_id); 272 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); 273 } 274 275 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id, 276 const ClientContext& context) { 277 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 278 entry_update_performer_->UpdateEntry( 279 local_id, 280 context, 281 base::Bind(&SyncClient::OnTaskComplete, 282 weak_ptr_factory_.GetWeakPtr(), 283 UPDATE, 284 local_id)); 285 return base::Closure(); 286 } 287 288 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, 289 const std::string& local_id, 290 const base::TimeDelta& delay) { 291 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 292 293 SyncTask task; 294 task.state = PENDING; 295 task.context = context; 296 task.task = base::Bind(&SyncClient::PerformUpdateTask, 297 base::Unretained(this), 298 local_id); 299 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); 300 } 301 302 void SyncClient::AddTask(const SyncTasks::key_type& key, 303 const SyncTask& task, 304 const base::TimeDelta& delay) { 305 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 306 307 SyncTasks::iterator it = tasks_.find(key); 308 if (it != tasks_.end()) { 309 switch (it->second.state) { 310 case SUSPENDED: 311 // Activate the task. 312 it->second.state = PENDING; 313 break; 314 case PENDING: 315 // The same task will run, do nothing. 316 return; 317 case RUNNING: 318 // Something has changed since the task started. Schedule rerun. 319 it->second.should_run_again = true; 320 return; 321 } 322 } else { 323 tasks_[key] = task; 324 } 325 DCHECK_EQ(PENDING, task.state); 326 base::MessageLoopProxy::current()->PostDelayedTask( 327 FROM_HERE, 328 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), 329 delay); 330 } 331 332 void SyncClient::StartTask(const SyncTasks::key_type& key) { 333 ResourceEntry* parent = new ResourceEntry; 334 base::PostTaskAndReplyWithResult( 335 blocking_task_runner_.get(), 336 FROM_HERE, 337 base::Bind(&GetParentResourceEntry, metadata_, key.second, parent), 338 base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry, 339 weak_ptr_factory_.GetWeakPtr(), 340 key, 341 base::Owned(parent))); 342 } 343 344 void SyncClient::StartTaskAfterGetParentResourceEntry( 345 const SyncTasks::key_type& key, 346 const ResourceEntry* parent, 347 FileError error) { 348 const SyncType type = key.first; 349 const std::string& local_id = key.second; 350 SyncTasks::iterator it = tasks_.find(key); 351 if (it == tasks_.end()) 352 return; 353 354 SyncTask* task = &it->second; 355 switch (task->state) { 356 case SUSPENDED: 357 case PENDING: 358 break; 359 case RUNNING: // Do nothing. 360 return; 361 } 362 363 if (error != FILE_ERROR_OK) { 364 OnTaskComplete(type, local_id, error); 365 return; 366 } 367 368 if (type == UPDATE && 369 parent->resource_id().empty() && 370 parent->local_id() != util::kDriveTrashDirLocalId) { 371 // Parent entry needs to be synced to get a resource ID. 372 // Suspend the task and register it as a dependent task of the parent. 373 const SyncTasks::key_type key_parent(type, parent->local_id()); 374 SyncTasks::iterator it_parent = tasks_.find(key_parent); 375 if (it_parent == tasks_.end()) { 376 OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION); 377 LOG(WARNING) << "Parent task not found: type = " << type << ", id = " 378 << local_id << ", parent_id = " << parent->local_id(); 379 return; 380 } 381 task->state = SUSPENDED; 382 it_parent->second.dependent_tasks.push_back(key); 383 return; 384 } 385 386 // Run the task. 387 task->state = RUNNING; 388 task->cancel_closure = task->task.Run(task->context); 389 } 390 391 void SyncClient::OnGetLocalIdsOfBacklog( 392 const std::vector<std::string>* to_fetch, 393 const std::vector<std::string>* to_update) { 394 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 395 396 // Give priority to upload tasks over fetch tasks, so that dirty files are 397 // uploaded as soon as possible. 398 for (size_t i = 0; i < to_update->size(); ++i) { 399 const std::string& local_id = (*to_update)[i]; 400 DVLOG(1) << "Queuing to update: " << local_id; 401 AddUpdateTask(ClientContext(BACKGROUND), local_id); 402 } 403 404 for (size_t i = 0; i < to_fetch->size(); ++i) { 405 const std::string& local_id = (*to_fetch)[i]; 406 DVLOG(1) << "Queuing to fetch: " << local_id; 407 AddFetchTaskInternal(local_id, delay_); 408 } 409 } 410 411 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { 412 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 413 414 for (size_t i = 0; i < local_ids->size(); ++i) 415 AddFetchTask((*local_ids)[i]); 416 } 417 418 void SyncClient::OnTaskComplete(SyncType type, 419 const std::string& local_id, 420 FileError error) { 421 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 422 423 const SyncTasks::key_type key(type, local_id); 424 SyncTasks::iterator it = tasks_.find(key); 425 DCHECK(it != tasks_.end()); 426 427 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0); 428 429 switch (error) { 430 case FILE_ERROR_OK: 431 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id; 432 break; 433 case FILE_ERROR_ABORT: 434 // Ignore it because this is caused by user's cancel operations. 435 break; 436 case FILE_ERROR_NO_CONNECTION: 437 // Run the task again so that we'll retry once the connection is back. 438 it->second.should_run_again = true; 439 it->second.context = ClientContext(BACKGROUND); 440 break; 441 case FILE_ERROR_SERVICE_UNAVAILABLE: 442 // Run the task again so that we'll retry once the service is back. 443 it->second.should_run_again = true; 444 it->second.context = ClientContext(BACKGROUND); 445 retry_delay = long_delay_; 446 operation_delegate_->OnDriveSyncError( 447 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id); 448 break; 449 default: 450 operation_delegate_->OnDriveSyncError( 451 file_system::DRIVE_SYNC_ERROR_MISC, local_id); 452 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id 453 << ": " << FileErrorToString(error); 454 } 455 456 for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) { 457 base::MessageLoopProxy::current()->PostTask( 458 FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error)); 459 } 460 it->second.waiting_callbacks.clear(); 461 462 if (it->second.should_run_again) { 463 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; 464 it->second.state = PENDING; 465 it->second.should_run_again = false; 466 base::MessageLoopProxy::current()->PostDelayedTask( 467 FROM_HERE, 468 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), 469 retry_delay); 470 } else { 471 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i) 472 StartTask(it->second.dependent_tasks[i]); 473 tasks_.erase(it); 474 } 475 } 476 477 void SyncClient::OnFetchFileComplete(const std::string& local_id, 478 FileError error, 479 const base::FilePath& local_path, 480 scoped_ptr<ResourceEntry> entry) { 481 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 482 OnTaskComplete(FETCH, local_id, error); 483 if (error == FILE_ERROR_ABORT) { 484 // If user cancels download, unpin the file so that we do not sync the file 485 // again. 486 base::PostTaskAndReplyWithResult( 487 blocking_task_runner_.get(), 488 FROM_HERE, 489 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), 490 base::Bind(&util::EmptyFileOperationCallback)); 491 } 492 } 493 494 } // namespace internal 495 } // namespace drive 496