1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/chromeos/drive/job_scheduler.h" 6 7 #include "base/message_loop/message_loop.h" 8 #include "base/prefs/pref_service.h" 9 #include "base/rand_util.h" 10 #include "base/strings/string_number_conversions.h" 11 #include "base/strings/stringprintf.h" 12 #include "chrome/browser/drive/event_logger.h" 13 #include "chrome/common/pref_names.h" 14 #include "content/public/browser/browser_thread.h" 15 #include "google_apis/drive/drive_api_parser.h" 16 17 using content::BrowserThread; 18 19 namespace drive { 20 21 namespace { 22 23 // All jobs are retried at maximum of kMaxRetryCount when they fail due to 24 // throttling or server error. The delay before retrying a job is shared among 25 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds. 26 // 27 // According to the API documentation, kMaxRetryCount should be the same as 28 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors). 29 // But currently multiplied by 2 to ensure upload related jobs retried for a 30 // sufficient number of times. crbug.com/269918 31 const int kMaxThrottleCount = 4; 32 const int kMaxRetryCount = 2 * kMaxThrottleCount; 33 34 // GetDefaultValue returns a value constructed by the default constructor. 35 template<typename T> struct DefaultValueCreator { 36 static T GetDefaultValue() { return T(); } 37 }; 38 template<typename T> struct DefaultValueCreator<const T&> { 39 static T GetDefaultValue() { return T(); } 40 }; 41 42 // Helper of CreateErrorRunCallback implementation. 43 // Provides: 44 // - ResultType; the type of the Callback which should be returned by 45 // CreateErrorRunCallback. 46 // - Run(): a static function which takes the original |callback| and |error|, 47 // and runs the |callback|.Run() with the error code and default values 48 // for remaining arguments. 49 template<typename CallbackType> struct CreateErrorRunCallbackHelper; 50 51 // CreateErrorRunCallback with two arguments. 52 template<typename P1> 53 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> { 54 static void Run( 55 const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback, 56 google_apis::GDataErrorCode error) { 57 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue()); 58 } 59 }; 60 61 // Returns a callback with the tail parameter bound to its default value. 62 // In other words, returned_callback.Run(error) runs callback.Run(error, T()). 63 template<typename CallbackType> 64 base::Callback<void(google_apis::GDataErrorCode)> 65 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) { 66 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback); 67 } 68 69 // Parameter struct for RunUploadNewFile. 70 struct UploadNewFileParams { 71 std::string parent_resource_id; 72 base::FilePath local_file_path; 73 std::string title; 74 std::string content_type; 75 DriveUploader::UploadNewFileOptions options; 76 UploadCompletionCallback callback; 77 google_apis::ProgressCallback progress_callback; 78 }; 79 80 // Helper function to work around the arity limitation of base::Bind. 81 google_apis::CancelCallback RunUploadNewFile( 82 DriveUploaderInterface* uploader, 83 const UploadNewFileParams& params) { 84 return uploader->UploadNewFile(params.parent_resource_id, 85 params.local_file_path, 86 params.title, 87 params.content_type, 88 params.options, 89 params.callback, 90 params.progress_callback); 91 } 92 93 // Parameter struct for RunUploadExistingFile. 94 struct UploadExistingFileParams { 95 std::string resource_id; 96 base::FilePath local_file_path; 97 std::string content_type; 98 DriveUploader::UploadExistingFileOptions options; 99 std::string etag; 100 UploadCompletionCallback callback; 101 google_apis::ProgressCallback progress_callback; 102 }; 103 104 // Helper function to work around the arity limitation of base::Bind. 105 google_apis::CancelCallback RunUploadExistingFile( 106 DriveUploaderInterface* uploader, 107 const UploadExistingFileParams& params) { 108 return uploader->UploadExistingFile(params.resource_id, 109 params.local_file_path, 110 params.content_type, 111 params.options, 112 params.callback, 113 params.progress_callback); 114 } 115 116 // Parameter struct for RunResumeUploadFile. 117 struct ResumeUploadFileParams { 118 GURL upload_location; 119 base::FilePath local_file_path; 120 std::string content_type; 121 UploadCompletionCallback callback; 122 google_apis::ProgressCallback progress_callback; 123 }; 124 125 // Helper function to adjust the return type. 126 google_apis::CancelCallback RunResumeUploadFile( 127 DriveUploaderInterface* uploader, 128 const ResumeUploadFileParams& params) { 129 return uploader->ResumeUploadFile(params.upload_location, 130 params.local_file_path, 131 params.content_type, 132 params.callback, 133 params.progress_callback); 134 } 135 136 } // namespace 137 138 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially. 139 const int JobScheduler::kMaxJobCount[] = { 140 5, // METADATA_QUEUE 141 1, // FILE_QUEUE 142 }; 143 144 JobScheduler::JobEntry::JobEntry(JobType type) 145 : job_info(type), 146 context(ClientContext(USER_INITIATED)), 147 retry_count(0) { 148 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 149 } 150 151 JobScheduler::JobEntry::~JobEntry() { 152 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 153 } 154 155 struct JobScheduler::ResumeUploadParams { 156 base::FilePath drive_file_path; 157 base::FilePath local_file_path; 158 std::string content_type; 159 }; 160 161 JobScheduler::JobScheduler( 162 PrefService* pref_service, 163 EventLogger* logger, 164 DriveServiceInterface* drive_service, 165 base::SequencedTaskRunner* blocking_task_runner) 166 : throttle_count_(0), 167 wait_until_(base::Time::Now()), 168 disable_throttling_(false), 169 logger_(logger), 170 drive_service_(drive_service), 171 uploader_(new DriveUploader(drive_service, blocking_task_runner)), 172 pref_service_(pref_service), 173 weak_ptr_factory_(this) { 174 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 175 176 for (int i = 0; i < NUM_QUEUES; ++i) 177 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES)); 178 179 net::NetworkChangeNotifier::AddConnectionTypeObserver(this); 180 } 181 182 JobScheduler::~JobScheduler() { 183 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 184 185 size_t num_queued_jobs = 0; 186 for (int i = 0; i < NUM_QUEUES; ++i) 187 num_queued_jobs += queue_[i]->GetNumberOfJobs(); 188 DCHECK_EQ(num_queued_jobs, job_map_.size()); 189 190 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this); 191 } 192 193 std::vector<JobInfo> JobScheduler::GetJobInfoList() { 194 std::vector<JobInfo> job_info_list; 195 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance()) 196 job_info_list.push_back(iter.GetCurrentValue()->job_info); 197 return job_info_list; 198 } 199 200 void JobScheduler::AddObserver(JobListObserver* observer) { 201 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 202 observer_list_.AddObserver(observer); 203 } 204 205 void JobScheduler::RemoveObserver(JobListObserver* observer) { 206 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 207 observer_list_.RemoveObserver(observer); 208 } 209 210 void JobScheduler::CancelJob(JobID job_id) { 211 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 212 213 JobEntry* job = job_map_.Lookup(job_id); 214 if (job) { 215 if (job->job_info.state == STATE_RUNNING) { 216 // If the job is running an HTTP request, cancel it via |cancel_callback| 217 // returned from the request, and wait for termination in the normal 218 // callback handler, OnJobDone. 219 if (!job->cancel_callback.is_null()) 220 job->cancel_callback.Run(); 221 } else { 222 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED); 223 } 224 } 225 } 226 227 void JobScheduler::CancelAllJobs() { 228 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 229 230 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports 231 // removable during iteration. 232 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance()) 233 CancelJob(iter.GetCurrentKey()); 234 } 235 236 void JobScheduler::GetAboutResource( 237 const google_apis::AboutResourceCallback& callback) { 238 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 239 DCHECK(!callback.is_null()); 240 241 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE); 242 new_job->task = base::Bind( 243 &DriveServiceInterface::GetAboutResource, 244 base::Unretained(drive_service_), 245 base::Bind(&JobScheduler::OnGetAboutResourceJobDone, 246 weak_ptr_factory_.GetWeakPtr(), 247 new_job->job_info.job_id, 248 callback)); 249 new_job->abort_callback = CreateErrorRunCallback(callback); 250 StartJob(new_job); 251 } 252 253 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) { 254 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 255 DCHECK(!callback.is_null()); 256 257 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST); 258 new_job->task = base::Bind( 259 &DriveServiceInterface::GetAppList, 260 base::Unretained(drive_service_), 261 base::Bind(&JobScheduler::OnGetAppListJobDone, 262 weak_ptr_factory_.GetWeakPtr(), 263 new_job->job_info.job_id, 264 callback)); 265 new_job->abort_callback = CreateErrorRunCallback(callback); 266 StartJob(new_job); 267 } 268 269 void JobScheduler::GetAllFileList( 270 const google_apis::FileListCallback& callback) { 271 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 272 DCHECK(!callback.is_null()); 273 274 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST); 275 new_job->task = base::Bind( 276 &DriveServiceInterface::GetAllFileList, 277 base::Unretained(drive_service_), 278 base::Bind(&JobScheduler::OnGetFileListJobDone, 279 weak_ptr_factory_.GetWeakPtr(), 280 new_job->job_info.job_id, 281 callback)); 282 new_job->abort_callback = CreateErrorRunCallback(callback); 283 StartJob(new_job); 284 } 285 286 void JobScheduler::GetFileListInDirectory( 287 const std::string& directory_resource_id, 288 const google_apis::FileListCallback& callback) { 289 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 290 DCHECK(!callback.is_null()); 291 292 JobEntry* new_job = CreateNewJob( 293 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY); 294 new_job->task = base::Bind( 295 &DriveServiceInterface::GetFileListInDirectory, 296 base::Unretained(drive_service_), 297 directory_resource_id, 298 base::Bind(&JobScheduler::OnGetFileListJobDone, 299 weak_ptr_factory_.GetWeakPtr(), 300 new_job->job_info.job_id, 301 callback)); 302 new_job->abort_callback = CreateErrorRunCallback(callback); 303 StartJob(new_job); 304 } 305 306 void JobScheduler::Search(const std::string& search_query, 307 const google_apis::FileListCallback& callback) { 308 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 309 DCHECK(!callback.is_null()); 310 311 JobEntry* new_job = CreateNewJob(TYPE_SEARCH); 312 new_job->task = base::Bind( 313 &DriveServiceInterface::Search, 314 base::Unretained(drive_service_), 315 search_query, 316 base::Bind(&JobScheduler::OnGetFileListJobDone, 317 weak_ptr_factory_.GetWeakPtr(), 318 new_job->job_info.job_id, 319 callback)); 320 new_job->abort_callback = CreateErrorRunCallback(callback); 321 StartJob(new_job); 322 } 323 324 void JobScheduler::GetChangeList( 325 int64 start_changestamp, 326 const google_apis::ChangeListCallback& callback) { 327 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 328 DCHECK(!callback.is_null()); 329 330 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST); 331 new_job->task = base::Bind( 332 &DriveServiceInterface::GetChangeList, 333 base::Unretained(drive_service_), 334 start_changestamp, 335 base::Bind(&JobScheduler::OnGetChangeListJobDone, 336 weak_ptr_factory_.GetWeakPtr(), 337 new_job->job_info.job_id, 338 callback)); 339 new_job->abort_callback = CreateErrorRunCallback(callback); 340 StartJob(new_job); 341 } 342 343 void JobScheduler::GetRemainingChangeList( 344 const GURL& next_link, 345 const google_apis::ChangeListCallback& callback) { 346 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 347 DCHECK(!callback.is_null()); 348 349 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST); 350 new_job->task = base::Bind( 351 &DriveServiceInterface::GetRemainingChangeList, 352 base::Unretained(drive_service_), 353 next_link, 354 base::Bind(&JobScheduler::OnGetChangeListJobDone, 355 weak_ptr_factory_.GetWeakPtr(), 356 new_job->job_info.job_id, 357 callback)); 358 new_job->abort_callback = CreateErrorRunCallback(callback); 359 StartJob(new_job); 360 } 361 362 void JobScheduler::GetRemainingFileList( 363 const GURL& next_link, 364 const google_apis::FileListCallback& callback) { 365 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 366 DCHECK(!callback.is_null()); 367 368 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST); 369 new_job->task = base::Bind( 370 &DriveServiceInterface::GetRemainingFileList, 371 base::Unretained(drive_service_), 372 next_link, 373 base::Bind(&JobScheduler::OnGetFileListJobDone, 374 weak_ptr_factory_.GetWeakPtr(), 375 new_job->job_info.job_id, 376 callback)); 377 new_job->abort_callback = CreateErrorRunCallback(callback); 378 StartJob(new_job); 379 } 380 381 void JobScheduler::GetFileResource( 382 const std::string& resource_id, 383 const ClientContext& context, 384 const google_apis::FileResourceCallback& callback) { 385 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 386 DCHECK(!callback.is_null()); 387 388 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY); 389 new_job->context = context; 390 new_job->task = base::Bind( 391 &DriveServiceInterface::GetFileResource, 392 base::Unretained(drive_service_), 393 resource_id, 394 base::Bind(&JobScheduler::OnGetFileResourceJobDone, 395 weak_ptr_factory_.GetWeakPtr(), 396 new_job->job_info.job_id, 397 callback)); 398 new_job->abort_callback = CreateErrorRunCallback(callback); 399 StartJob(new_job); 400 } 401 402 void JobScheduler::GetShareUrl( 403 const std::string& resource_id, 404 const GURL& embed_origin, 405 const ClientContext& context, 406 const google_apis::GetShareUrlCallback& callback) { 407 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 408 DCHECK(!callback.is_null()); 409 410 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL); 411 new_job->context = context; 412 new_job->task = base::Bind( 413 &DriveServiceInterface::GetShareUrl, 414 base::Unretained(drive_service_), 415 resource_id, 416 embed_origin, 417 base::Bind(&JobScheduler::OnGetShareUrlJobDone, 418 weak_ptr_factory_.GetWeakPtr(), 419 new_job->job_info.job_id, 420 callback)); 421 new_job->abort_callback = CreateErrorRunCallback(callback); 422 StartJob(new_job); 423 } 424 425 void JobScheduler::TrashResource( 426 const std::string& resource_id, 427 const ClientContext& context, 428 const google_apis::EntryActionCallback& callback) { 429 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 430 DCHECK(!callback.is_null()); 431 432 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE); 433 new_job->context = context; 434 new_job->task = base::Bind( 435 &DriveServiceInterface::TrashResource, 436 base::Unretained(drive_service_), 437 resource_id, 438 base::Bind(&JobScheduler::OnEntryActionJobDone, 439 weak_ptr_factory_.GetWeakPtr(), 440 new_job->job_info.job_id, 441 callback)); 442 new_job->abort_callback = callback; 443 StartJob(new_job); 444 } 445 446 void JobScheduler::CopyResource( 447 const std::string& resource_id, 448 const std::string& parent_resource_id, 449 const std::string& new_title, 450 const base::Time& last_modified, 451 const google_apis::FileResourceCallback& callback) { 452 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 453 DCHECK(!callback.is_null()); 454 455 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE); 456 new_job->task = base::Bind( 457 &DriveServiceInterface::CopyResource, 458 base::Unretained(drive_service_), 459 resource_id, 460 parent_resource_id, 461 new_title, 462 last_modified, 463 base::Bind(&JobScheduler::OnGetFileResourceJobDone, 464 weak_ptr_factory_.GetWeakPtr(), 465 new_job->job_info.job_id, 466 callback)); 467 new_job->abort_callback = CreateErrorRunCallback(callback); 468 StartJob(new_job); 469 } 470 471 void JobScheduler::UpdateResource( 472 const std::string& resource_id, 473 const std::string& parent_resource_id, 474 const std::string& new_title, 475 const base::Time& last_modified, 476 const base::Time& last_viewed_by_me, 477 const ClientContext& context, 478 const google_apis::FileResourceCallback& callback) { 479 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 480 DCHECK(!callback.is_null()); 481 482 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE); 483 new_job->context = context; 484 new_job->task = base::Bind( 485 &DriveServiceInterface::UpdateResource, 486 base::Unretained(drive_service_), 487 resource_id, 488 parent_resource_id, 489 new_title, 490 last_modified, 491 last_viewed_by_me, 492 base::Bind(&JobScheduler::OnGetFileResourceJobDone, 493 weak_ptr_factory_.GetWeakPtr(), 494 new_job->job_info.job_id, 495 callback)); 496 new_job->abort_callback = CreateErrorRunCallback(callback); 497 StartJob(new_job); 498 } 499 500 void JobScheduler::AddResourceToDirectory( 501 const std::string& parent_resource_id, 502 const std::string& resource_id, 503 const google_apis::EntryActionCallback& callback) { 504 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 505 DCHECK(!callback.is_null()); 506 507 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY); 508 new_job->task = base::Bind( 509 &DriveServiceInterface::AddResourceToDirectory, 510 base::Unretained(drive_service_), 511 parent_resource_id, 512 resource_id, 513 base::Bind(&JobScheduler::OnEntryActionJobDone, 514 weak_ptr_factory_.GetWeakPtr(), 515 new_job->job_info.job_id, 516 callback)); 517 new_job->abort_callback = callback; 518 StartJob(new_job); 519 } 520 521 void JobScheduler::RemoveResourceFromDirectory( 522 const std::string& parent_resource_id, 523 const std::string& resource_id, 524 const ClientContext& context, 525 const google_apis::EntryActionCallback& callback) { 526 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 527 528 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY); 529 new_job->context = context; 530 new_job->task = base::Bind( 531 &DriveServiceInterface::RemoveResourceFromDirectory, 532 base::Unretained(drive_service_), 533 parent_resource_id, 534 resource_id, 535 base::Bind(&JobScheduler::OnEntryActionJobDone, 536 weak_ptr_factory_.GetWeakPtr(), 537 new_job->job_info.job_id, 538 callback)); 539 new_job->abort_callback = callback; 540 StartJob(new_job); 541 } 542 543 void JobScheduler::AddNewDirectory( 544 const std::string& parent_resource_id, 545 const std::string& directory_title, 546 const DriveServiceInterface::AddNewDirectoryOptions& options, 547 const ClientContext& context, 548 const google_apis::FileResourceCallback& callback) { 549 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 550 551 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY); 552 new_job->context = context; 553 new_job->task = base::Bind( 554 &DriveServiceInterface::AddNewDirectory, 555 base::Unretained(drive_service_), 556 parent_resource_id, 557 directory_title, 558 options, 559 base::Bind(&JobScheduler::OnGetFileResourceJobDone, 560 weak_ptr_factory_.GetWeakPtr(), 561 new_job->job_info.job_id, 562 callback)); 563 new_job->abort_callback = CreateErrorRunCallback(callback); 564 StartJob(new_job); 565 } 566 567 JobID JobScheduler::DownloadFile( 568 const base::FilePath& virtual_path, 569 int64 expected_file_size, 570 const base::FilePath& local_cache_path, 571 const std::string& resource_id, 572 const ClientContext& context, 573 const google_apis::DownloadActionCallback& download_action_callback, 574 const google_apis::GetContentCallback& get_content_callback) { 575 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 576 577 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE); 578 new_job->job_info.file_path = virtual_path; 579 new_job->job_info.num_total_bytes = expected_file_size; 580 new_job->context = context; 581 new_job->task = base::Bind( 582 &DriveServiceInterface::DownloadFile, 583 base::Unretained(drive_service_), 584 local_cache_path, 585 resource_id, 586 base::Bind(&JobScheduler::OnDownloadActionJobDone, 587 weak_ptr_factory_.GetWeakPtr(), 588 new_job->job_info.job_id, 589 download_action_callback), 590 get_content_callback, 591 base::Bind(&JobScheduler::UpdateProgress, 592 weak_ptr_factory_.GetWeakPtr(), 593 new_job->job_info.job_id)); 594 new_job->abort_callback = CreateErrorRunCallback(download_action_callback); 595 StartJob(new_job); 596 return new_job->job_info.job_id; 597 } 598 599 void JobScheduler::UploadNewFile( 600 const std::string& parent_resource_id, 601 const base::FilePath& drive_file_path, 602 const base::FilePath& local_file_path, 603 const std::string& title, 604 const std::string& content_type, 605 const DriveUploader::UploadNewFileOptions& options, 606 const ClientContext& context, 607 const google_apis::FileResourceCallback& callback) { 608 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 609 610 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE); 611 new_job->job_info.file_path = drive_file_path; 612 new_job->context = context; 613 614 UploadNewFileParams params; 615 params.parent_resource_id = parent_resource_id; 616 params.local_file_path = local_file_path; 617 params.title = title; 618 params.content_type = content_type; 619 params.options = options; 620 621 ResumeUploadParams resume_params; 622 resume_params.local_file_path = params.local_file_path; 623 resume_params.content_type = params.content_type; 624 625 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone, 626 weak_ptr_factory_.GetWeakPtr(), 627 new_job->job_info.job_id, 628 resume_params, 629 callback); 630 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress, 631 weak_ptr_factory_.GetWeakPtr(), 632 new_job->job_info.job_id); 633 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params); 634 new_job->abort_callback = CreateErrorRunCallback(callback); 635 StartJob(new_job); 636 } 637 638 void JobScheduler::UploadExistingFile( 639 const std::string& resource_id, 640 const base::FilePath& drive_file_path, 641 const base::FilePath& local_file_path, 642 const std::string& content_type, 643 const DriveUploader::UploadExistingFileOptions& options, 644 const ClientContext& context, 645 const google_apis::FileResourceCallback& callback) { 646 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 647 648 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE); 649 new_job->job_info.file_path = drive_file_path; 650 new_job->context = context; 651 652 UploadExistingFileParams params; 653 params.resource_id = resource_id; 654 params.local_file_path = local_file_path; 655 params.content_type = content_type; 656 params.options = options; 657 658 ResumeUploadParams resume_params; 659 resume_params.local_file_path = params.local_file_path; 660 resume_params.content_type = params.content_type; 661 662 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone, 663 weak_ptr_factory_.GetWeakPtr(), 664 new_job->job_info.job_id, 665 resume_params, 666 callback); 667 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress, 668 weak_ptr_factory_.GetWeakPtr(), 669 new_job->job_info.job_id); 670 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params); 671 new_job->abort_callback = CreateErrorRunCallback(callback); 672 StartJob(new_job); 673 } 674 675 void JobScheduler::AddPermission( 676 const std::string& resource_id, 677 const std::string& email, 678 google_apis::drive::PermissionRole role, 679 const google_apis::EntryActionCallback& callback) { 680 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 681 DCHECK(!callback.is_null()); 682 683 JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION); 684 new_job->task = base::Bind(&DriveServiceInterface::AddPermission, 685 base::Unretained(drive_service_), 686 resource_id, 687 email, 688 role, 689 base::Bind(&JobScheduler::OnEntryActionJobDone, 690 weak_ptr_factory_.GetWeakPtr(), 691 new_job->job_info.job_id, 692 callback)); 693 new_job->abort_callback = callback; 694 StartJob(new_job); 695 } 696 697 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) { 698 JobEntry* job = new JobEntry(type); 699 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|. 700 return job; 701 } 702 703 void JobScheduler::StartJob(JobEntry* job) { 704 DCHECK(!job->task.is_null()); 705 706 QueueJob(job->job_info.job_id); 707 NotifyJobAdded(job->job_info); 708 DoJobLoop(GetJobQueueType(job->job_info.job_type)); 709 } 710 711 void JobScheduler::QueueJob(JobID job_id) { 712 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 713 714 JobEntry* job_entry = job_map_.Lookup(job_id); 715 DCHECK(job_entry); 716 const JobInfo& job_info = job_entry->job_info; 717 718 const QueueType queue_type = GetJobQueueType(job_info.job_type); 719 queue_[queue_type]->Push(job_id, job_entry->context.type); 720 721 const std::string retry_prefix = job_entry->retry_count > 0 ? 722 base::StringPrintf(" (retry %d)", job_entry->retry_count) : ""; 723 logger_->Log(logging::LOG_INFO, 724 "Job queued%s: %s - %s", 725 retry_prefix.c_str(), 726 job_info.ToString().c_str(), 727 GetQueueInfo(queue_type).c_str()); 728 } 729 730 void JobScheduler::DoJobLoop(QueueType queue_type) { 731 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 732 733 const int accepted_priority = GetCurrentAcceptedPriority(queue_type); 734 735 // Abort all USER_INITAITED jobs when not accepted. 736 if (accepted_priority < USER_INITIATED) { 737 std::vector<JobID> jobs; 738 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs); 739 for (size_t i = 0; i < jobs.size(); ++i) { 740 JobEntry* job = job_map_.Lookup(jobs[i]); 741 DCHECK(job); 742 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION); 743 } 744 } 745 746 // Wait when throttled. 747 const base::Time now = base::Time::Now(); 748 if (now < wait_until_) { 749 base::MessageLoopProxy::current()->PostDelayedTask( 750 FROM_HERE, 751 base::Bind(&JobScheduler::DoJobLoop, 752 weak_ptr_factory_.GetWeakPtr(), 753 queue_type), 754 wait_until_ - now); 755 return; 756 } 757 758 // Run the job with the highest priority in the queue. 759 JobID job_id = -1; 760 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id)) 761 return; 762 763 JobEntry* entry = job_map_.Lookup(job_id); 764 DCHECK(entry); 765 766 JobInfo* job_info = &entry->job_info; 767 job_info->state = STATE_RUNNING; 768 job_info->start_time = now; 769 NotifyJobUpdated(*job_info); 770 771 entry->cancel_callback = entry->task.Run(); 772 773 UpdateWait(); 774 775 logger_->Log(logging::LOG_INFO, 776 "Job started: %s - %s", 777 job_info->ToString().c_str(), 778 GetQueueInfo(queue_type).c_str()); 779 } 780 781 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) { 782 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 783 784 const int kNoJobShouldRun = -1; 785 786 // Should stop if Drive was disabled while running the fetch loop. 787 if (pref_service_->GetBoolean(prefs::kDisableDrive)) 788 return kNoJobShouldRun; 789 790 // Should stop if the network is not online. 791 if (net::NetworkChangeNotifier::IsOffline()) 792 return kNoJobShouldRun; 793 794 // For the file queue, if it is on cellular network, only user initiated 795 // operations are allowed to start. 796 if (queue_type == FILE_QUEUE && 797 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) && 798 net::NetworkChangeNotifier::IsConnectionCellular( 799 net::NetworkChangeNotifier::GetConnectionType())) 800 return USER_INITIATED; 801 802 // Otherwise, every operations including background tasks are allowed. 803 return BACKGROUND; 804 } 805 806 void JobScheduler::UpdateWait() { 807 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 808 809 if (disable_throttling_ || throttle_count_ == 0) 810 return; 811 812 // Exponential backoff: https://developers.google.com/drive/handle-errors. 813 base::TimeDelta delay = 814 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) + 815 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000)); 816 VLOG(1) << "Throttling for " << delay.InMillisecondsF(); 817 818 wait_until_ = std::max(wait_until_, base::Time::Now() + delay); 819 } 820 821 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) { 822 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 823 824 JobEntry* job_entry = job_map_.Lookup(job_id); 825 DCHECK(job_entry); 826 JobInfo* job_info = &job_entry->job_info; 827 QueueType queue_type = GetJobQueueType(job_info->job_type); 828 queue_[queue_type]->MarkFinished(job_id); 829 830 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time; 831 bool success = (GDataToFileError(error) == FILE_ERROR_OK); 832 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING, 833 "Job done: %s => %s (elapsed time: %sms) - %s", 834 job_info->ToString().c_str(), 835 GDataErrorCodeToString(error).c_str(), 836 base::Int64ToString(elapsed.InMilliseconds()).c_str(), 837 GetQueueInfo(queue_type).c_str()); 838 839 // Retry, depending on the error. 840 const bool is_server_error = 841 error == google_apis::HTTP_SERVICE_UNAVAILABLE || 842 error == google_apis::HTTP_INTERNAL_SERVER_ERROR; 843 if (is_server_error) { 844 if (throttle_count_ < kMaxThrottleCount) 845 ++throttle_count_; 846 UpdateWait(); 847 } else { 848 throttle_count_ = 0; 849 } 850 851 const bool should_retry = 852 is_server_error && job_entry->retry_count < kMaxRetryCount; 853 if (should_retry) { 854 job_entry->cancel_callback.Reset(); 855 job_info->state = STATE_RETRY; 856 NotifyJobUpdated(*job_info); 857 858 ++job_entry->retry_count; 859 860 // Requeue the job. 861 QueueJob(job_id); 862 } else { 863 NotifyJobDone(*job_info, error); 864 // The job has finished, no retry will happen in the scheduler. Now we can 865 // remove the job info from the map. 866 job_map_.Remove(job_id); 867 } 868 869 // Post a task to continue the job loop. This allows us to finish handling 870 // the current job before starting the next one. 871 base::MessageLoopProxy::current()->PostTask(FROM_HERE, 872 base::Bind(&JobScheduler::DoJobLoop, 873 weak_ptr_factory_.GetWeakPtr(), 874 queue_type)); 875 return !should_retry; 876 } 877 878 void JobScheduler::OnGetFileListJobDone( 879 JobID job_id, 880 const google_apis::FileListCallback& callback, 881 google_apis::GDataErrorCode error, 882 scoped_ptr<google_apis::FileList> file_list) { 883 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 884 DCHECK(!callback.is_null()); 885 886 if (OnJobDone(job_id, error)) 887 callback.Run(error, file_list.Pass()); 888 } 889 890 void JobScheduler::OnGetChangeListJobDone( 891 JobID job_id, 892 const google_apis::ChangeListCallback& callback, 893 google_apis::GDataErrorCode error, 894 scoped_ptr<google_apis::ChangeList> change_list) { 895 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 896 DCHECK(!callback.is_null()); 897 898 if (OnJobDone(job_id, error)) 899 callback.Run(error, change_list.Pass()); 900 } 901 902 void JobScheduler::OnGetFileResourceJobDone( 903 JobID job_id, 904 const google_apis::FileResourceCallback& callback, 905 google_apis::GDataErrorCode error, 906 scoped_ptr<google_apis::FileResource> entry) { 907 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 908 DCHECK(!callback.is_null()); 909 910 if (OnJobDone(job_id, error)) 911 callback.Run(error, entry.Pass()); 912 } 913 914 void JobScheduler::OnGetAboutResourceJobDone( 915 JobID job_id, 916 const google_apis::AboutResourceCallback& callback, 917 google_apis::GDataErrorCode error, 918 scoped_ptr<google_apis::AboutResource> about_resource) { 919 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 920 DCHECK(!callback.is_null()); 921 922 if (OnJobDone(job_id, error)) 923 callback.Run(error, about_resource.Pass()); 924 } 925 926 void JobScheduler::OnGetShareUrlJobDone( 927 JobID job_id, 928 const google_apis::GetShareUrlCallback& callback, 929 google_apis::GDataErrorCode error, 930 const GURL& share_url) { 931 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 932 DCHECK(!callback.is_null()); 933 934 if (OnJobDone(job_id, error)) 935 callback.Run(error, share_url); 936 } 937 938 void JobScheduler::OnGetAppListJobDone( 939 JobID job_id, 940 const google_apis::AppListCallback& callback, 941 google_apis::GDataErrorCode error, 942 scoped_ptr<google_apis::AppList> app_list) { 943 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 944 DCHECK(!callback.is_null()); 945 946 if (OnJobDone(job_id, error)) 947 callback.Run(error, app_list.Pass()); 948 } 949 950 void JobScheduler::OnEntryActionJobDone( 951 JobID job_id, 952 const google_apis::EntryActionCallback& callback, 953 google_apis::GDataErrorCode error) { 954 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 955 DCHECK(!callback.is_null()); 956 957 if (OnJobDone(job_id, error)) 958 callback.Run(error); 959 } 960 961 void JobScheduler::OnDownloadActionJobDone( 962 JobID job_id, 963 const google_apis::DownloadActionCallback& callback, 964 google_apis::GDataErrorCode error, 965 const base::FilePath& temp_file) { 966 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 967 DCHECK(!callback.is_null()); 968 969 if (OnJobDone(job_id, error)) 970 callback.Run(error, temp_file); 971 } 972 973 void JobScheduler::OnUploadCompletionJobDone( 974 JobID job_id, 975 const ResumeUploadParams& resume_params, 976 const google_apis::FileResourceCallback& callback, 977 google_apis::GDataErrorCode error, 978 const GURL& upload_location, 979 scoped_ptr<google_apis::FileResource> entry) { 980 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 981 DCHECK(!callback.is_null()); 982 983 if (!upload_location.is_empty()) { 984 // If upload_location is available, update the task to resume the 985 // upload process from the terminated point. 986 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE 987 // so OnJobDone called below will be in charge to re-queue the job. 988 JobEntry* job_entry = job_map_.Lookup(job_id); 989 DCHECK(job_entry); 990 991 ResumeUploadFileParams params; 992 params.upload_location = upload_location; 993 params.local_file_path = resume_params.local_file_path; 994 params.content_type = resume_params.content_type; 995 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone, 996 weak_ptr_factory_.GetWeakPtr(), 997 job_id, 998 job_entry->task, 999 callback); 1000 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress, 1001 weak_ptr_factory_.GetWeakPtr(), 1002 job_id); 1003 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params); 1004 } 1005 1006 if (OnJobDone(job_id, error)) 1007 callback.Run(error, entry.Pass()); 1008 } 1009 1010 void JobScheduler::OnResumeUploadFileDone( 1011 JobID job_id, 1012 const base::Callback<google_apis::CancelCallback()>& original_task, 1013 const google_apis::FileResourceCallback& callback, 1014 google_apis::GDataErrorCode error, 1015 const GURL& upload_location, 1016 scoped_ptr<google_apis::FileResource> entry) { 1017 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1018 DCHECK(!original_task.is_null()); 1019 DCHECK(!callback.is_null()); 1020 1021 if (upload_location.is_empty()) { 1022 // If upload_location is not available, we should discard it and stop trying 1023 // to resume. Restore the original task. 1024 JobEntry* job_entry = job_map_.Lookup(job_id); 1025 DCHECK(job_entry); 1026 job_entry->task = original_task; 1027 } 1028 1029 if (OnJobDone(job_id, error)) 1030 callback.Run(error, entry.Pass()); 1031 } 1032 1033 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) { 1034 JobEntry* job_entry = job_map_.Lookup(job_id); 1035 DCHECK(job_entry); 1036 1037 job_entry->job_info.num_completed_bytes = progress; 1038 if (total != -1) 1039 job_entry->job_info.num_total_bytes = total; 1040 NotifyJobUpdated(job_entry->job_info); 1041 } 1042 1043 void JobScheduler::OnConnectionTypeChanged( 1044 net::NetworkChangeNotifier::ConnectionType type) { 1045 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1046 1047 // Resume the job loop. 1048 // Note that we don't need to check the network connection status as it will 1049 // be checked in GetCurrentAcceptedPriority(). 1050 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i) 1051 DoJobLoop(static_cast<QueueType>(i)); 1052 } 1053 1054 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) { 1055 switch (type) { 1056 case TYPE_GET_ABOUT_RESOURCE: 1057 case TYPE_GET_APP_LIST: 1058 case TYPE_GET_ALL_RESOURCE_LIST: 1059 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY: 1060 case TYPE_SEARCH: 1061 case TYPE_GET_CHANGE_LIST: 1062 case TYPE_GET_REMAINING_CHANGE_LIST: 1063 case TYPE_GET_REMAINING_FILE_LIST: 1064 case TYPE_GET_RESOURCE_ENTRY: 1065 case TYPE_GET_SHARE_URL: 1066 case TYPE_TRASH_RESOURCE: 1067 case TYPE_COPY_RESOURCE: 1068 case TYPE_UPDATE_RESOURCE: 1069 case TYPE_ADD_RESOURCE_TO_DIRECTORY: 1070 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY: 1071 case TYPE_ADD_NEW_DIRECTORY: 1072 case TYPE_CREATE_FILE: 1073 case TYPE_ADD_PERMISSION: 1074 return METADATA_QUEUE; 1075 1076 case TYPE_DOWNLOAD_FILE: 1077 case TYPE_UPLOAD_NEW_FILE: 1078 case TYPE_UPLOAD_EXISTING_FILE: 1079 return FILE_QUEUE; 1080 } 1081 NOTREACHED(); 1082 return FILE_QUEUE; 1083 } 1084 1085 void JobScheduler::AbortNotRunningJob(JobEntry* job, 1086 google_apis::GDataErrorCode error) { 1087 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1088 1089 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time; 1090 const QueueType queue_type = GetJobQueueType(job->job_info.job_type); 1091 logger_->Log(logging::LOG_INFO, 1092 "Job aborted: %s => %s (elapsed time: %sms) - %s", 1093 job->job_info.ToString().c_str(), 1094 GDataErrorCodeToString(error).c_str(), 1095 base::Int64ToString(elapsed.InMilliseconds()).c_str(), 1096 GetQueueInfo(queue_type).c_str()); 1097 1098 base::Callback<void(google_apis::GDataErrorCode)> callback = 1099 job->abort_callback; 1100 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id); 1101 NotifyJobDone(job->job_info, error); 1102 job_map_.Remove(job->job_info.job_id); 1103 base::MessageLoopProxy::current()->PostTask(FROM_HERE, 1104 base::Bind(callback, error)); 1105 } 1106 1107 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) { 1108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1109 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info)); 1110 } 1111 1112 void JobScheduler::NotifyJobDone(const JobInfo& job_info, 1113 google_apis::GDataErrorCode error) { 1114 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1115 FOR_EACH_OBSERVER(JobListObserver, observer_list_, 1116 OnJobDone(job_info, GDataToFileError(error))); 1117 } 1118 1119 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) { 1120 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1121 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info)); 1122 } 1123 1124 std::string JobScheduler::GetQueueInfo(QueueType type) const { 1125 return QueueTypeToString(type) + " " + queue_[type]->ToString(); 1126 } 1127 1128 // static 1129 std::string JobScheduler::QueueTypeToString(QueueType type) { 1130 switch (type) { 1131 case METADATA_QUEUE: 1132 return "METADATA_QUEUE"; 1133 case FILE_QUEUE: 1134 return "FILE_QUEUE"; 1135 case NUM_QUEUES: 1136 break; // This value is just a sentinel. Should never be used. 1137 } 1138 NOTREACHED(); 1139 return ""; 1140 } 1141 1142 } // namespace drive 1143