1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/chromeos/drive/sync_client.h" 6 7 #include <vector> 8 9 #include "base/bind.h" 10 #include "base/message_loop/message_loop_proxy.h" 11 #include "chrome/browser/chromeos/drive/drive.pb.h" 12 #include "chrome/browser/chromeos/drive/file_cache.h" 13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h" 14 #include "chrome/browser/chromeos/drive/file_system/operation_observer.h" 15 #include "chrome/browser/chromeos/drive/file_system_util.h" 16 #include "chrome/browser/chromeos/drive/job_scheduler.h" 17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h" 18 #include "content/public/browser/browser_thread.h" 19 #include "google_apis/drive/task_util.h" 20 21 using content::BrowserThread; 22 23 namespace drive { 24 namespace internal { 25 26 namespace { 27 28 // The delay constant is used to delay processing a sync task. We should not 29 // process SyncTasks immediately for the following reasons: 30 // 31 // 1) For fetching, the user may accidentally click on "Make available 32 // offline" checkbox on a file, and immediately cancel it in a second. 33 // It's a waste to fetch the file in this case. 34 // 35 // 2) For uploading, file writing via HTML5 file system API is performed in 36 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We 37 // shouldn't start uploading right after the step 1). Besides, the user 38 // may edit the same file repeatedly in a short period of time. 39 // 40 // TODO(satorux): We should find a way to handle the upload case more nicely, 41 // and shorten the delay. crbug.com/134774 42 const int kDelaySeconds = 1; 43 44 // The delay constant is used to delay retrying a sync task on server errors. 45 const int kLongDelaySeconds = 600; 46 47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not 48 // fetched (not present locally), to |to_update| if the file needs update. 49 void CollectBacklog(ResourceMetadata* metadata, 50 std::vector<std::string>* to_fetch, 51 std::vector<std::string>* to_update) { 52 DCHECK(to_fetch); 53 DCHECK(to_update); 54 55 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); 56 for (; !it->IsAtEnd(); it->Advance()) { 57 const std::string& local_id = it->GetID(); 58 const ResourceEntry& entry = it->GetValue(); 59 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) { 60 to_update->push_back(local_id); 61 continue; 62 } 63 64 bool should_update = false; 65 switch (entry.metadata_edit_state()) { 66 case ResourceEntry::CLEAN: 67 break; 68 case ResourceEntry::SYNCING: 69 case ResourceEntry::DIRTY: 70 should_update = true; 71 break; 72 } 73 74 if (entry.file_specific_info().cache_state().is_pinned() && 75 !entry.file_specific_info().cache_state().is_present()) 76 to_fetch->push_back(local_id); 77 78 if (entry.file_specific_info().cache_state().is_dirty()) 79 should_update = true; 80 81 if (should_update) 82 to_update->push_back(local_id); 83 } 84 DCHECK(!it->HasError()); 85 } 86 87 // Iterates cache entries and collects IDs of ones with obsolete cache files. 88 void CheckExistingPinnedFiles(ResourceMetadata* metadata, 89 FileCache* cache, 90 std::vector<std::string>* local_ids) { 91 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); 92 for (; !it->IsAtEnd(); it->Advance()) { 93 const ResourceEntry& entry = it->GetValue(); 94 const FileCacheEntry& cache_state = 95 entry.file_specific_info().cache_state(); 96 const std::string& local_id = it->GetID(); 97 if (!cache_state.is_pinned() || !cache_state.is_present()) 98 continue; 99 100 // If MD5s don't match, it indicates the local cache file is stale, unless 101 // the file is dirty (the MD5 is "local"). We should never re-fetch the 102 // file when we have a locally modified version. 103 if (entry.file_specific_info().md5() == cache_state.md5() || 104 cache_state.is_dirty()) 105 continue; 106 107 FileError error = cache->Remove(local_id); 108 if (error != FILE_ERROR_OK) { 109 LOG(WARNING) << "Failed to remove cache entry: " << local_id; 110 continue; 111 } 112 113 error = cache->Pin(local_id); 114 if (error != FILE_ERROR_OK) { 115 LOG(WARNING) << "Failed to pin cache entry: " << local_id; 116 continue; 117 } 118 119 local_ids->push_back(local_id); 120 } 121 DCHECK(!it->HasError()); 122 } 123 124 // Runs the task and returns a dummy cancel closure. 125 base::Closure RunTaskAndReturnDummyCancelClosure(const base::Closure& task) { 126 task.Run(); 127 return base::Closure(); 128 } 129 130 } // namespace 131 132 SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {} 133 SyncClient::SyncTask::~SyncTask() {} 134 135 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, 136 file_system::OperationObserver* observer, 137 JobScheduler* scheduler, 138 ResourceMetadata* metadata, 139 FileCache* cache, 140 LoaderController* loader_controller, 141 const base::FilePath& temporary_file_directory) 142 : blocking_task_runner_(blocking_task_runner), 143 operation_observer_(observer), 144 metadata_(metadata), 145 cache_(cache), 146 download_operation_(new file_system::DownloadOperation( 147 blocking_task_runner, 148 observer, 149 scheduler, 150 metadata, 151 cache, 152 temporary_file_directory)), 153 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner, 154 observer, 155 scheduler, 156 metadata, 157 cache, 158 loader_controller)), 159 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)), 160 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)), 161 weak_ptr_factory_(this) { 162 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 163 } 164 165 SyncClient::~SyncClient() { 166 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 167 } 168 169 void SyncClient::StartProcessingBacklog() { 170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 171 172 std::vector<std::string>* to_fetch = new std::vector<std::string>; 173 std::vector<std::string>* to_update = new std::vector<std::string>; 174 blocking_task_runner_->PostTaskAndReply( 175 FROM_HERE, 176 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update), 177 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog, 178 weak_ptr_factory_.GetWeakPtr(), 179 base::Owned(to_fetch), 180 base::Owned(to_update))); 181 } 182 183 void SyncClient::StartCheckingExistingPinnedFiles() { 184 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 185 186 std::vector<std::string>* local_ids = new std::vector<std::string>; 187 blocking_task_runner_->PostTaskAndReply( 188 FROM_HERE, 189 base::Bind(&CheckExistingPinnedFiles, 190 metadata_, 191 cache_, 192 local_ids), 193 base::Bind(&SyncClient::AddFetchTasks, 194 weak_ptr_factory_.GetWeakPtr(), 195 base::Owned(local_ids))); 196 } 197 198 void SyncClient::AddFetchTask(const std::string& local_id) { 199 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 200 AddFetchTaskInternal(local_id, delay_); 201 } 202 203 void SyncClient::RemoveFetchTask(const std::string& local_id) { 204 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 205 206 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); 207 if (it == tasks_.end()) 208 return; 209 210 SyncTask* task = &it->second; 211 switch (task->state) { 212 case PENDING: 213 tasks_.erase(it); 214 break; 215 case RUNNING: 216 if (!task->cancel_closure.is_null()) 217 task->cancel_closure.Run(); 218 break; 219 } 220 } 221 222 void SyncClient::AddUpdateTask(const ClientContext& context, 223 const std::string& local_id) { 224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 225 AddUpdateTaskInternal(context, local_id, delay_); 226 } 227 228 void SyncClient::AddFetchTaskInternal(const std::string& local_id, 229 const base::TimeDelta& delay) { 230 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 231 232 SyncTask task; 233 task.task = base::Bind( 234 &file_system::DownloadOperation::EnsureFileDownloadedByLocalId, 235 base::Unretained(download_operation_.get()), 236 local_id, 237 ClientContext(BACKGROUND), 238 GetFileContentInitializedCallback(), 239 google_apis::GetContentCallback(), 240 base::Bind(&SyncClient::OnFetchFileComplete, 241 weak_ptr_factory_.GetWeakPtr(), 242 local_id)); 243 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); 244 } 245 246 void SyncClient::AddUpdateTaskInternal(const ClientContext& context, 247 const std::string& local_id, 248 const base::TimeDelta& delay) { 249 SyncTask task; 250 task.task = base::Bind( 251 &RunTaskAndReturnDummyCancelClosure, 252 base::Bind(&EntryUpdatePerformer::UpdateEntry, 253 base::Unretained(entry_update_performer_.get()), 254 local_id, 255 context, 256 base::Bind(&SyncClient::OnUpdateComplete, 257 weak_ptr_factory_.GetWeakPtr(), 258 local_id))); 259 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); 260 } 261 262 void SyncClient::AddTask(const SyncTasks::key_type& key, 263 const SyncTask& task, 264 const base::TimeDelta& delay) { 265 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 266 267 SyncTasks::iterator it = tasks_.find(key); 268 if (it != tasks_.end()) { 269 switch (it->second.state) { 270 case PENDING: 271 // The same task will run, do nothing. 272 break; 273 case RUNNING: 274 // Something has changed since the task started. Schedule rerun. 275 it->second.should_run_again = true; 276 break; 277 } 278 return; 279 } 280 281 DCHECK_EQ(PENDING, task.state); 282 tasks_[key] = task; 283 284 base::MessageLoopProxy::current()->PostDelayedTask( 285 FROM_HERE, 286 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), 287 delay); 288 } 289 290 void SyncClient::StartTask(const SyncTasks::key_type& key) { 291 SyncTasks::iterator it = tasks_.find(key); 292 if (it == tasks_.end()) 293 return; 294 295 SyncTask* task = &it->second; 296 switch (task->state) { 297 case PENDING: 298 task->state = RUNNING; 299 task->cancel_closure = task->task.Run(); 300 break; 301 case RUNNING: // Do nothing. 302 break; 303 } 304 } 305 306 void SyncClient::OnGetLocalIdsOfBacklog( 307 const std::vector<std::string>* to_fetch, 308 const std::vector<std::string>* to_update) { 309 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 310 311 // Give priority to upload tasks over fetch tasks, so that dirty files are 312 // uploaded as soon as possible. 313 for (size_t i = 0; i < to_update->size(); ++i) { 314 const std::string& local_id = (*to_update)[i]; 315 DVLOG(1) << "Queuing to update: " << local_id; 316 AddUpdateTask(ClientContext(BACKGROUND), local_id); 317 } 318 319 for (size_t i = 0; i < to_fetch->size(); ++i) { 320 const std::string& local_id = (*to_fetch)[i]; 321 DVLOG(1) << "Queuing to fetch: " << local_id; 322 AddFetchTaskInternal(local_id, delay_); 323 } 324 } 325 326 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { 327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 328 329 for (size_t i = 0; i < local_ids->size(); ++i) 330 AddFetchTask((*local_ids)[i]); 331 } 332 333 bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) { 334 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 335 336 const SyncTasks::key_type key(type, local_id); 337 SyncTasks::iterator it = tasks_.find(key); 338 DCHECK(it != tasks_.end()); 339 340 if (it->second.should_run_again) { 341 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; 342 it->second.should_run_again = false; 343 it->second.task.Run(); 344 return false; 345 } 346 347 tasks_.erase(it); 348 return true; 349 } 350 351 void SyncClient::OnFetchFileComplete(const std::string& local_id, 352 FileError error, 353 const base::FilePath& local_path, 354 scoped_ptr<ResourceEntry> entry) { 355 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 356 357 if (!OnTaskComplete(FETCH, local_id)) 358 return; 359 360 if (error == FILE_ERROR_OK) { 361 DVLOG(1) << "Fetched " << local_id << ": " << local_path.value(); 362 } else { 363 switch (error) { 364 case FILE_ERROR_ABORT: 365 // If user cancels download, unpin the file so that we do not sync the 366 // file again. 367 base::PostTaskAndReplyWithResult( 368 blocking_task_runner_, 369 FROM_HERE, 370 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), 371 base::Bind(&util::EmptyFileOperationCallback)); 372 break; 373 case FILE_ERROR_NO_CONNECTION: 374 // Add the task again so that we'll retry once the connection is back. 375 AddFetchTaskInternal(local_id, delay_); 376 break; 377 case FILE_ERROR_SERVICE_UNAVAILABLE: 378 // Add the task again so that we'll retry once the service is back. 379 AddFetchTaskInternal(local_id, long_delay_); 380 operation_observer_->OnDriveSyncError( 381 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, 382 local_id); 383 break; 384 default: 385 operation_observer_->OnDriveSyncError( 386 file_system::DRIVE_SYNC_ERROR_MISC, 387 local_id); 388 LOG(WARNING) << "Failed to fetch " << local_id 389 << ": " << FileErrorToString(error); 390 } 391 } 392 } 393 394 void SyncClient::OnUpdateComplete(const std::string& local_id, 395 FileError error) { 396 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 397 398 if (!OnTaskComplete(UPDATE, local_id)) 399 return; 400 401 if (error == FILE_ERROR_OK) { 402 DVLOG(1) << "Updated " << local_id; 403 404 // Add update tasks for child entries which may be waiting for the parent to 405 // be updated. 406 ResourceEntryVector* entries = new ResourceEntryVector; 407 base::PostTaskAndReplyWithResult( 408 blocking_task_runner_.get(), 409 FROM_HERE, 410 base::Bind(&ResourceMetadata::ReadDirectoryById, 411 base::Unretained(metadata_), local_id, entries), 412 base::Bind(&SyncClient::AddChildUpdateTasks, 413 weak_ptr_factory_.GetWeakPtr(), base::Owned(entries))); 414 } else { 415 switch (error) { 416 case FILE_ERROR_ABORT: 417 // Ignore it because this is caused by user's cancel operations. 418 break; 419 case FILE_ERROR_NO_CONNECTION: 420 // Add the task again so that we'll retry once the connection is back. 421 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, 422 base::TimeDelta::FromSeconds(0)); 423 break; 424 case FILE_ERROR_SERVICE_UNAVAILABLE: 425 // Add the task again so that we'll retry once the service is back. 426 AddUpdateTaskInternal(ClientContext(BACKGROUND), local_id, long_delay_); 427 operation_observer_->OnDriveSyncError( 428 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, 429 local_id); 430 break; 431 default: 432 operation_observer_->OnDriveSyncError( 433 file_system::DRIVE_SYNC_ERROR_MISC, 434 local_id); 435 LOG(WARNING) << "Failed to update " << local_id << ": " 436 << FileErrorToString(error); 437 } 438 } 439 } 440 441 void SyncClient::AddChildUpdateTasks(const ResourceEntryVector* entries, 442 FileError error) { 443 if (error != FILE_ERROR_OK) 444 return; 445 446 for (size_t i = 0; i < entries->size(); ++i) { 447 const ResourceEntry& entry = (*entries)[i]; 448 if (entry.metadata_edit_state() != ResourceEntry::CLEAN) { 449 AddUpdateTaskInternal(ClientContext(BACKGROUND), entry.local_id(), 450 base::TimeDelta::FromSeconds(0)); 451 } 452 } 453 } 454 455 } // namespace internal 456 } // namespace drive 457