1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/chromeos/drive/job_scheduler.h" 6 7 #include "base/message_loop/message_loop.h" 8 #include "base/prefs/pref_service.h" 9 #include "base/rand_util.h" 10 #include "base/strings/string_number_conversions.h" 11 #include "base/strings/stringprintf.h" 12 #include "chrome/browser/chromeos/drive/file_system_util.h" 13 #include "chrome/browser/chromeos/drive/logging.h" 14 #include "chrome/browser/google_apis/drive_api_parser.h" 15 #include "chrome/browser/google_apis/task_util.h" 16 #include "chrome/common/pref_names.h" 17 #include "content/public/browser/browser_thread.h" 18 19 using content::BrowserThread; 20 21 namespace drive { 22 23 namespace { 24 25 const int kMaxThrottleCount = 4; 26 27 // According to the API documentation, this should be the same as 28 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors). 29 // But currently multiplied by 2 to ensure upload related jobs retried for a 30 // sufficient number of times. crbug.com/269918 31 const int kMaxRetryCount = 2*kMaxThrottleCount; 32 33 // Parameter struct for RunUploadNewFile. 34 struct UploadNewFileParams { 35 std::string parent_resource_id; 36 base::FilePath local_file_path; 37 std::string title; 38 std::string content_type; 39 UploadCompletionCallback callback; 40 google_apis::ProgressCallback progress_callback; 41 }; 42 43 // Helper function to work around the arity limitation of base::Bind. 44 google_apis::CancelCallback RunUploadNewFile( 45 DriveUploaderInterface* uploader, 46 const UploadNewFileParams& params) { 47 return uploader->UploadNewFile(params.parent_resource_id, 48 params.local_file_path, 49 params.title, 50 params.content_type, 51 params.callback, 52 params.progress_callback); 53 } 54 55 // Parameter struct for RunUploadExistingFile. 56 struct UploadExistingFileParams { 57 std::string resource_id; 58 base::FilePath local_file_path; 59 std::string content_type; 60 std::string etag; 61 UploadCompletionCallback callback; 62 google_apis::ProgressCallback progress_callback; 63 }; 64 65 // Helper function to work around the arity limitation of base::Bind. 66 google_apis::CancelCallback RunUploadExistingFile( 67 DriveUploaderInterface* uploader, 68 const UploadExistingFileParams& params) { 69 return uploader->UploadExistingFile(params.resource_id, 70 params.local_file_path, 71 params.content_type, 72 params.etag, 73 params.callback, 74 params.progress_callback); 75 } 76 77 // Parameter struct for RunResumeUploadFile. 78 struct ResumeUploadFileParams { 79 GURL upload_location; 80 base::FilePath local_file_path; 81 std::string content_type; 82 UploadCompletionCallback callback; 83 google_apis::ProgressCallback progress_callback; 84 }; 85 86 // Helper function to adjust the return type. 87 google_apis::CancelCallback RunResumeUploadFile( 88 DriveUploaderInterface* uploader, 89 const ResumeUploadFileParams& params) { 90 return uploader->ResumeUploadFile(params.upload_location, 91 params.local_file_path, 92 params.content_type, 93 params.callback, 94 params.progress_callback); 95 } 96 97 } // namespace 98 99 const int JobScheduler::kMaxJobCount[] = { 100 5, // METADATA_QUEUE 101 1, // FILE_QUEUE 102 }; 103 104 JobScheduler::JobEntry::JobEntry(JobType type) 105 : job_info(type), 106 context(ClientContext(USER_INITIATED)), 107 retry_count(0) { 108 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 109 } 110 111 JobScheduler::JobEntry::~JobEntry() { 112 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 113 } 114 115 struct JobScheduler::ResumeUploadParams { 116 base::FilePath drive_file_path; 117 base::FilePath local_file_path; 118 std::string content_type; 119 }; 120 121 JobScheduler::JobScheduler( 122 PrefService* pref_service, 123 DriveServiceInterface* drive_service, 124 base::SequencedTaskRunner* blocking_task_runner) 125 : throttle_count_(0), 126 wait_until_(base::Time::Now()), 127 disable_throttling_(false), 128 drive_service_(drive_service), 129 uploader_(new DriveUploader(drive_service, blocking_task_runner)), 130 pref_service_(pref_service), 131 weak_ptr_factory_(this) { 132 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 133 134 for (int i = 0; i < NUM_QUEUES; ++i) 135 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES)); 136 137 net::NetworkChangeNotifier::AddConnectionTypeObserver(this); 138 } 139 140 JobScheduler::~JobScheduler() { 141 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 142 143 size_t num_queued_jobs = 0; 144 for (int i = 0; i < NUM_QUEUES; ++i) 145 num_queued_jobs += queue_[i]->GetNumberOfJobs(); 146 DCHECK_EQ(num_queued_jobs, job_map_.size()); 147 148 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this); 149 } 150 151 std::vector<JobInfo> JobScheduler::GetJobInfoList() { 152 std::vector<JobInfo> job_info_list; 153 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance()) 154 job_info_list.push_back(iter.GetCurrentValue()->job_info); 155 return job_info_list; 156 } 157 158 void JobScheduler::AddObserver(JobListObserver* observer) { 159 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 160 observer_list_.AddObserver(observer); 161 } 162 163 void JobScheduler::RemoveObserver(JobListObserver* observer) { 164 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 165 observer_list_.RemoveObserver(observer); 166 } 167 168 void JobScheduler::CancelJob(JobID job_id) { 169 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 170 171 JobEntry* job = job_map_.Lookup(job_id); 172 if (job) { 173 if (job->job_info.state == STATE_RUNNING) { 174 // If the job is running an HTTP request, cancel it via |cancel_callback| 175 // returned from the request, and wait for termination in the normal 176 // callback handler, OnJobDone. 177 if (!job->cancel_callback.is_null()) 178 job->cancel_callback.Run(); 179 } else { 180 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED); 181 } 182 } 183 } 184 185 void JobScheduler::CancelAllJobs() { 186 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 187 188 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports 189 // removable during iteration. 190 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance()) 191 CancelJob(iter.GetCurrentKey()); 192 } 193 194 void JobScheduler::GetAboutResource( 195 const google_apis::GetAboutResourceCallback& callback) { 196 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 197 DCHECK(!callback.is_null()); 198 199 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE); 200 new_job->task = base::Bind( 201 &DriveServiceInterface::GetAboutResource, 202 base::Unretained(drive_service_), 203 base::Bind(&JobScheduler::OnGetAboutResourceJobDone, 204 weak_ptr_factory_.GetWeakPtr(), 205 new_job->job_info.job_id, 206 callback)); 207 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 208 StartJob(new_job); 209 } 210 211 void JobScheduler::GetAppList( 212 const google_apis::GetAppListCallback& callback) { 213 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 214 DCHECK(!callback.is_null()); 215 216 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST); 217 new_job->task = base::Bind( 218 &DriveServiceInterface::GetAppList, 219 base::Unretained(drive_service_), 220 base::Bind(&JobScheduler::OnGetAppListJobDone, 221 weak_ptr_factory_.GetWeakPtr(), 222 new_job->job_info.job_id, 223 callback)); 224 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 225 StartJob(new_job); 226 } 227 228 void JobScheduler::GetAllResourceList( 229 const google_apis::GetResourceListCallback& callback) { 230 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 231 DCHECK(!callback.is_null()); 232 233 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST); 234 new_job->task = base::Bind( 235 &DriveServiceInterface::GetAllResourceList, 236 base::Unretained(drive_service_), 237 base::Bind(&JobScheduler::OnGetResourceListJobDone, 238 weak_ptr_factory_.GetWeakPtr(), 239 new_job->job_info.job_id, 240 callback)); 241 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 242 StartJob(new_job); 243 } 244 245 void JobScheduler::GetResourceListInDirectory( 246 const std::string& directory_resource_id, 247 const google_apis::GetResourceListCallback& callback) { 248 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 249 DCHECK(!callback.is_null()); 250 251 JobEntry* new_job = CreateNewJob( 252 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY); 253 new_job->task = base::Bind( 254 &DriveServiceInterface::GetResourceListInDirectory, 255 base::Unretained(drive_service_), 256 directory_resource_id, 257 base::Bind(&JobScheduler::OnGetResourceListJobDone, 258 weak_ptr_factory_.GetWeakPtr(), 259 new_job->job_info.job_id, 260 callback)); 261 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 262 StartJob(new_job); 263 } 264 265 void JobScheduler::Search( 266 const std::string& search_query, 267 const google_apis::GetResourceListCallback& callback) { 268 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 269 DCHECK(!callback.is_null()); 270 271 JobEntry* new_job = CreateNewJob(TYPE_SEARCH); 272 new_job->task = base::Bind( 273 &DriveServiceInterface::Search, 274 base::Unretained(drive_service_), 275 search_query, 276 base::Bind(&JobScheduler::OnGetResourceListJobDone, 277 weak_ptr_factory_.GetWeakPtr(), 278 new_job->job_info.job_id, 279 callback)); 280 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 281 StartJob(new_job); 282 } 283 284 void JobScheduler::GetChangeList( 285 int64 start_changestamp, 286 const google_apis::GetResourceListCallback& callback) { 287 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 288 DCHECK(!callback.is_null()); 289 290 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST); 291 new_job->task = base::Bind( 292 &DriveServiceInterface::GetChangeList, 293 base::Unretained(drive_service_), 294 start_changestamp, 295 base::Bind(&JobScheduler::OnGetResourceListJobDone, 296 weak_ptr_factory_.GetWeakPtr(), 297 new_job->job_info.job_id, 298 callback)); 299 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 300 StartJob(new_job); 301 } 302 303 void JobScheduler::ContinueGetResourceList( 304 const GURL& next_url, 305 const google_apis::GetResourceListCallback& callback) { 306 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 307 DCHECK(!callback.is_null()); 308 309 JobEntry* new_job = CreateNewJob(TYPE_CONTINUE_GET_RESOURCE_LIST); 310 new_job->task = base::Bind( 311 &DriveServiceInterface::ContinueGetResourceList, 312 base::Unretained(drive_service_), 313 next_url, 314 base::Bind(&JobScheduler::OnGetResourceListJobDone, 315 weak_ptr_factory_.GetWeakPtr(), 316 new_job->job_info.job_id, 317 callback)); 318 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 319 StartJob(new_job); 320 } 321 322 void JobScheduler::GetResourceEntry( 323 const std::string& resource_id, 324 const ClientContext& context, 325 const google_apis::GetResourceEntryCallback& callback) { 326 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 327 DCHECK(!callback.is_null()); 328 329 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY); 330 new_job->context = context; 331 new_job->task = base::Bind( 332 &DriveServiceInterface::GetResourceEntry, 333 base::Unretained(drive_service_), 334 resource_id, 335 base::Bind(&JobScheduler::OnGetResourceEntryJobDone, 336 weak_ptr_factory_.GetWeakPtr(), 337 new_job->job_info.job_id, 338 callback)); 339 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 340 StartJob(new_job); 341 } 342 343 void JobScheduler::GetShareUrl( 344 const std::string& resource_id, 345 const GURL& embed_origin, 346 const ClientContext& context, 347 const google_apis::GetShareUrlCallback& callback) { 348 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 349 DCHECK(!callback.is_null()); 350 351 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL); 352 new_job->context = context; 353 new_job->task = base::Bind( 354 &DriveServiceInterface::GetShareUrl, 355 base::Unretained(drive_service_), 356 resource_id, 357 embed_origin, 358 base::Bind(&JobScheduler::OnGetShareUrlJobDone, 359 weak_ptr_factory_.GetWeakPtr(), 360 new_job->job_info.job_id, 361 callback)); 362 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 363 StartJob(new_job); 364 } 365 366 void JobScheduler::DeleteResource( 367 const std::string& resource_id, 368 const google_apis::EntryActionCallback& callback) { 369 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 370 DCHECK(!callback.is_null()); 371 372 JobEntry* new_job = CreateNewJob(TYPE_DELETE_RESOURCE); 373 new_job->task = base::Bind( 374 &DriveServiceInterface::DeleteResource, 375 base::Unretained(drive_service_), 376 resource_id, 377 "", // etag 378 base::Bind(&JobScheduler::OnEntryActionJobDone, 379 weak_ptr_factory_.GetWeakPtr(), 380 new_job->job_info.job_id, 381 callback)); 382 new_job->abort_callback = callback; 383 StartJob(new_job); 384 } 385 386 void JobScheduler::CopyResource( 387 const std::string& resource_id, 388 const std::string& parent_resource_id, 389 const std::string& new_title, 390 const google_apis::GetResourceEntryCallback& callback) { 391 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 392 DCHECK(!callback.is_null()); 393 394 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE); 395 new_job->task = base::Bind( 396 &DriveServiceInterface::CopyResource, 397 base::Unretained(drive_service_), 398 resource_id, 399 parent_resource_id, 400 new_title, 401 base::Bind(&JobScheduler::OnGetResourceEntryJobDone, 402 weak_ptr_factory_.GetWeakPtr(), 403 new_job->job_info.job_id, 404 callback)); 405 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 406 StartJob(new_job); 407 } 408 409 void JobScheduler::CopyHostedDocument( 410 const std::string& resource_id, 411 const std::string& new_title, 412 const google_apis::GetResourceEntryCallback& callback) { 413 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 414 DCHECK(!callback.is_null()); 415 416 JobEntry* new_job = CreateNewJob(TYPE_COPY_HOSTED_DOCUMENT); 417 new_job->task = base::Bind( 418 &DriveServiceInterface::CopyHostedDocument, 419 base::Unretained(drive_service_), 420 resource_id, 421 new_title, 422 base::Bind(&JobScheduler::OnGetResourceEntryJobDone, 423 weak_ptr_factory_.GetWeakPtr(), 424 new_job->job_info.job_id, 425 callback)); 426 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 427 StartJob(new_job); 428 } 429 430 void JobScheduler::RenameResource( 431 const std::string& resource_id, 432 const std::string& new_title, 433 const google_apis::EntryActionCallback& callback) { 434 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 435 DCHECK(!callback.is_null()); 436 437 JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE); 438 new_job->task = base::Bind( 439 &DriveServiceInterface::RenameResource, 440 base::Unretained(drive_service_), 441 resource_id, 442 new_title, 443 base::Bind(&JobScheduler::OnEntryActionJobDone, 444 weak_ptr_factory_.GetWeakPtr(), 445 new_job->job_info.job_id, 446 callback)); 447 new_job->abort_callback = callback; 448 StartJob(new_job); 449 } 450 451 void JobScheduler::TouchResource( 452 const std::string& resource_id, 453 const base::Time& modified_date, 454 const base::Time& last_viewed_by_me_date, 455 const google_apis::GetResourceEntryCallback& callback) { 456 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 457 DCHECK(!callback.is_null()); 458 459 JobEntry* new_job = CreateNewJob(TYPE_TOUCH_RESOURCE); 460 new_job->task = base::Bind( 461 &DriveServiceInterface::TouchResource, 462 base::Unretained(drive_service_), 463 resource_id, 464 modified_date, 465 last_viewed_by_me_date, 466 base::Bind(&JobScheduler::OnGetResourceEntryJobDone, 467 weak_ptr_factory_.GetWeakPtr(), 468 new_job->job_info.job_id, 469 callback)); 470 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 471 StartJob(new_job); 472 } 473 474 void JobScheduler::AddResourceToDirectory( 475 const std::string& parent_resource_id, 476 const std::string& resource_id, 477 const google_apis::EntryActionCallback& callback) { 478 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 479 DCHECK(!callback.is_null()); 480 481 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY); 482 new_job->task = base::Bind( 483 &DriveServiceInterface::AddResourceToDirectory, 484 base::Unretained(drive_service_), 485 parent_resource_id, 486 resource_id, 487 base::Bind(&JobScheduler::OnEntryActionJobDone, 488 weak_ptr_factory_.GetWeakPtr(), 489 new_job->job_info.job_id, 490 callback)); 491 new_job->abort_callback = callback; 492 StartJob(new_job); 493 } 494 495 void JobScheduler::RemoveResourceFromDirectory( 496 const std::string& parent_resource_id, 497 const std::string& resource_id, 498 const google_apis::EntryActionCallback& callback) { 499 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 500 501 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY); 502 new_job->task = base::Bind( 503 &DriveServiceInterface::RemoveResourceFromDirectory, 504 base::Unretained(drive_service_), 505 parent_resource_id, 506 resource_id, 507 base::Bind(&JobScheduler::OnEntryActionJobDone, 508 weak_ptr_factory_.GetWeakPtr(), 509 new_job->job_info.job_id, 510 callback)); 511 new_job->abort_callback = callback; 512 StartJob(new_job); 513 } 514 515 void JobScheduler::AddNewDirectory( 516 const std::string& parent_resource_id, 517 const std::string& directory_title, 518 const google_apis::GetResourceEntryCallback& callback) { 519 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 520 521 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY); 522 new_job->task = base::Bind( 523 &DriveServiceInterface::AddNewDirectory, 524 base::Unretained(drive_service_), 525 parent_resource_id, 526 directory_title, 527 base::Bind(&JobScheduler::OnGetResourceEntryJobDone, 528 weak_ptr_factory_.GetWeakPtr(), 529 new_job->job_info.job_id, 530 callback)); 531 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 532 StartJob(new_job); 533 } 534 535 JobID JobScheduler::DownloadFile( 536 const base::FilePath& virtual_path, 537 const base::FilePath& local_cache_path, 538 const std::string& resource_id, 539 const ClientContext& context, 540 const google_apis::DownloadActionCallback& download_action_callback, 541 const google_apis::GetContentCallback& get_content_callback) { 542 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 543 544 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE); 545 new_job->job_info.file_path = virtual_path; 546 new_job->context = context; 547 new_job->task = base::Bind( 548 &DriveServiceInterface::DownloadFile, 549 base::Unretained(drive_service_), 550 local_cache_path, 551 resource_id, 552 base::Bind(&JobScheduler::OnDownloadActionJobDone, 553 weak_ptr_factory_.GetWeakPtr(), 554 new_job->job_info.job_id, 555 download_action_callback), 556 get_content_callback, 557 base::Bind(&JobScheduler::UpdateProgress, 558 weak_ptr_factory_.GetWeakPtr(), 559 new_job->job_info.job_id)); 560 new_job->abort_callback = 561 google_apis::CreateErrorRunCallback(download_action_callback); 562 StartJob(new_job); 563 return new_job->job_info.job_id; 564 } 565 566 void JobScheduler::UploadNewFile( 567 const std::string& parent_resource_id, 568 const base::FilePath& drive_file_path, 569 const base::FilePath& local_file_path, 570 const std::string& title, 571 const std::string& content_type, 572 const ClientContext& context, 573 const google_apis::GetResourceEntryCallback& callback) { 574 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 575 576 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE); 577 new_job->job_info.file_path = drive_file_path; 578 new_job->context = context; 579 580 UploadNewFileParams params; 581 params.parent_resource_id = parent_resource_id; 582 params.local_file_path = local_file_path; 583 params.title = title; 584 params.content_type = content_type; 585 586 ResumeUploadParams resume_params; 587 resume_params.local_file_path = params.local_file_path; 588 resume_params.content_type = params.content_type; 589 590 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone, 591 weak_ptr_factory_.GetWeakPtr(), 592 new_job->job_info.job_id, 593 resume_params, 594 callback); 595 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress, 596 weak_ptr_factory_.GetWeakPtr(), 597 new_job->job_info.job_id); 598 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params); 599 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 600 StartJob(new_job); 601 } 602 603 void JobScheduler::UploadExistingFile( 604 const std::string& resource_id, 605 const base::FilePath& drive_file_path, 606 const base::FilePath& local_file_path, 607 const std::string& content_type, 608 const std::string& etag, 609 const ClientContext& context, 610 const google_apis::GetResourceEntryCallback& callback) { 611 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 612 613 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE); 614 new_job->job_info.file_path = drive_file_path; 615 new_job->context = context; 616 617 UploadExistingFileParams params; 618 params.resource_id = resource_id; 619 params.local_file_path = local_file_path; 620 params.content_type = content_type; 621 params.etag = etag; 622 623 ResumeUploadParams resume_params; 624 resume_params.local_file_path = params.local_file_path; 625 resume_params.content_type = params.content_type; 626 627 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone, 628 weak_ptr_factory_.GetWeakPtr(), 629 new_job->job_info.job_id, 630 resume_params, 631 callback); 632 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress, 633 weak_ptr_factory_.GetWeakPtr(), 634 new_job->job_info.job_id); 635 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params); 636 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 637 StartJob(new_job); 638 } 639 640 void JobScheduler::CreateFile( 641 const std::string& parent_resource_id, 642 const base::FilePath& drive_file_path, 643 const std::string& title, 644 const std::string& content_type, 645 const ClientContext& context, 646 const google_apis::GetResourceEntryCallback& callback) { 647 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 648 649 const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null")); 650 651 JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE); 652 new_job->job_info.file_path = drive_file_path; 653 new_job->context = context; 654 655 UploadNewFileParams params; 656 params.parent_resource_id = parent_resource_id; 657 params.local_file_path = kDevNull; // Upload an empty file. 658 params.title = title; 659 params.content_type = content_type; 660 661 ResumeUploadParams resume_params; 662 resume_params.local_file_path = params.local_file_path; 663 resume_params.content_type = params.content_type; 664 665 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone, 666 weak_ptr_factory_.GetWeakPtr(), 667 new_job->job_info.job_id, 668 resume_params, 669 callback); 670 params.progress_callback = google_apis::ProgressCallback(); 671 672 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params); 673 new_job->abort_callback = google_apis::CreateErrorRunCallback(callback); 674 StartJob(new_job); 675 } 676 677 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) { 678 JobEntry* job = new JobEntry(type); 679 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|. 680 return job; 681 } 682 683 void JobScheduler::StartJob(JobEntry* job) { 684 DCHECK(!job->task.is_null()); 685 686 QueueJob(job->job_info.job_id); 687 NotifyJobAdded(job->job_info); 688 DoJobLoop(GetJobQueueType(job->job_info.job_type)); 689 } 690 691 void JobScheduler::QueueJob(JobID job_id) { 692 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 693 694 JobEntry* job_entry = job_map_.Lookup(job_id); 695 DCHECK(job_entry); 696 const JobInfo& job_info = job_entry->job_info; 697 698 QueueType queue_type = GetJobQueueType(job_info.job_type); 699 queue_[queue_type]->Push(job_id, job_entry->context.type); 700 701 const std::string retry_prefix = job_entry->retry_count > 0 ? 702 base::StringPrintf(" (retry %d)", job_entry->retry_count) : ""; 703 util::Log(logging::LOG_INFO, 704 "Job queued%s: %s - %s", 705 retry_prefix.c_str(), 706 job_info.ToString().c_str(), 707 GetQueueInfo(queue_type).c_str()); 708 } 709 710 void JobScheduler::DoJobLoop(QueueType queue_type) { 711 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 712 713 const int accepted_priority = GetCurrentAcceptedPriority(queue_type); 714 715 // Abort all USER_INITAITED jobs when not accepted. 716 if (accepted_priority < USER_INITIATED) { 717 std::vector<JobID> jobs; 718 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs); 719 for (size_t i = 0; i < jobs.size(); ++i) { 720 JobEntry* job = job_map_.Lookup(jobs[i]); 721 DCHECK(job); 722 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION); 723 } 724 } 725 726 // Wait when throttled. 727 const base::Time now = base::Time::Now(); 728 if (now < wait_until_) { 729 base::MessageLoopProxy::current()->PostDelayedTask( 730 FROM_HERE, 731 base::Bind(&JobScheduler::DoJobLoop, 732 weak_ptr_factory_.GetWeakPtr(), 733 queue_type), 734 wait_until_ - now); 735 return; 736 } 737 738 // Run the job with the highest priority in the queue. 739 JobID job_id = -1; 740 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id)) 741 return; 742 743 JobEntry* entry = job_map_.Lookup(job_id); 744 DCHECK(entry); 745 746 JobInfo* job_info = &entry->job_info; 747 job_info->state = STATE_RUNNING; 748 job_info->start_time = now; 749 NotifyJobUpdated(*job_info); 750 751 entry->cancel_callback = entry->task.Run(); 752 753 UpdateWait(); 754 755 util::Log(logging::LOG_INFO, 756 "Job started: %s - %s", 757 job_info->ToString().c_str(), 758 GetQueueInfo(queue_type).c_str()); 759 } 760 761 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) { 762 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 763 764 const int kNoJobShouldRun = -1; 765 766 // Should stop if Drive was disabled while running the fetch loop. 767 if (pref_service_->GetBoolean(prefs::kDisableDrive)) 768 return kNoJobShouldRun; 769 770 // Should stop if the network is not online. 771 if (net::NetworkChangeNotifier::IsOffline()) 772 return kNoJobShouldRun; 773 774 // For the file queue, if it is on cellular network, only user initiated 775 // operations are allowed to start. 776 if (queue_type == FILE_QUEUE && 777 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) && 778 net::NetworkChangeNotifier::IsConnectionCellular( 779 net::NetworkChangeNotifier::GetConnectionType())) 780 return USER_INITIATED; 781 782 // Otherwise, every operations including background tasks are allowed. 783 return BACKGROUND; 784 } 785 786 void JobScheduler::UpdateWait() { 787 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 788 789 if (disable_throttling_ || throttle_count_ == 0) 790 return; 791 792 // Exponential backoff: https://developers.google.com/drive/handle-errors. 793 base::TimeDelta delay = 794 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) + 795 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000)); 796 VLOG(1) << "Throttling for " << delay.InMillisecondsF(); 797 798 wait_until_ = std::max(wait_until_, base::Time::Now() + delay); 799 } 800 801 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) { 802 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 803 804 JobEntry* job_entry = job_map_.Lookup(job_id); 805 DCHECK(job_entry); 806 JobInfo* job_info = &job_entry->job_info; 807 QueueType queue_type = GetJobQueueType(job_info->job_type); 808 queue_[queue_type]->MarkFinished(job_id); 809 810 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time; 811 bool success = (GDataToFileError(error) == FILE_ERROR_OK); 812 util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING, 813 "Job done: %s => %s (elapsed time: %sms) - %s", 814 job_info->ToString().c_str(), 815 GDataErrorCodeToString(error).c_str(), 816 base::Int64ToString(elapsed.InMilliseconds()).c_str(), 817 GetQueueInfo(queue_type).c_str()); 818 819 // Retry, depending on the error. 820 const bool is_server_error = 821 error == google_apis::HTTP_SERVICE_UNAVAILABLE || 822 error == google_apis::HTTP_INTERNAL_SERVER_ERROR; 823 if (is_server_error) { 824 if (throttle_count_ < kMaxThrottleCount) 825 ++throttle_count_; 826 UpdateWait(); 827 } else { 828 throttle_count_ = 0; 829 } 830 831 const bool should_retry = 832 is_server_error && job_entry->retry_count < kMaxRetryCount; 833 if (should_retry) { 834 job_entry->cancel_callback.Reset(); 835 job_info->state = STATE_RETRY; 836 NotifyJobUpdated(*job_info); 837 838 ++job_entry->retry_count; 839 840 // Requeue the job. 841 QueueJob(job_id); 842 } else { 843 NotifyJobDone(*job_info, error); 844 // The job has finished, no retry will happen in the scheduler. Now we can 845 // remove the job info from the map. 846 job_map_.Remove(job_id); 847 } 848 849 // Post a task to continue the job loop. This allows us to finish handling 850 // the current job before starting the next one. 851 base::MessageLoopProxy::current()->PostTask(FROM_HERE, 852 base::Bind(&JobScheduler::DoJobLoop, 853 weak_ptr_factory_.GetWeakPtr(), 854 queue_type)); 855 return !should_retry; 856 } 857 858 void JobScheduler::OnGetResourceListJobDone( 859 JobID job_id, 860 const google_apis::GetResourceListCallback& callback, 861 google_apis::GDataErrorCode error, 862 scoped_ptr<google_apis::ResourceList> resource_list) { 863 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 864 DCHECK(!callback.is_null()); 865 866 if (OnJobDone(job_id, error)) 867 callback.Run(error, resource_list.Pass()); 868 } 869 870 void JobScheduler::OnGetResourceEntryJobDone( 871 JobID job_id, 872 const google_apis::GetResourceEntryCallback& callback, 873 google_apis::GDataErrorCode error, 874 scoped_ptr<google_apis::ResourceEntry> entry) { 875 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 876 DCHECK(!callback.is_null()); 877 878 if (OnJobDone(job_id, error)) 879 callback.Run(error, entry.Pass()); 880 } 881 882 void JobScheduler::OnGetAboutResourceJobDone( 883 JobID job_id, 884 const google_apis::GetAboutResourceCallback& callback, 885 google_apis::GDataErrorCode error, 886 scoped_ptr<google_apis::AboutResource> about_resource) { 887 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 888 DCHECK(!callback.is_null()); 889 890 if (OnJobDone(job_id, error)) 891 callback.Run(error, about_resource.Pass()); 892 } 893 894 void JobScheduler::OnGetShareUrlJobDone( 895 JobID job_id, 896 const google_apis::GetShareUrlCallback& callback, 897 google_apis::GDataErrorCode error, 898 const GURL& share_url) { 899 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 900 DCHECK(!callback.is_null()); 901 902 if (OnJobDone(job_id, error)) 903 callback.Run(error, share_url); 904 } 905 906 void JobScheduler::OnGetAppListJobDone( 907 JobID job_id, 908 const google_apis::GetAppListCallback& callback, 909 google_apis::GDataErrorCode error, 910 scoped_ptr<google_apis::AppList> app_list) { 911 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 912 DCHECK(!callback.is_null()); 913 914 if (OnJobDone(job_id, error)) 915 callback.Run(error, app_list.Pass()); 916 } 917 918 void JobScheduler::OnEntryActionJobDone( 919 JobID job_id, 920 const google_apis::EntryActionCallback& callback, 921 google_apis::GDataErrorCode error) { 922 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 923 DCHECK(!callback.is_null()); 924 925 if (OnJobDone(job_id, error)) 926 callback.Run(error); 927 } 928 929 void JobScheduler::OnDownloadActionJobDone( 930 JobID job_id, 931 const google_apis::DownloadActionCallback& callback, 932 google_apis::GDataErrorCode error, 933 const base::FilePath& temp_file) { 934 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 935 DCHECK(!callback.is_null()); 936 937 if (OnJobDone(job_id, error)) 938 callback.Run(error, temp_file); 939 } 940 941 void JobScheduler::OnUploadCompletionJobDone( 942 JobID job_id, 943 const ResumeUploadParams& resume_params, 944 const google_apis::GetResourceEntryCallback& callback, 945 google_apis::GDataErrorCode error, 946 const GURL& upload_location, 947 scoped_ptr<google_apis::ResourceEntry> resource_entry) { 948 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 949 DCHECK(!callback.is_null()); 950 951 if (!upload_location.is_empty()) { 952 // If upload_location is available, update the task to resume the 953 // upload process from the terminated point. 954 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE 955 // so OnJobDone called below will be in charge to re-queue the job. 956 JobEntry* job_entry = job_map_.Lookup(job_id); 957 DCHECK(job_entry); 958 959 ResumeUploadFileParams params; 960 params.upload_location = upload_location; 961 params.local_file_path = resume_params.local_file_path; 962 params.content_type = resume_params.content_type; 963 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone, 964 weak_ptr_factory_.GetWeakPtr(), 965 job_id, 966 job_entry->task, 967 callback); 968 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress, 969 weak_ptr_factory_.GetWeakPtr(), 970 job_id); 971 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params); 972 } 973 974 if (OnJobDone(job_id, error)) 975 callback.Run(error, resource_entry.Pass()); 976 } 977 978 void JobScheduler::OnResumeUploadFileDone( 979 JobID job_id, 980 const base::Callback<google_apis::CancelCallback()>& original_task, 981 const google_apis::GetResourceEntryCallback& callback, 982 google_apis::GDataErrorCode error, 983 const GURL& upload_location, 984 scoped_ptr<google_apis::ResourceEntry> resource_entry) { 985 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 986 DCHECK(!original_task.is_null()); 987 DCHECK(!callback.is_null()); 988 989 if (upload_location.is_empty()) { 990 // If upload_location is not available, we should discard it and stop trying 991 // to resume. Restore the original task. 992 JobEntry* job_entry = job_map_.Lookup(job_id); 993 DCHECK(job_entry); 994 job_entry->task = original_task; 995 } 996 997 if (OnJobDone(job_id, error)) 998 callback.Run(error, resource_entry.Pass()); 999 } 1000 1001 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) { 1002 JobEntry* job_entry = job_map_.Lookup(job_id); 1003 DCHECK(job_entry); 1004 1005 job_entry->job_info.num_completed_bytes = progress; 1006 job_entry->job_info.num_total_bytes = total; 1007 NotifyJobUpdated(job_entry->job_info); 1008 } 1009 1010 void JobScheduler::OnConnectionTypeChanged( 1011 net::NetworkChangeNotifier::ConnectionType type) { 1012 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1013 1014 // Resume the job loop. 1015 // Note that we don't need to check the network connection status as it will 1016 // be checked in GetCurrentAcceptedPriority(). 1017 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i) 1018 DoJobLoop(static_cast<QueueType>(i)); 1019 } 1020 1021 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) { 1022 switch (type) { 1023 case TYPE_GET_ABOUT_RESOURCE: 1024 case TYPE_GET_APP_LIST: 1025 case TYPE_GET_ALL_RESOURCE_LIST: 1026 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY: 1027 case TYPE_SEARCH: 1028 case TYPE_GET_CHANGE_LIST: 1029 case TYPE_CONTINUE_GET_RESOURCE_LIST: 1030 case TYPE_GET_RESOURCE_ENTRY: 1031 case TYPE_GET_SHARE_URL: 1032 case TYPE_DELETE_RESOURCE: 1033 case TYPE_COPY_RESOURCE: 1034 case TYPE_COPY_HOSTED_DOCUMENT: 1035 case TYPE_RENAME_RESOURCE: 1036 case TYPE_TOUCH_RESOURCE: 1037 case TYPE_ADD_RESOURCE_TO_DIRECTORY: 1038 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY: 1039 case TYPE_ADD_NEW_DIRECTORY: 1040 case TYPE_CREATE_FILE: 1041 return METADATA_QUEUE; 1042 1043 case TYPE_DOWNLOAD_FILE: 1044 case TYPE_UPLOAD_NEW_FILE: 1045 case TYPE_UPLOAD_EXISTING_FILE: 1046 return FILE_QUEUE; 1047 } 1048 NOTREACHED(); 1049 return FILE_QUEUE; 1050 } 1051 1052 void JobScheduler::AbortNotRunningJob(JobEntry* job, 1053 google_apis::GDataErrorCode error) { 1054 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1055 1056 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time; 1057 const QueueType queue_type = GetJobQueueType(job->job_info.job_type); 1058 util::Log(logging::LOG_INFO, 1059 "Job aborted: %s => %s (elapsed time: %sms) - %s", 1060 job->job_info.ToString().c_str(), 1061 GDataErrorCodeToString(error).c_str(), 1062 base::Int64ToString(elapsed.InMilliseconds()).c_str(), 1063 GetQueueInfo(queue_type).c_str()); 1064 1065 base::Callback<void(google_apis::GDataErrorCode)> callback = 1066 job->abort_callback; 1067 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id); 1068 NotifyJobDone(job->job_info, error); 1069 job_map_.Remove(job->job_info.job_id); 1070 base::MessageLoopProxy::current()->PostTask(FROM_HERE, 1071 base::Bind(callback, error)); 1072 } 1073 1074 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) { 1075 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1076 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info)); 1077 } 1078 1079 void JobScheduler::NotifyJobDone(const JobInfo& job_info, 1080 google_apis::GDataErrorCode error) { 1081 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1082 FOR_EACH_OBSERVER(JobListObserver, observer_list_, 1083 OnJobDone(job_info, GDataToFileError(error))); 1084 } 1085 1086 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) { 1087 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); 1088 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info)); 1089 } 1090 1091 std::string JobScheduler::GetQueueInfo(QueueType type) const { 1092 return QueueTypeToString(type) + " " + queue_[type]->ToString(); 1093 } 1094 1095 // static 1096 std::string JobScheduler::QueueTypeToString(QueueType type) { 1097 switch (type) { 1098 case METADATA_QUEUE: 1099 return "METADATA_QUEUE"; 1100 case FILE_QUEUE: 1101 return "FILE_QUEUE"; 1102 case NUM_QUEUES: 1103 break; // This value is just a sentinel. Should never be used. 1104 } 1105 NOTREACHED(); 1106 return ""; 1107 } 1108 1109 } // namespace drive 1110