Home | History | Annotate | Download | only in drive
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
      6 
      7 #include "base/message_loop/message_loop.h"
      8 #include "base/prefs/pref_service.h"
      9 #include "base/rand_util.h"
     10 #include "base/strings/string_number_conversions.h"
     11 #include "base/strings/stringprintf.h"
     12 #include "chrome/browser/chromeos/drive/file_system_util.h"
     13 #include "chrome/browser/chromeos/drive/logging.h"
     14 #include "chrome/browser/google_apis/drive_api_parser.h"
     15 #include "chrome/browser/google_apis/task_util.h"
     16 #include "chrome/common/pref_names.h"
     17 #include "content/public/browser/browser_thread.h"
     18 
     19 using content::BrowserThread;
     20 
     21 namespace drive {
     22 
     23 namespace {
     24 
     // Maximum throttle count.
     const int kMaxThrottleCount = 4;

     // Maximum retry count. The API documentation
     // (https://developers.google.com/drive/handle-errors) says this should equal
     // kMaxThrottleCount, but it is currently doubled so that upload related jobs
     // are retried a sufficient number of times. crbug.com/269918
     const int kMaxRetryCount = kMaxThrottleCount * 2;
     32 
     33 // Parameter struct for RunUploadNewFile.
     34 struct UploadNewFileParams {
     35   std::string parent_resource_id;
     36   base::FilePath local_file_path;
     37   std::string title;
     38   std::string content_type;
     39   UploadCompletionCallback callback;
     40   google_apis::ProgressCallback progress_callback;
     41 };
     42 
     43 // Helper function to work around the arity limitation of base::Bind.
     44 google_apis::CancelCallback RunUploadNewFile(
     45     DriveUploaderInterface* uploader,
     46     const UploadNewFileParams& params) {
     47   return uploader->UploadNewFile(params.parent_resource_id,
     48                                  params.local_file_path,
     49                                  params.title,
     50                                  params.content_type,
     51                                  params.callback,
     52                                  params.progress_callback);
     53 }
     54 
     55 // Parameter struct for RunUploadExistingFile.
     56 struct UploadExistingFileParams {
     57   std::string resource_id;
     58   base::FilePath local_file_path;
     59   std::string content_type;
     60   std::string etag;
     61   UploadCompletionCallback callback;
     62   google_apis::ProgressCallback progress_callback;
     63 };
     64 
     65 // Helper function to work around the arity limitation of base::Bind.
     66 google_apis::CancelCallback RunUploadExistingFile(
     67     DriveUploaderInterface* uploader,
     68     const UploadExistingFileParams& params) {
     69   return uploader->UploadExistingFile(params.resource_id,
     70                                       params.local_file_path,
     71                                       params.content_type,
     72                                       params.etag,
     73                                       params.callback,
     74                                       params.progress_callback);
     75 }
     76 
     77 // Parameter struct for RunResumeUploadFile.
     78 struct ResumeUploadFileParams {
     79   GURL upload_location;
     80   base::FilePath local_file_path;
     81   std::string content_type;
     82   UploadCompletionCallback callback;
     83   google_apis::ProgressCallback progress_callback;
     84 };
     85 
     86 // Helper function to adjust the return type.
     87 google_apis::CancelCallback RunResumeUploadFile(
     88     DriveUploaderInterface* uploader,
     89     const ResumeUploadFileParams& params) {
     90   return uploader->ResumeUploadFile(params.upload_location,
     91                                     params.local_file_path,
     92                                     params.content_type,
     93                                     params.callback,
     94                                     params.progress_callback);
     95 }
     96 
     97 }  // namespace
     98 
     99 const int JobScheduler::kMaxJobCount[] = {
    100   5,  // METADATA_QUEUE
    101   1,  // FILE_QUEUE
    102 };
    103 
    104 JobScheduler::JobEntry::JobEntry(JobType type)
    105     : job_info(type),
    106       context(ClientContext(USER_INITIATED)),
    107       retry_count(0) {
    108   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    109 }
    110 
    111 JobScheduler::JobEntry::~JobEntry() {
    112   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    113 }
    114 
    115 struct JobScheduler::ResumeUploadParams {
    116   base::FilePath drive_file_path;
    117   base::FilePath local_file_path;
    118   std::string content_type;
    119 };
    120 
    121 JobScheduler::JobScheduler(
    122     PrefService* pref_service,
    123     DriveServiceInterface* drive_service,
    124     base::SequencedTaskRunner* blocking_task_runner)
    125     : throttle_count_(0),
    126       wait_until_(base::Time::Now()),
    127       disable_throttling_(false),
    128       drive_service_(drive_service),
    129       uploader_(new DriveUploader(drive_service, blocking_task_runner)),
    130       pref_service_(pref_service),
    131       weak_ptr_factory_(this) {
    132   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    133 
    134   for (int i = 0; i < NUM_QUEUES; ++i)
    135     queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
    136 
    137   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
    138 }
    139 
    140 JobScheduler::~JobScheduler() {
    141   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    142 
    143   size_t num_queued_jobs = 0;
    144   for (int i = 0; i < NUM_QUEUES; ++i)
    145     num_queued_jobs += queue_[i]->GetNumberOfJobs();
    146   DCHECK_EQ(num_queued_jobs, job_map_.size());
    147 
    148   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
    149 }
    150 
    151 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
    152   std::vector<JobInfo> job_info_list;
    153   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
    154     job_info_list.push_back(iter.GetCurrentValue()->job_info);
    155   return job_info_list;
    156 }
    157 
    158 void JobScheduler::AddObserver(JobListObserver* observer) {
    159   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    160   observer_list_.AddObserver(observer);
    161 }
    162 
    163 void JobScheduler::RemoveObserver(JobListObserver* observer) {
    164   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    165   observer_list_.RemoveObserver(observer);
    166 }
    167 
    168 void JobScheduler::CancelJob(JobID job_id) {
    169   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    170 
    171   JobEntry* job = job_map_.Lookup(job_id);
    172   if (job) {
    173     if (job->job_info.state == STATE_RUNNING) {
    174       // If the job is running an HTTP request, cancel it via |cancel_callback|
    175       // returned from the request, and wait for termination in the normal
    176       // callback handler, OnJobDone.
    177       if (!job->cancel_callback.is_null())
    178         job->cancel_callback.Run();
    179     } else {
    180       AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
    181     }
    182   }
    183 }
    184 
    185 void JobScheduler::CancelAllJobs() {
    186   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    187 
    188   // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
    189   // removable during iteration.
    190   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
    191     CancelJob(iter.GetCurrentKey());
    192 }
    193 
    194 void JobScheduler::GetAboutResource(
    195     const google_apis::GetAboutResourceCallback& callback) {
    196   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    197   DCHECK(!callback.is_null());
    198 
    199   JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
    200   new_job->task = base::Bind(
    201       &DriveServiceInterface::GetAboutResource,
    202       base::Unretained(drive_service_),
    203       base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
    204                  weak_ptr_factory_.GetWeakPtr(),
    205                  new_job->job_info.job_id,
    206                  callback));
    207   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    208   StartJob(new_job);
    209 }
    210 
    211 void JobScheduler::GetAppList(
    212     const google_apis::GetAppListCallback& callback) {
    213   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    214   DCHECK(!callback.is_null());
    215 
    216   JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
    217   new_job->task = base::Bind(
    218       &DriveServiceInterface::GetAppList,
    219       base::Unretained(drive_service_),
    220       base::Bind(&JobScheduler::OnGetAppListJobDone,
    221                  weak_ptr_factory_.GetWeakPtr(),
    222                  new_job->job_info.job_id,
    223                  callback));
    224   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    225   StartJob(new_job);
    226 }
    227 
    228 void JobScheduler::GetAllResourceList(
    229     const google_apis::GetResourceListCallback& callback) {
    230   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    231   DCHECK(!callback.is_null());
    232 
    233   JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
    234   new_job->task = base::Bind(
    235       &DriveServiceInterface::GetAllResourceList,
    236       base::Unretained(drive_service_),
    237       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    238                  weak_ptr_factory_.GetWeakPtr(),
    239                  new_job->job_info.job_id,
    240                  callback));
    241   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    242   StartJob(new_job);
    243 }
    244 
    245 void JobScheduler::GetResourceListInDirectory(
    246     const std::string& directory_resource_id,
    247     const google_apis::GetResourceListCallback& callback) {
    248   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    249   DCHECK(!callback.is_null());
    250 
    251   JobEntry* new_job = CreateNewJob(
    252       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
    253   new_job->task = base::Bind(
    254       &DriveServiceInterface::GetResourceListInDirectory,
    255       base::Unretained(drive_service_),
    256       directory_resource_id,
    257       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    258                  weak_ptr_factory_.GetWeakPtr(),
    259                  new_job->job_info.job_id,
    260                  callback));
    261   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    262   StartJob(new_job);
    263 }
    264 
    265 void JobScheduler::Search(
    266     const std::string& search_query,
    267     const google_apis::GetResourceListCallback& callback) {
    268   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    269   DCHECK(!callback.is_null());
    270 
    271   JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
    272   new_job->task = base::Bind(
    273       &DriveServiceInterface::Search,
    274       base::Unretained(drive_service_),
    275       search_query,
    276       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    277                  weak_ptr_factory_.GetWeakPtr(),
    278                  new_job->job_info.job_id,
    279                  callback));
    280   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    281   StartJob(new_job);
    282 }
    283 
    284 void JobScheduler::GetChangeList(
    285     int64 start_changestamp,
    286     const google_apis::GetResourceListCallback& callback) {
    287   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    288   DCHECK(!callback.is_null());
    289 
    290   JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
    291   new_job->task = base::Bind(
    292       &DriveServiceInterface::GetChangeList,
    293       base::Unretained(drive_service_),
    294       start_changestamp,
    295       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    296                  weak_ptr_factory_.GetWeakPtr(),
    297                  new_job->job_info.job_id,
    298                  callback));
    299   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    300   StartJob(new_job);
    301 }
    302 
    303 void JobScheduler::ContinueGetResourceList(
    304     const GURL& next_url,
    305     const google_apis::GetResourceListCallback& callback) {
    306   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    307   DCHECK(!callback.is_null());
    308 
    309   JobEntry* new_job = CreateNewJob(TYPE_CONTINUE_GET_RESOURCE_LIST);
    310   new_job->task = base::Bind(
    311       &DriveServiceInterface::ContinueGetResourceList,
    312       base::Unretained(drive_service_),
    313       next_url,
    314       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    315                  weak_ptr_factory_.GetWeakPtr(),
    316                  new_job->job_info.job_id,
    317                  callback));
    318   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    319   StartJob(new_job);
    320 }
    321 
    322 void JobScheduler::GetResourceEntry(
    323     const std::string& resource_id,
    324     const ClientContext& context,
    325     const google_apis::GetResourceEntryCallback& callback) {
    326   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    327   DCHECK(!callback.is_null());
    328 
    329   JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
    330   new_job->context = context;
    331   new_job->task = base::Bind(
    332       &DriveServiceInterface::GetResourceEntry,
    333       base::Unretained(drive_service_),
    334       resource_id,
    335       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    336                  weak_ptr_factory_.GetWeakPtr(),
    337                  new_job->job_info.job_id,
    338                  callback));
    339   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    340   StartJob(new_job);
    341 }
    342 
    343 void JobScheduler::GetShareUrl(
    344     const std::string& resource_id,
    345     const GURL& embed_origin,
    346     const ClientContext& context,
    347     const google_apis::GetShareUrlCallback& callback) {
    348   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    349   DCHECK(!callback.is_null());
    350 
    351   JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
    352   new_job->context = context;
    353   new_job->task = base::Bind(
    354       &DriveServiceInterface::GetShareUrl,
    355       base::Unretained(drive_service_),
    356       resource_id,
    357       embed_origin,
    358       base::Bind(&JobScheduler::OnGetShareUrlJobDone,
    359                  weak_ptr_factory_.GetWeakPtr(),
    360                  new_job->job_info.job_id,
    361                  callback));
    362   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    363   StartJob(new_job);
    364 }
    365 
    366 void JobScheduler::DeleteResource(
    367     const std::string& resource_id,
    368     const google_apis::EntryActionCallback& callback) {
    369   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    370   DCHECK(!callback.is_null());
    371 
    372   JobEntry* new_job = CreateNewJob(TYPE_DELETE_RESOURCE);
    373   new_job->task = base::Bind(
    374       &DriveServiceInterface::DeleteResource,
    375       base::Unretained(drive_service_),
    376       resource_id,
    377       "",  // etag
    378       base::Bind(&JobScheduler::OnEntryActionJobDone,
    379                  weak_ptr_factory_.GetWeakPtr(),
    380                  new_job->job_info.job_id,
    381                  callback));
    382   new_job->abort_callback = callback;
    383   StartJob(new_job);
    384 }
    385 
    386 void JobScheduler::CopyResource(
    387     const std::string& resource_id,
    388     const std::string& parent_resource_id,
    389     const std::string& new_title,
    390     const google_apis::GetResourceEntryCallback& callback) {
    391   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    392   DCHECK(!callback.is_null());
    393 
    394   JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
    395   new_job->task = base::Bind(
    396       &DriveServiceInterface::CopyResource,
    397       base::Unretained(drive_service_),
    398       resource_id,
    399       parent_resource_id,
    400       new_title,
    401       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    402                  weak_ptr_factory_.GetWeakPtr(),
    403                  new_job->job_info.job_id,
    404                  callback));
    405   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    406   StartJob(new_job);
    407 }
    408 
    409 void JobScheduler::CopyHostedDocument(
    410     const std::string& resource_id,
    411     const std::string& new_title,
    412     const google_apis::GetResourceEntryCallback& callback) {
    413   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    414   DCHECK(!callback.is_null());
    415 
    416   JobEntry* new_job = CreateNewJob(TYPE_COPY_HOSTED_DOCUMENT);
    417   new_job->task = base::Bind(
    418       &DriveServiceInterface::CopyHostedDocument,
    419       base::Unretained(drive_service_),
    420       resource_id,
    421       new_title,
    422       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    423                  weak_ptr_factory_.GetWeakPtr(),
    424                  new_job->job_info.job_id,
    425                  callback));
    426   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    427   StartJob(new_job);
    428 }
    429 
    430 void JobScheduler::RenameResource(
    431     const std::string& resource_id,
    432     const std::string& new_title,
    433     const google_apis::EntryActionCallback& callback) {
    434   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    435   DCHECK(!callback.is_null());
    436 
    437   JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
    438   new_job->task = base::Bind(
    439       &DriveServiceInterface::RenameResource,
    440       base::Unretained(drive_service_),
    441       resource_id,
    442       new_title,
    443       base::Bind(&JobScheduler::OnEntryActionJobDone,
    444                  weak_ptr_factory_.GetWeakPtr(),
    445                  new_job->job_info.job_id,
    446                  callback));
    447   new_job->abort_callback = callback;
    448   StartJob(new_job);
    449 }
    450 
    451 void JobScheduler::TouchResource(
    452     const std::string& resource_id,
    453     const base::Time& modified_date,
    454     const base::Time& last_viewed_by_me_date,
    455     const google_apis::GetResourceEntryCallback& callback) {
    456   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    457   DCHECK(!callback.is_null());
    458 
    459   JobEntry* new_job = CreateNewJob(TYPE_TOUCH_RESOURCE);
    460   new_job->task = base::Bind(
    461       &DriveServiceInterface::TouchResource,
    462       base::Unretained(drive_service_),
    463       resource_id,
    464       modified_date,
    465       last_viewed_by_me_date,
    466       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    467                  weak_ptr_factory_.GetWeakPtr(),
    468                  new_job->job_info.job_id,
    469                  callback));
    470   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    471   StartJob(new_job);
    472 }
    473 
    474 void JobScheduler::AddResourceToDirectory(
    475     const std::string& parent_resource_id,
    476     const std::string& resource_id,
    477     const google_apis::EntryActionCallback& callback) {
    478   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    479   DCHECK(!callback.is_null());
    480 
    481   JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
    482   new_job->task = base::Bind(
    483       &DriveServiceInterface::AddResourceToDirectory,
    484       base::Unretained(drive_service_),
    485       parent_resource_id,
    486       resource_id,
    487       base::Bind(&JobScheduler::OnEntryActionJobDone,
    488                  weak_ptr_factory_.GetWeakPtr(),
    489                  new_job->job_info.job_id,
    490                  callback));
    491   new_job->abort_callback = callback;
    492   StartJob(new_job);
    493 }
    494 
    495 void JobScheduler::RemoveResourceFromDirectory(
    496     const std::string& parent_resource_id,
    497     const std::string& resource_id,
    498     const google_apis::EntryActionCallback& callback) {
    499   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    500 
    501   JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
    502   new_job->task = base::Bind(
    503       &DriveServiceInterface::RemoveResourceFromDirectory,
    504       base::Unretained(drive_service_),
    505       parent_resource_id,
    506       resource_id,
    507       base::Bind(&JobScheduler::OnEntryActionJobDone,
    508                  weak_ptr_factory_.GetWeakPtr(),
    509                  new_job->job_info.job_id,
    510                  callback));
    511   new_job->abort_callback = callback;
    512   StartJob(new_job);
    513 }
    514 
    515 void JobScheduler::AddNewDirectory(
    516     const std::string& parent_resource_id,
    517     const std::string& directory_title,
    518     const google_apis::GetResourceEntryCallback& callback) {
    519   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    520 
    521   JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
    522   new_job->task = base::Bind(
    523       &DriveServiceInterface::AddNewDirectory,
    524       base::Unretained(drive_service_),
    525       parent_resource_id,
    526       directory_title,
    527       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    528                  weak_ptr_factory_.GetWeakPtr(),
    529                  new_job->job_info.job_id,
    530                  callback));
    531   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    532   StartJob(new_job);
    533 }
    534 
    535 JobID JobScheduler::DownloadFile(
    536     const base::FilePath& virtual_path,
    537     const base::FilePath& local_cache_path,
    538     const std::string& resource_id,
    539     const ClientContext& context,
    540     const google_apis::DownloadActionCallback& download_action_callback,
    541     const google_apis::GetContentCallback& get_content_callback) {
    542   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    543 
    544   JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
    545   new_job->job_info.file_path = virtual_path;
    546   new_job->context = context;
    547   new_job->task = base::Bind(
    548       &DriveServiceInterface::DownloadFile,
    549       base::Unretained(drive_service_),
    550       local_cache_path,
    551       resource_id,
    552       base::Bind(&JobScheduler::OnDownloadActionJobDone,
    553                  weak_ptr_factory_.GetWeakPtr(),
    554                  new_job->job_info.job_id,
    555                  download_action_callback),
    556       get_content_callback,
    557       base::Bind(&JobScheduler::UpdateProgress,
    558                  weak_ptr_factory_.GetWeakPtr(),
    559                  new_job->job_info.job_id));
    560   new_job->abort_callback =
    561       google_apis::CreateErrorRunCallback(download_action_callback);
    562   StartJob(new_job);
    563   return new_job->job_info.job_id;
    564 }
    565 
    566 void JobScheduler::UploadNewFile(
    567     const std::string& parent_resource_id,
    568     const base::FilePath& drive_file_path,
    569     const base::FilePath& local_file_path,
    570     const std::string& title,
    571     const std::string& content_type,
    572     const ClientContext& context,
    573     const google_apis::GetResourceEntryCallback& callback) {
    574   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    575 
    576   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
    577   new_job->job_info.file_path = drive_file_path;
    578   new_job->context = context;
    579 
    580   UploadNewFileParams params;
    581   params.parent_resource_id = parent_resource_id;
    582   params.local_file_path = local_file_path;
    583   params.title = title;
    584   params.content_type = content_type;
    585 
    586   ResumeUploadParams resume_params;
    587   resume_params.local_file_path = params.local_file_path;
    588   resume_params.content_type = params.content_type;
    589 
    590   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
    591                                weak_ptr_factory_.GetWeakPtr(),
    592                                new_job->job_info.job_id,
    593                                resume_params,
    594                                callback);
    595   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
    596                                         weak_ptr_factory_.GetWeakPtr(),
    597                                         new_job->job_info.job_id);
    598   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
    599   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    600   StartJob(new_job);
    601 }
    602 
    603 void JobScheduler::UploadExistingFile(
    604     const std::string& resource_id,
    605     const base::FilePath& drive_file_path,
    606     const base::FilePath& local_file_path,
    607     const std::string& content_type,
    608     const std::string& etag,
    609     const ClientContext& context,
    610     const google_apis::GetResourceEntryCallback& callback) {
    611   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    612 
    613   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
    614   new_job->job_info.file_path = drive_file_path;
    615   new_job->context = context;
    616 
    617   UploadExistingFileParams params;
    618   params.resource_id = resource_id;
    619   params.local_file_path = local_file_path;
    620   params.content_type = content_type;
    621   params.etag = etag;
    622 
    623   ResumeUploadParams resume_params;
    624   resume_params.local_file_path = params.local_file_path;
    625   resume_params.content_type = params.content_type;
    626 
    627   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
    628                                weak_ptr_factory_.GetWeakPtr(),
    629                                new_job->job_info.job_id,
    630                                resume_params,
    631                                callback);
    632   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
    633                                         weak_ptr_factory_.GetWeakPtr(),
    634                                         new_job->job_info.job_id);
    635   new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
    636   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    637   StartJob(new_job);
    638 }
    639 
    640 void JobScheduler::CreateFile(
    641     const std::string& parent_resource_id,
    642     const base::FilePath& drive_file_path,
    643     const std::string& title,
    644     const std::string& content_type,
    645     const ClientContext& context,
    646     const google_apis::GetResourceEntryCallback& callback) {
    647   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    648 
    649   const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
    650 
    651   JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
    652   new_job->job_info.file_path = drive_file_path;
    653   new_job->context = context;
    654 
    655   UploadNewFileParams params;
    656   params.parent_resource_id = parent_resource_id;
    657   params.local_file_path = kDevNull;  // Upload an empty file.
    658   params.title = title;
    659   params.content_type = content_type;
    660 
    661   ResumeUploadParams resume_params;
    662   resume_params.local_file_path = params.local_file_path;
    663   resume_params.content_type = params.content_type;
    664 
    665   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
    666                                weak_ptr_factory_.GetWeakPtr(),
    667                                new_job->job_info.job_id,
    668                                resume_params,
    669                                callback);
    670   params.progress_callback = google_apis::ProgressCallback();
    671 
    672   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
    673   new_job->abort_callback = google_apis::CreateErrorRunCallback(callback);
    674   StartJob(new_job);
    675 }
    676 
    677 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
    678   JobEntry* job = new JobEntry(type);
    679   job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
    680   return job;
    681 }
    682 
    683 void JobScheduler::StartJob(JobEntry* job) {
    684   DCHECK(!job->task.is_null());
    685 
    686   QueueJob(job->job_info.job_id);
    687   NotifyJobAdded(job->job_info);
    688   DoJobLoop(GetJobQueueType(job->job_info.job_type));
    689 }
    690 
    691 void JobScheduler::QueueJob(JobID job_id) {
    692   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    693 
    694   JobEntry* job_entry = job_map_.Lookup(job_id);
    695   DCHECK(job_entry);
    696   const JobInfo& job_info = job_entry->job_info;
    697 
    698   QueueType queue_type = GetJobQueueType(job_info.job_type);
    699   queue_[queue_type]->Push(job_id, job_entry->context.type);
    700 
    701   const std::string retry_prefix = job_entry->retry_count > 0 ?
    702       base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
    703   util::Log(logging::LOG_INFO,
    704             "Job queued%s: %s - %s",
    705             retry_prefix.c_str(),
    706             job_info.ToString().c_str(),
    707             GetQueueInfo(queue_type).c_str());
    708 }
    709 
    710 void JobScheduler::DoJobLoop(QueueType queue_type) {
    711   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    712 
    713   const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
    714 
    715   // Abort all USER_INITAITED jobs when not accepted.
    716   if (accepted_priority < USER_INITIATED) {
    717     std::vector<JobID> jobs;
    718     queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
    719     for (size_t i = 0; i < jobs.size(); ++i) {
    720       JobEntry* job = job_map_.Lookup(jobs[i]);
    721       DCHECK(job);
    722       AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
    723     }
    724   }
    725 
    726   // Wait when throttled.
    727   const base::Time now = base::Time::Now();
    728   if (now < wait_until_) {
    729     base::MessageLoopProxy::current()->PostDelayedTask(
    730         FROM_HERE,
    731         base::Bind(&JobScheduler::DoJobLoop,
    732                    weak_ptr_factory_.GetWeakPtr(),
    733                    queue_type),
    734         wait_until_ - now);
    735     return;
    736   }
    737 
    738   // Run the job with the highest priority in the queue.
    739   JobID job_id = -1;
    740   if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
    741     return;
    742 
    743   JobEntry* entry = job_map_.Lookup(job_id);
    744   DCHECK(entry);
    745 
    746   JobInfo* job_info = &entry->job_info;
    747   job_info->state = STATE_RUNNING;
    748   job_info->start_time = now;
    749   NotifyJobUpdated(*job_info);
    750 
    751   entry->cancel_callback = entry->task.Run();
    752 
    753   UpdateWait();
    754 
    755   util::Log(logging::LOG_INFO,
    756             "Job started: %s - %s",
    757             job_info->ToString().c_str(),
    758             GetQueueInfo(queue_type).c_str());
    759 }
    760 
    761 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
    762   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    763 
    764   const int kNoJobShouldRun = -1;
    765 
    766   // Should stop if Drive was disabled while running the fetch loop.
    767   if (pref_service_->GetBoolean(prefs::kDisableDrive))
    768     return kNoJobShouldRun;
    769 
    770   // Should stop if the network is not online.
    771   if (net::NetworkChangeNotifier::IsOffline())
    772     return kNoJobShouldRun;
    773 
    774   // For the file queue, if it is on cellular network, only user initiated
    775   // operations are allowed to start.
    776   if (queue_type == FILE_QUEUE &&
    777       pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
    778       net::NetworkChangeNotifier::IsConnectionCellular(
    779           net::NetworkChangeNotifier::GetConnectionType()))
    780     return USER_INITIATED;
    781 
    782   // Otherwise, every operations including background tasks are allowed.
    783   return BACKGROUND;
    784 }
    785 
    786 void JobScheduler::UpdateWait() {
    787   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    788 
    789   if (disable_throttling_ || throttle_count_ == 0)
    790     return;
    791 
    792   // Exponential backoff: https://developers.google.com/drive/handle-errors.
    793   base::TimeDelta delay =
    794       base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
    795       base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
    796   VLOG(1) << "Throttling for " << delay.InMillisecondsF();
    797 
    798   wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
    799 }
    800 
    801 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
    802   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    803 
    804   JobEntry* job_entry = job_map_.Lookup(job_id);
    805   DCHECK(job_entry);
    806   JobInfo* job_info = &job_entry->job_info;
    807   QueueType queue_type = GetJobQueueType(job_info->job_type);
    808   queue_[queue_type]->MarkFinished(job_id);
    809 
    810   const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
    811   bool success = (GDataToFileError(error) == FILE_ERROR_OK);
    812   util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
    813             "Job done: %s => %s (elapsed time: %sms) - %s",
    814             job_info->ToString().c_str(),
    815             GDataErrorCodeToString(error).c_str(),
    816             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
    817             GetQueueInfo(queue_type).c_str());
    818 
    819   // Retry, depending on the error.
    820   const bool is_server_error =
    821       error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
    822       error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
    823   if (is_server_error) {
    824     if (throttle_count_ < kMaxThrottleCount)
    825       ++throttle_count_;
    826     UpdateWait();
    827   } else {
    828     throttle_count_ = 0;
    829   }
    830 
    831   const bool should_retry =
    832       is_server_error && job_entry->retry_count < kMaxRetryCount;
    833   if (should_retry) {
    834     job_entry->cancel_callback.Reset();
    835     job_info->state = STATE_RETRY;
    836     NotifyJobUpdated(*job_info);
    837 
    838     ++job_entry->retry_count;
    839 
    840     // Requeue the job.
    841     QueueJob(job_id);
    842   } else {
    843     NotifyJobDone(*job_info, error);
    844     // The job has finished, no retry will happen in the scheduler. Now we can
    845     // remove the job info from the map.
    846     job_map_.Remove(job_id);
    847   }
    848 
    849   // Post a task to continue the job loop.  This allows us to finish handling
    850   // the current job before starting the next one.
    851   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
    852       base::Bind(&JobScheduler::DoJobLoop,
    853                  weak_ptr_factory_.GetWeakPtr(),
    854                  queue_type));
    855   return !should_retry;
    856 }
    857 
    858 void JobScheduler::OnGetResourceListJobDone(
    859     JobID job_id,
    860     const google_apis::GetResourceListCallback& callback,
    861     google_apis::GDataErrorCode error,
    862     scoped_ptr<google_apis::ResourceList> resource_list) {
    863   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    864   DCHECK(!callback.is_null());
    865 
    866   if (OnJobDone(job_id, error))
    867     callback.Run(error, resource_list.Pass());
    868 }
    869 
    870 void JobScheduler::OnGetResourceEntryJobDone(
    871     JobID job_id,
    872     const google_apis::GetResourceEntryCallback& callback,
    873     google_apis::GDataErrorCode error,
    874     scoped_ptr<google_apis::ResourceEntry> entry) {
    875   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    876   DCHECK(!callback.is_null());
    877 
    878   if (OnJobDone(job_id, error))
    879     callback.Run(error, entry.Pass());
    880 }
    881 
    882 void JobScheduler::OnGetAboutResourceJobDone(
    883     JobID job_id,
    884     const google_apis::GetAboutResourceCallback& callback,
    885     google_apis::GDataErrorCode error,
    886     scoped_ptr<google_apis::AboutResource> about_resource) {
    887   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    888   DCHECK(!callback.is_null());
    889 
    890   if (OnJobDone(job_id, error))
    891     callback.Run(error, about_resource.Pass());
    892 }
    893 
    894 void JobScheduler::OnGetShareUrlJobDone(
    895     JobID job_id,
    896     const google_apis::GetShareUrlCallback& callback,
    897     google_apis::GDataErrorCode error,
    898     const GURL& share_url) {
    899   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    900   DCHECK(!callback.is_null());
    901 
    902   if (OnJobDone(job_id, error))
    903     callback.Run(error, share_url);
    904 }
    905 
    906 void JobScheduler::OnGetAppListJobDone(
    907     JobID job_id,
    908     const google_apis::GetAppListCallback& callback,
    909     google_apis::GDataErrorCode error,
    910     scoped_ptr<google_apis::AppList> app_list) {
    911   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    912   DCHECK(!callback.is_null());
    913 
    914   if (OnJobDone(job_id, error))
    915     callback.Run(error, app_list.Pass());
    916 }
    917 
    918 void JobScheduler::OnEntryActionJobDone(
    919     JobID job_id,
    920     const google_apis::EntryActionCallback& callback,
    921     google_apis::GDataErrorCode error) {
    922   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    923   DCHECK(!callback.is_null());
    924 
    925   if (OnJobDone(job_id, error))
    926     callback.Run(error);
    927 }
    928 
    929 void JobScheduler::OnDownloadActionJobDone(
    930     JobID job_id,
    931     const google_apis::DownloadActionCallback& callback,
    932     google_apis::GDataErrorCode error,
    933     const base::FilePath& temp_file) {
    934   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    935   DCHECK(!callback.is_null());
    936 
    937   if (OnJobDone(job_id, error))
    938     callback.Run(error, temp_file);
    939 }
    940 
    941 void JobScheduler::OnUploadCompletionJobDone(
    942     JobID job_id,
    943     const ResumeUploadParams& resume_params,
    944     const google_apis::GetResourceEntryCallback& callback,
    945     google_apis::GDataErrorCode error,
    946     const GURL& upload_location,
    947     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
    948   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    949   DCHECK(!callback.is_null());
    950 
    951   if (!upload_location.is_empty()) {
    952     // If upload_location is available, update the task to resume the
    953     // upload process from the terminated point.
    954     // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
    955     // so OnJobDone called below will be in charge to re-queue the job.
    956     JobEntry* job_entry = job_map_.Lookup(job_id);
    957     DCHECK(job_entry);
    958 
    959     ResumeUploadFileParams params;
    960     params.upload_location = upload_location;
    961     params.local_file_path = resume_params.local_file_path;
    962     params.content_type = resume_params.content_type;
    963     params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
    964                                  weak_ptr_factory_.GetWeakPtr(),
    965                                  job_id,
    966                                  job_entry->task,
    967                                  callback);
    968     params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
    969                                           weak_ptr_factory_.GetWeakPtr(),
    970                                           job_id);
    971     job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
    972   }
    973 
    974   if (OnJobDone(job_id, error))
    975     callback.Run(error, resource_entry.Pass());
    976 }
    977 
    978 void JobScheduler::OnResumeUploadFileDone(
    979     JobID job_id,
    980     const base::Callback<google_apis::CancelCallback()>& original_task,
    981     const google_apis::GetResourceEntryCallback& callback,
    982     google_apis::GDataErrorCode error,
    983     const GURL& upload_location,
    984     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
    985   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    986   DCHECK(!original_task.is_null());
    987   DCHECK(!callback.is_null());
    988 
    989   if (upload_location.is_empty()) {
    990     // If upload_location is not available, we should discard it and stop trying
    991     // to resume. Restore the original task.
    992     JobEntry* job_entry = job_map_.Lookup(job_id);
    993     DCHECK(job_entry);
    994     job_entry->task = original_task;
    995   }
    996 
    997   if (OnJobDone(job_id, error))
    998     callback.Run(error, resource_entry.Pass());
    999 }
   1000 
   1001 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
   1002   JobEntry* job_entry = job_map_.Lookup(job_id);
   1003   DCHECK(job_entry);
   1004 
   1005   job_entry->job_info.num_completed_bytes = progress;
   1006   job_entry->job_info.num_total_bytes = total;
   1007   NotifyJobUpdated(job_entry->job_info);
   1008 }
   1009 
   1010 void JobScheduler::OnConnectionTypeChanged(
   1011     net::NetworkChangeNotifier::ConnectionType type) {
   1012   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1013 
   1014   // Resume the job loop.
   1015   // Note that we don't need to check the network connection status as it will
   1016   // be checked in GetCurrentAcceptedPriority().
   1017   for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
   1018     DoJobLoop(static_cast<QueueType>(i));
   1019 }
   1020 
   1021 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
   1022   switch (type) {
   1023     case TYPE_GET_ABOUT_RESOURCE:
   1024     case TYPE_GET_APP_LIST:
   1025     case TYPE_GET_ALL_RESOURCE_LIST:
   1026     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
   1027     case TYPE_SEARCH:
   1028     case TYPE_GET_CHANGE_LIST:
   1029     case TYPE_CONTINUE_GET_RESOURCE_LIST:
   1030     case TYPE_GET_RESOURCE_ENTRY:
   1031     case TYPE_GET_SHARE_URL:
   1032     case TYPE_DELETE_RESOURCE:
   1033     case TYPE_COPY_RESOURCE:
   1034     case TYPE_COPY_HOSTED_DOCUMENT:
   1035     case TYPE_RENAME_RESOURCE:
   1036     case TYPE_TOUCH_RESOURCE:
   1037     case TYPE_ADD_RESOURCE_TO_DIRECTORY:
   1038     case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
   1039     case TYPE_ADD_NEW_DIRECTORY:
   1040     case TYPE_CREATE_FILE:
   1041       return METADATA_QUEUE;
   1042 
   1043     case TYPE_DOWNLOAD_FILE:
   1044     case TYPE_UPLOAD_NEW_FILE:
   1045     case TYPE_UPLOAD_EXISTING_FILE:
   1046       return FILE_QUEUE;
   1047   }
   1048   NOTREACHED();
   1049   return FILE_QUEUE;
   1050 }
   1051 
   1052 void JobScheduler::AbortNotRunningJob(JobEntry* job,
   1053                                       google_apis::GDataErrorCode error) {
   1054   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1055 
   1056   const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
   1057   const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
   1058   util::Log(logging::LOG_INFO,
   1059             "Job aborted: %s => %s (elapsed time: %sms) - %s",
   1060             job->job_info.ToString().c_str(),
   1061             GDataErrorCodeToString(error).c_str(),
   1062             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
   1063             GetQueueInfo(queue_type).c_str());
   1064 
   1065   base::Callback<void(google_apis::GDataErrorCode)> callback =
   1066       job->abort_callback;
   1067   queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
   1068   NotifyJobDone(job->job_info, error);
   1069   job_map_.Remove(job->job_info.job_id);
   1070   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
   1071                                               base::Bind(callback, error));
   1072 }
   1073 
   1074 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
   1075   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1076   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
   1077 }
   1078 
   1079 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
   1080                                  google_apis::GDataErrorCode error) {
   1081   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1082   FOR_EACH_OBSERVER(JobListObserver, observer_list_,
   1083                     OnJobDone(job_info, GDataToFileError(error)));
   1084 }
   1085 
   1086 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
   1087   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1088   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
   1089 }
   1090 
   1091 std::string JobScheduler::GetQueueInfo(QueueType type) const {
   1092   return QueueTypeToString(type) + " " + queue_[type]->ToString();
   1093 }
   1094 
   1095 // static
   1096 std::string JobScheduler::QueueTypeToString(QueueType type) {
   1097   switch (type) {
   1098     case METADATA_QUEUE:
   1099       return "METADATA_QUEUE";
   1100     case FILE_QUEUE:
   1101       return "FILE_QUEUE";
   1102     case NUM_QUEUES:
   1103       break;  // This value is just a sentinel. Should never be used.
   1104   }
   1105   NOTREACHED();
   1106   return "";
   1107 }
   1108 
   1109 }  // namespace drive
   1110