Home | History | Annotate | Download | only in drive
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
      6 
      7 #include "base/message_loop/message_loop.h"
      8 #include "base/prefs/pref_service.h"
      9 #include "base/rand_util.h"
     10 #include "base/strings/string_number_conversions.h"
     11 #include "base/strings/stringprintf.h"
     12 #include "chrome/browser/chromeos/drive/file_system_util.h"
     13 #include "chrome/browser/chromeos/drive/logging.h"
     14 #include "chrome/common/pref_names.h"
     15 #include "content/public/browser/browser_thread.h"
     16 #include "google_apis/drive/drive_api_parser.h"
     17 
     18 using content::BrowserThread;
     19 
     20 namespace drive {
     21 
     22 namespace {
     23 
     24 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
     25 // throttling or server error.  The delay before retrying a job is shared among
     26 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
     27 //
     28 // According to the API documentation, kMaxRetryCount should be the same as
     29 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
     30 // But currently multiplied by 2 to ensure upload related jobs retried for a
     31 // sufficient number of times. crbug.com/269918
     32 const int kMaxThrottleCount = 4;
     33 const int kMaxRetryCount = 2 * kMaxThrottleCount;
     34 
     35 // GetDefaultValue returns a value constructed by the default constructor.
     36 template<typename T> struct DefaultValueCreator {
     37   static T GetDefaultValue() { return T(); }
     38 };
     39 template<typename T> struct DefaultValueCreator<const T&> {
     40   static T GetDefaultValue() { return T(); }
     41 };
     42 
     43 // Helper of CreateErrorRunCallback implementation.
     44 // Provides:
     45 // - ResultType; the type of the Callback which should be returned by
     46 //     CreateErrorRunCallback.
     47 // - Run(): a static function which takes the original |callback| and |error|,
     48 //     and runs the |callback|.Run() with the error code and default values
     49 //     for remaining arguments.
     50 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
     51 
     52 // CreateErrorRunCallback with two arguments.
     53 template<typename P1>
     54 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
     55   static void Run(
     56       const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
     57       google_apis::GDataErrorCode error) {
     58     callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
     59   }
     60 };
     61 
     62 // Returns a callback with the tail parameter bound to its default value.
     63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
     64 template<typename CallbackType>
     65 base::Callback<void(google_apis::GDataErrorCode)>
     66 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
     67   return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
     68 }
     69 
     70 // Parameter struct for RunUploadNewFile.
     71 struct UploadNewFileParams {
     72   std::string parent_resource_id;
     73   base::FilePath local_file_path;
     74   std::string title;
     75   std::string content_type;
     76   UploadCompletionCallback callback;
     77   google_apis::ProgressCallback progress_callback;
     78 };
     79 
     80 // Helper function to work around the arity limitation of base::Bind.
     81 google_apis::CancelCallback RunUploadNewFile(
     82     DriveUploaderInterface* uploader,
     83     const UploadNewFileParams& params) {
     84   return uploader->UploadNewFile(params.parent_resource_id,
     85                                  params.local_file_path,
     86                                  params.title,
     87                                  params.content_type,
     88                                  params.callback,
     89                                  params.progress_callback);
     90 }
     91 
     92 // Parameter struct for RunUploadExistingFile.
     93 struct UploadExistingFileParams {
     94   std::string resource_id;
     95   base::FilePath local_file_path;
     96   std::string content_type;
     97   std::string etag;
     98   UploadCompletionCallback callback;
     99   google_apis::ProgressCallback progress_callback;
    100 };
    101 
    102 // Helper function to work around the arity limitation of base::Bind.
    103 google_apis::CancelCallback RunUploadExistingFile(
    104     DriveUploaderInterface* uploader,
    105     const UploadExistingFileParams& params) {
    106   return uploader->UploadExistingFile(params.resource_id,
    107                                       params.local_file_path,
    108                                       params.content_type,
    109                                       params.etag,
    110                                       params.callback,
    111                                       params.progress_callback);
    112 }
    113 
    114 // Parameter struct for RunResumeUploadFile.
    115 struct ResumeUploadFileParams {
    116   GURL upload_location;
    117   base::FilePath local_file_path;
    118   std::string content_type;
    119   UploadCompletionCallback callback;
    120   google_apis::ProgressCallback progress_callback;
    121 };
    122 
    123 // Helper function to adjust the return type.
    124 google_apis::CancelCallback RunResumeUploadFile(
    125     DriveUploaderInterface* uploader,
    126     const ResumeUploadFileParams& params) {
    127   return uploader->ResumeUploadFile(params.upload_location,
    128                                     params.local_file_path,
    129                                     params.content_type,
    130                                     params.callback,
    131                                     params.progress_callback);
    132 }
    133 
    134 }  // namespace
    135 
    136 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
    137 const int JobScheduler::kMaxJobCount[] = {
    138   5,  // METADATA_QUEUE
    139   1,  // FILE_QUEUE
    140 };
    141 
    142 JobScheduler::JobEntry::JobEntry(JobType type)
    143     : job_info(type),
    144       context(ClientContext(USER_INITIATED)),
    145       retry_count(0) {
    146   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    147 }
    148 
    149 JobScheduler::JobEntry::~JobEntry() {
    150   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    151 }
    152 
    153 struct JobScheduler::ResumeUploadParams {
    154   base::FilePath drive_file_path;
    155   base::FilePath local_file_path;
    156   std::string content_type;
    157 };
    158 
    159 JobScheduler::JobScheduler(
    160     PrefService* pref_service,
    161     DriveServiceInterface* drive_service,
    162     base::SequencedTaskRunner* blocking_task_runner)
    163     : throttle_count_(0),
    164       wait_until_(base::Time::Now()),
    165       disable_throttling_(false),
    166       drive_service_(drive_service),
    167       uploader_(new DriveUploader(drive_service, blocking_task_runner)),
    168       pref_service_(pref_service),
    169       weak_ptr_factory_(this) {
    170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    171 
    172   for (int i = 0; i < NUM_QUEUES; ++i)
    173     queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
    174 
    175   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
    176 }
    177 
    178 JobScheduler::~JobScheduler() {
    179   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    180 
    181   size_t num_queued_jobs = 0;
    182   for (int i = 0; i < NUM_QUEUES; ++i)
    183     num_queued_jobs += queue_[i]->GetNumberOfJobs();
    184   DCHECK_EQ(num_queued_jobs, job_map_.size());
    185 
    186   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
    187 }
    188 
    189 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
    190   std::vector<JobInfo> job_info_list;
    191   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
    192     job_info_list.push_back(iter.GetCurrentValue()->job_info);
    193   return job_info_list;
    194 }
    195 
    196 void JobScheduler::AddObserver(JobListObserver* observer) {
    197   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    198   observer_list_.AddObserver(observer);
    199 }
    200 
    201 void JobScheduler::RemoveObserver(JobListObserver* observer) {
    202   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    203   observer_list_.RemoveObserver(observer);
    204 }
    205 
    206 void JobScheduler::CancelJob(JobID job_id) {
    207   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    208 
    209   JobEntry* job = job_map_.Lookup(job_id);
    210   if (job) {
    211     if (job->job_info.state == STATE_RUNNING) {
    212       // If the job is running an HTTP request, cancel it via |cancel_callback|
    213       // returned from the request, and wait for termination in the normal
    214       // callback handler, OnJobDone.
    215       if (!job->cancel_callback.is_null())
    216         job->cancel_callback.Run();
    217     } else {
    218       AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
    219     }
    220   }
    221 }
    222 
    223 void JobScheduler::CancelAllJobs() {
    224   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    225 
    226   // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
    227   // removable during iteration.
    228   for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
    229     CancelJob(iter.GetCurrentKey());
    230 }
    231 
    232 void JobScheduler::GetAboutResource(
    233     const google_apis::AboutResourceCallback& callback) {
    234   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    235   DCHECK(!callback.is_null());
    236 
    237   JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
    238   new_job->task = base::Bind(
    239       &DriveServiceInterface::GetAboutResource,
    240       base::Unretained(drive_service_),
    241       base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
    242                  weak_ptr_factory_.GetWeakPtr(),
    243                  new_job->job_info.job_id,
    244                  callback));
    245   new_job->abort_callback = CreateErrorRunCallback(callback);
    246   StartJob(new_job);
    247 }
    248 
    249 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
    250   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    251   DCHECK(!callback.is_null());
    252 
    253   JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
    254   new_job->task = base::Bind(
    255       &DriveServiceInterface::GetAppList,
    256       base::Unretained(drive_service_),
    257       base::Bind(&JobScheduler::OnGetAppListJobDone,
    258                  weak_ptr_factory_.GetWeakPtr(),
    259                  new_job->job_info.job_id,
    260                  callback));
    261   new_job->abort_callback = CreateErrorRunCallback(callback);
    262   StartJob(new_job);
    263 }
    264 
    265 void JobScheduler::GetAllResourceList(
    266     const google_apis::GetResourceListCallback& callback) {
    267   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    268   DCHECK(!callback.is_null());
    269 
    270   JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
    271   new_job->task = base::Bind(
    272       &DriveServiceInterface::GetAllResourceList,
    273       base::Unretained(drive_service_),
    274       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    275                  weak_ptr_factory_.GetWeakPtr(),
    276                  new_job->job_info.job_id,
    277                  callback));
    278   new_job->abort_callback = CreateErrorRunCallback(callback);
    279   StartJob(new_job);
    280 }
    281 
    282 void JobScheduler::GetResourceListInDirectory(
    283     const std::string& directory_resource_id,
    284     const google_apis::GetResourceListCallback& callback) {
    285   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    286   DCHECK(!callback.is_null());
    287 
    288   JobEntry* new_job = CreateNewJob(
    289       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
    290   new_job->task = base::Bind(
    291       &DriveServiceInterface::GetResourceListInDirectory,
    292       base::Unretained(drive_service_),
    293       directory_resource_id,
    294       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    295                  weak_ptr_factory_.GetWeakPtr(),
    296                  new_job->job_info.job_id,
    297                  callback));
    298   new_job->abort_callback = CreateErrorRunCallback(callback);
    299   StartJob(new_job);
    300 }
    301 
    302 void JobScheduler::Search(
    303     const std::string& search_query,
    304     const google_apis::GetResourceListCallback& callback) {
    305   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    306   DCHECK(!callback.is_null());
    307 
    308   JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
    309   new_job->task = base::Bind(
    310       &DriveServiceInterface::Search,
    311       base::Unretained(drive_service_),
    312       search_query,
    313       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    314                  weak_ptr_factory_.GetWeakPtr(),
    315                  new_job->job_info.job_id,
    316                  callback));
    317   new_job->abort_callback = CreateErrorRunCallback(callback);
    318   StartJob(new_job);
    319 }
    320 
    321 void JobScheduler::GetChangeList(
    322     int64 start_changestamp,
    323     const google_apis::GetResourceListCallback& callback) {
    324   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    325   DCHECK(!callback.is_null());
    326 
    327   JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
    328   new_job->task = base::Bind(
    329       &DriveServiceInterface::GetChangeList,
    330       base::Unretained(drive_service_),
    331       start_changestamp,
    332       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    333                  weak_ptr_factory_.GetWeakPtr(),
    334                  new_job->job_info.job_id,
    335                  callback));
    336   new_job->abort_callback = CreateErrorRunCallback(callback);
    337   StartJob(new_job);
    338 }
    339 
    340 void JobScheduler::GetRemainingChangeList(
    341     const GURL& next_link,
    342     const google_apis::GetResourceListCallback& callback) {
    343   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    344   DCHECK(!callback.is_null());
    345 
    346   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
    347   new_job->task = base::Bind(
    348       &DriveServiceInterface::GetRemainingChangeList,
    349       base::Unretained(drive_service_),
    350       next_link,
    351       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    352                  weak_ptr_factory_.GetWeakPtr(),
    353                  new_job->job_info.job_id,
    354                  callback));
    355   new_job->abort_callback = CreateErrorRunCallback(callback);
    356   StartJob(new_job);
    357 }
    358 
    359 void JobScheduler::GetRemainingFileList(
    360     const GURL& next_link,
    361     const google_apis::GetResourceListCallback& callback) {
    362   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    363   DCHECK(!callback.is_null());
    364 
    365   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
    366   new_job->task = base::Bind(
    367       &DriveServiceInterface::GetRemainingFileList,
    368       base::Unretained(drive_service_),
    369       next_link,
    370       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    371                  weak_ptr_factory_.GetWeakPtr(),
    372                  new_job->job_info.job_id,
    373                  callback));
    374   new_job->abort_callback = CreateErrorRunCallback(callback);
    375   StartJob(new_job);
    376 }
    377 
    378 void JobScheduler::GetResourceEntry(
    379     const std::string& resource_id,
    380     const ClientContext& context,
    381     const google_apis::GetResourceEntryCallback& callback) {
    382   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    383   DCHECK(!callback.is_null());
    384 
    385   JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
    386   new_job->context = context;
    387   new_job->task = base::Bind(
    388       &DriveServiceInterface::GetResourceEntry,
    389       base::Unretained(drive_service_),
    390       resource_id,
    391       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    392                  weak_ptr_factory_.GetWeakPtr(),
    393                  new_job->job_info.job_id,
    394                  callback));
    395   new_job->abort_callback = CreateErrorRunCallback(callback);
    396   StartJob(new_job);
    397 }
    398 
    399 void JobScheduler::GetShareUrl(
    400     const std::string& resource_id,
    401     const GURL& embed_origin,
    402     const ClientContext& context,
    403     const google_apis::GetShareUrlCallback& callback) {
    404   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    405   DCHECK(!callback.is_null());
    406 
    407   JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
    408   new_job->context = context;
    409   new_job->task = base::Bind(
    410       &DriveServiceInterface::GetShareUrl,
    411       base::Unretained(drive_service_),
    412       resource_id,
    413       embed_origin,
    414       base::Bind(&JobScheduler::OnGetShareUrlJobDone,
    415                  weak_ptr_factory_.GetWeakPtr(),
    416                  new_job->job_info.job_id,
    417                  callback));
    418   new_job->abort_callback = CreateErrorRunCallback(callback);
    419   StartJob(new_job);
    420 }
    421 
    422 void JobScheduler::TrashResource(
    423     const std::string& resource_id,
    424     const ClientContext& context,
    425     const google_apis::EntryActionCallback& callback) {
    426   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    427   DCHECK(!callback.is_null());
    428 
    429   JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
    430   new_job->context = context;
    431   new_job->task = base::Bind(
    432       &DriveServiceInterface::TrashResource,
    433       base::Unretained(drive_service_),
    434       resource_id,
    435       base::Bind(&JobScheduler::OnEntryActionJobDone,
    436                  weak_ptr_factory_.GetWeakPtr(),
    437                  new_job->job_info.job_id,
    438                  callback));
    439   new_job->abort_callback = callback;
    440   StartJob(new_job);
    441 }
    442 
    443 void JobScheduler::CopyResource(
    444     const std::string& resource_id,
    445     const std::string& parent_resource_id,
    446     const std::string& new_title,
    447     const base::Time& last_modified,
    448     const google_apis::GetResourceEntryCallback& callback) {
    449   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    450   DCHECK(!callback.is_null());
    451 
    452   JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
    453   new_job->task = base::Bind(
    454       &DriveServiceInterface::CopyResource,
    455       base::Unretained(drive_service_),
    456       resource_id,
    457       parent_resource_id,
    458       new_title,
    459       last_modified,
    460       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    461                  weak_ptr_factory_.GetWeakPtr(),
    462                  new_job->job_info.job_id,
    463                  callback));
    464   new_job->abort_callback = CreateErrorRunCallback(callback);
    465   StartJob(new_job);
    466 }
    467 
    468 void JobScheduler::UpdateResource(
    469     const std::string& resource_id,
    470     const std::string& parent_resource_id,
    471     const std::string& new_title,
    472     const base::Time& last_modified,
    473     const base::Time& last_viewed_by_me,
    474     const ClientContext& context,
    475     const google_apis::GetResourceEntryCallback& callback) {
    476   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    477   DCHECK(!callback.is_null());
    478 
    479   JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
    480   new_job->context = context;
    481   new_job->task = base::Bind(
    482       &DriveServiceInterface::UpdateResource,
    483       base::Unretained(drive_service_),
    484       resource_id,
    485       parent_resource_id,
    486       new_title,
    487       last_modified,
    488       last_viewed_by_me,
    489       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    490                  weak_ptr_factory_.GetWeakPtr(),
    491                  new_job->job_info.job_id,
    492                  callback));
    493   new_job->abort_callback = CreateErrorRunCallback(callback);
    494   StartJob(new_job);
    495 }
    496 
    497 void JobScheduler::RenameResource(
    498     const std::string& resource_id,
    499     const std::string& new_title,
    500     const google_apis::EntryActionCallback& callback) {
    501   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    502   DCHECK(!callback.is_null());
    503 
    504   JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
    505   new_job->task = base::Bind(
    506       &DriveServiceInterface::RenameResource,
    507       base::Unretained(drive_service_),
    508       resource_id,
    509       new_title,
    510       base::Bind(&JobScheduler::OnEntryActionJobDone,
    511                  weak_ptr_factory_.GetWeakPtr(),
    512                  new_job->job_info.job_id,
    513                  callback));
    514   new_job->abort_callback = callback;
    515   StartJob(new_job);
    516 }
    517 
    518 void JobScheduler::AddResourceToDirectory(
    519     const std::string& parent_resource_id,
    520     const std::string& resource_id,
    521     const google_apis::EntryActionCallback& callback) {
    522   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    523   DCHECK(!callback.is_null());
    524 
    525   JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
    526   new_job->task = base::Bind(
    527       &DriveServiceInterface::AddResourceToDirectory,
    528       base::Unretained(drive_service_),
    529       parent_resource_id,
    530       resource_id,
    531       base::Bind(&JobScheduler::OnEntryActionJobDone,
    532                  weak_ptr_factory_.GetWeakPtr(),
    533                  new_job->job_info.job_id,
    534                  callback));
    535   new_job->abort_callback = callback;
    536   StartJob(new_job);
    537 }
    538 
    539 void JobScheduler::RemoveResourceFromDirectory(
    540     const std::string& parent_resource_id,
    541     const std::string& resource_id,
    542     const ClientContext& context,
    543     const google_apis::EntryActionCallback& callback) {
    544   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    545 
    546   JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
    547   new_job->context = context;
    548   new_job->task = base::Bind(
    549       &DriveServiceInterface::RemoveResourceFromDirectory,
    550       base::Unretained(drive_service_),
    551       parent_resource_id,
    552       resource_id,
    553       base::Bind(&JobScheduler::OnEntryActionJobDone,
    554                  weak_ptr_factory_.GetWeakPtr(),
    555                  new_job->job_info.job_id,
    556                  callback));
    557   new_job->abort_callback = callback;
    558   StartJob(new_job);
    559 }
    560 
    561 void JobScheduler::AddNewDirectory(
    562     const std::string& parent_resource_id,
    563     const std::string& directory_title,
    564     const google_apis::GetResourceEntryCallback& callback) {
    565   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    566 
    567   JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
    568   new_job->task = base::Bind(
    569       &DriveServiceInterface::AddNewDirectory,
    570       base::Unretained(drive_service_),
    571       parent_resource_id,
    572       directory_title,
    573       base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
    574                  weak_ptr_factory_.GetWeakPtr(),
    575                  new_job->job_info.job_id,
    576                  callback));
    577   new_job->abort_callback = CreateErrorRunCallback(callback);
    578   StartJob(new_job);
    579 }
    580 
    581 JobID JobScheduler::DownloadFile(
    582     const base::FilePath& virtual_path,
    583     int64 expected_file_size,
    584     const base::FilePath& local_cache_path,
    585     const std::string& resource_id,
    586     const ClientContext& context,
    587     const google_apis::DownloadActionCallback& download_action_callback,
    588     const google_apis::GetContentCallback& get_content_callback) {
    589   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    590 
    591   JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
    592   new_job->job_info.file_path = virtual_path;
    593   new_job->job_info.num_total_bytes = expected_file_size;
    594   new_job->context = context;
    595   new_job->task = base::Bind(
    596       &DriveServiceInterface::DownloadFile,
    597       base::Unretained(drive_service_),
    598       local_cache_path,
    599       resource_id,
    600       base::Bind(&JobScheduler::OnDownloadActionJobDone,
    601                  weak_ptr_factory_.GetWeakPtr(),
    602                  new_job->job_info.job_id,
    603                  download_action_callback),
    604       get_content_callback,
    605       base::Bind(&JobScheduler::UpdateProgress,
    606                  weak_ptr_factory_.GetWeakPtr(),
    607                  new_job->job_info.job_id));
    608   new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
    609   StartJob(new_job);
    610   return new_job->job_info.job_id;
    611 }
    612 
    613 void JobScheduler::UploadNewFile(
    614     const std::string& parent_resource_id,
    615     const base::FilePath& drive_file_path,
    616     const base::FilePath& local_file_path,
    617     const std::string& title,
    618     const std::string& content_type,
    619     const ClientContext& context,
    620     const google_apis::GetResourceEntryCallback& callback) {
    621   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    622 
    623   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
    624   new_job->job_info.file_path = drive_file_path;
    625   new_job->context = context;
    626 
    627   UploadNewFileParams params;
    628   params.parent_resource_id = parent_resource_id;
    629   params.local_file_path = local_file_path;
    630   params.title = title;
    631   params.content_type = content_type;
    632 
    633   ResumeUploadParams resume_params;
    634   resume_params.local_file_path = params.local_file_path;
    635   resume_params.content_type = params.content_type;
    636 
    637   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
    638                                weak_ptr_factory_.GetWeakPtr(),
    639                                new_job->job_info.job_id,
    640                                resume_params,
    641                                callback);
    642   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
    643                                         weak_ptr_factory_.GetWeakPtr(),
    644                                         new_job->job_info.job_id);
    645   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
    646   new_job->abort_callback = CreateErrorRunCallback(callback);
    647   StartJob(new_job);
    648 }
    649 
    650 void JobScheduler::UploadExistingFile(
    651     const std::string& resource_id,
    652     const base::FilePath& drive_file_path,
    653     const base::FilePath& local_file_path,
    654     const std::string& content_type,
    655     const std::string& etag,
    656     const ClientContext& context,
    657     const google_apis::GetResourceEntryCallback& callback) {
    658   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    659 
    660   JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
    661   new_job->job_info.file_path = drive_file_path;
    662   new_job->context = context;
    663 
    664   UploadExistingFileParams params;
    665   params.resource_id = resource_id;
    666   params.local_file_path = local_file_path;
    667   params.content_type = content_type;
    668   params.etag = etag;
    669 
    670   ResumeUploadParams resume_params;
    671   resume_params.local_file_path = params.local_file_path;
    672   resume_params.content_type = params.content_type;
    673 
    674   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
    675                                weak_ptr_factory_.GetWeakPtr(),
    676                                new_job->job_info.job_id,
    677                                resume_params,
    678                                callback);
    679   params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
    680                                         weak_ptr_factory_.GetWeakPtr(),
    681                                         new_job->job_info.job_id);
    682   new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
    683   new_job->abort_callback = CreateErrorRunCallback(callback);
    684   StartJob(new_job);
    685 }
    686 
    687 void JobScheduler::CreateFile(
    688     const std::string& parent_resource_id,
    689     const base::FilePath& drive_file_path,
    690     const std::string& title,
    691     const std::string& content_type,
    692     const ClientContext& context,
    693     const google_apis::GetResourceEntryCallback& callback) {
    694   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    695 
    696   const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
    697 
    698   JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
    699   new_job->job_info.file_path = drive_file_path;
    700   new_job->context = context;
    701 
    702   UploadNewFileParams params;
    703   params.parent_resource_id = parent_resource_id;
    704   params.local_file_path = kDevNull;  // Upload an empty file.
    705   params.title = title;
    706   params.content_type = content_type;
    707 
    708   ResumeUploadParams resume_params;
    709   resume_params.local_file_path = params.local_file_path;
    710   resume_params.content_type = params.content_type;
    711 
    712   params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
    713                                weak_ptr_factory_.GetWeakPtr(),
    714                                new_job->job_info.job_id,
    715                                resume_params,
    716                                callback);
    717   params.progress_callback = google_apis::ProgressCallback();
    718 
    719   new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
    720   new_job->abort_callback = CreateErrorRunCallback(callback);
    721   StartJob(new_job);
    722 }
    723 
    724 void JobScheduler::GetResourceListInDirectoryByWapi(
    725     const std::string& directory_resource_id,
    726     const google_apis::GetResourceListCallback& callback) {
    727   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    728   DCHECK(!callback.is_null());
    729 
    730   JobEntry* new_job = CreateNewJob(
    731       TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
    732   new_job->task = base::Bind(
    733       &DriveServiceInterface::GetResourceListInDirectoryByWapi,
    734       base::Unretained(drive_service_),
    735       directory_resource_id,
    736       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    737                  weak_ptr_factory_.GetWeakPtr(),
    738                  new_job->job_info.job_id,
    739                  callback));
    740   new_job->abort_callback = CreateErrorRunCallback(callback);
    741   StartJob(new_job);
    742 }
    743 
    744 void JobScheduler::GetRemainingResourceList(
    745     const GURL& next_link,
    746     const google_apis::GetResourceListCallback& callback) {
    747   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    748   DCHECK(!callback.is_null());
    749 
    750   JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
    751   new_job->task = base::Bind(
    752       &DriveServiceInterface::GetRemainingResourceList,
    753       base::Unretained(drive_service_),
    754       next_link,
    755       base::Bind(&JobScheduler::OnGetResourceListJobDone,
    756                  weak_ptr_factory_.GetWeakPtr(),
    757                  new_job->job_info.job_id,
    758                  callback));
    759   new_job->abort_callback = CreateErrorRunCallback(callback);
    760   StartJob(new_job);
    761 }
    762 
    763 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
    764   JobEntry* job = new JobEntry(type);
    765   job->job_info.job_id = job_map_.Add(job);  // Takes the ownership of |job|.
    766   return job;
    767 }
    768 
    769 void JobScheduler::StartJob(JobEntry* job) {
    770   DCHECK(!job->task.is_null());
    771 
    772   QueueJob(job->job_info.job_id);
    773   NotifyJobAdded(job->job_info);
    774   DoJobLoop(GetJobQueueType(job->job_info.job_type));
    775 }
    776 
    777 void JobScheduler::QueueJob(JobID job_id) {
    778   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    779 
    780   JobEntry* job_entry = job_map_.Lookup(job_id);
    781   DCHECK(job_entry);
    782   const JobInfo& job_info = job_entry->job_info;
    783 
    784   QueueType queue_type = GetJobQueueType(job_info.job_type);
    785   queue_[queue_type]->Push(job_id, job_entry->context.type);
    786 
    787   const std::string retry_prefix = job_entry->retry_count > 0 ?
    788       base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
    789   util::Log(logging::LOG_INFO,
    790             "Job queued%s: %s - %s",
    791             retry_prefix.c_str(),
    792             job_info.ToString().c_str(),
    793             GetQueueInfo(queue_type).c_str());
    794 }
    795 
    796 void JobScheduler::DoJobLoop(QueueType queue_type) {
    797   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    798 
    799   const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
    800 
    801   // Abort all USER_INITAITED jobs when not accepted.
    802   if (accepted_priority < USER_INITIATED) {
    803     std::vector<JobID> jobs;
    804     queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
    805     for (size_t i = 0; i < jobs.size(); ++i) {
    806       JobEntry* job = job_map_.Lookup(jobs[i]);
    807       DCHECK(job);
    808       AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
    809     }
    810   }
    811 
    812   // Wait when throttled.
    813   const base::Time now = base::Time::Now();
    814   if (now < wait_until_) {
    815     base::MessageLoopProxy::current()->PostDelayedTask(
    816         FROM_HERE,
    817         base::Bind(&JobScheduler::DoJobLoop,
    818                    weak_ptr_factory_.GetWeakPtr(),
    819                    queue_type),
    820         wait_until_ - now);
    821     return;
    822   }
    823 
    824   // Run the job with the highest priority in the queue.
    825   JobID job_id = -1;
    826   if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
    827     return;
    828 
    829   JobEntry* entry = job_map_.Lookup(job_id);
    830   DCHECK(entry);
    831 
    832   JobInfo* job_info = &entry->job_info;
    833   job_info->state = STATE_RUNNING;
    834   job_info->start_time = now;
    835   NotifyJobUpdated(*job_info);
    836 
    837   entry->cancel_callback = entry->task.Run();
    838 
    839   UpdateWait();
    840 
    841   util::Log(logging::LOG_INFO,
    842             "Job started: %s - %s",
    843             job_info->ToString().c_str(),
    844             GetQueueInfo(queue_type).c_str());
    845 }
    846 
    847 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
    848   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    849 
    850   const int kNoJobShouldRun = -1;
    851 
    852   // Should stop if Drive was disabled while running the fetch loop.
    853   if (pref_service_->GetBoolean(prefs::kDisableDrive))
    854     return kNoJobShouldRun;
    855 
    856   // Should stop if the network is not online.
    857   if (net::NetworkChangeNotifier::IsOffline())
    858     return kNoJobShouldRun;
    859 
    860   // For the file queue, if it is on cellular network, only user initiated
    861   // operations are allowed to start.
    862   if (queue_type == FILE_QUEUE &&
    863       pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
    864       net::NetworkChangeNotifier::IsConnectionCellular(
    865           net::NetworkChangeNotifier::GetConnectionType()))
    866     return USER_INITIATED;
    867 
    868   // Otherwise, every operations including background tasks are allowed.
    869   return BACKGROUND;
    870 }
    871 
    872 void JobScheduler::UpdateWait() {
    873   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    874 
    875   if (disable_throttling_ || throttle_count_ == 0)
    876     return;
    877 
    878   // Exponential backoff: https://developers.google.com/drive/handle-errors.
    879   base::TimeDelta delay =
    880       base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
    881       base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
    882   VLOG(1) << "Throttling for " << delay.InMillisecondsF();
    883 
    884   wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
    885 }
    886 
    887 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
    888   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    889 
    890   JobEntry* job_entry = job_map_.Lookup(job_id);
    891   DCHECK(job_entry);
    892   JobInfo* job_info = &job_entry->job_info;
    893   QueueType queue_type = GetJobQueueType(job_info->job_type);
    894   queue_[queue_type]->MarkFinished(job_id);
    895 
    896   const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
    897   bool success = (GDataToFileError(error) == FILE_ERROR_OK);
    898   util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
    899             "Job done: %s => %s (elapsed time: %sms) - %s",
    900             job_info->ToString().c_str(),
    901             GDataErrorCodeToString(error).c_str(),
    902             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
    903             GetQueueInfo(queue_type).c_str());
    904 
    905   // Retry, depending on the error.
    906   const bool is_server_error =
    907       error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
    908       error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
    909   if (is_server_error) {
    910     if (throttle_count_ < kMaxThrottleCount)
    911       ++throttle_count_;
    912     UpdateWait();
    913   } else {
    914     throttle_count_ = 0;
    915   }
    916 
    917   const bool should_retry =
    918       is_server_error && job_entry->retry_count < kMaxRetryCount;
    919   if (should_retry) {
    920     job_entry->cancel_callback.Reset();
    921     job_info->state = STATE_RETRY;
    922     NotifyJobUpdated(*job_info);
    923 
    924     ++job_entry->retry_count;
    925 
    926     // Requeue the job.
    927     QueueJob(job_id);
    928   } else {
    929     NotifyJobDone(*job_info, error);
    930     // The job has finished, no retry will happen in the scheduler. Now we can
    931     // remove the job info from the map.
    932     job_map_.Remove(job_id);
    933   }
    934 
    935   // Post a task to continue the job loop.  This allows us to finish handling
    936   // the current job before starting the next one.
    937   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
    938       base::Bind(&JobScheduler::DoJobLoop,
    939                  weak_ptr_factory_.GetWeakPtr(),
    940                  queue_type));
    941   return !should_retry;
    942 }
    943 
    944 void JobScheduler::OnGetResourceListJobDone(
    945     JobID job_id,
    946     const google_apis::GetResourceListCallback& callback,
    947     google_apis::GDataErrorCode error,
    948     scoped_ptr<google_apis::ResourceList> resource_list) {
    949   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    950   DCHECK(!callback.is_null());
    951 
    952   if (OnJobDone(job_id, error))
    953     callback.Run(error, resource_list.Pass());
    954 }
    955 
    956 void JobScheduler::OnGetResourceEntryJobDone(
    957     JobID job_id,
    958     const google_apis::GetResourceEntryCallback& callback,
    959     google_apis::GDataErrorCode error,
    960     scoped_ptr<google_apis::ResourceEntry> entry) {
    961   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    962   DCHECK(!callback.is_null());
    963 
    964   if (OnJobDone(job_id, error))
    965     callback.Run(error, entry.Pass());
    966 }
    967 
    968 void JobScheduler::OnGetAboutResourceJobDone(
    969     JobID job_id,
    970     const google_apis::AboutResourceCallback& callback,
    971     google_apis::GDataErrorCode error,
    972     scoped_ptr<google_apis::AboutResource> about_resource) {
    973   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    974   DCHECK(!callback.is_null());
    975 
    976   if (OnJobDone(job_id, error))
    977     callback.Run(error, about_resource.Pass());
    978 }
    979 
    980 void JobScheduler::OnGetShareUrlJobDone(
    981     JobID job_id,
    982     const google_apis::GetShareUrlCallback& callback,
    983     google_apis::GDataErrorCode error,
    984     const GURL& share_url) {
    985   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    986   DCHECK(!callback.is_null());
    987 
    988   if (OnJobDone(job_id, error))
    989     callback.Run(error, share_url);
    990 }
    991 
    992 void JobScheduler::OnGetAppListJobDone(
    993     JobID job_id,
    994     const google_apis::AppListCallback& callback,
    995     google_apis::GDataErrorCode error,
    996     scoped_ptr<google_apis::AppList> app_list) {
    997   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
    998   DCHECK(!callback.is_null());
    999 
   1000   if (OnJobDone(job_id, error))
   1001     callback.Run(error, app_list.Pass());
   1002 }
   1003 
   1004 void JobScheduler::OnEntryActionJobDone(
   1005     JobID job_id,
   1006     const google_apis::EntryActionCallback& callback,
   1007     google_apis::GDataErrorCode error) {
   1008   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1009   DCHECK(!callback.is_null());
   1010 
   1011   if (OnJobDone(job_id, error))
   1012     callback.Run(error);
   1013 }
   1014 
   1015 void JobScheduler::OnDownloadActionJobDone(
   1016     JobID job_id,
   1017     const google_apis::DownloadActionCallback& callback,
   1018     google_apis::GDataErrorCode error,
   1019     const base::FilePath& temp_file) {
   1020   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1021   DCHECK(!callback.is_null());
   1022 
   1023   if (OnJobDone(job_id, error))
   1024     callback.Run(error, temp_file);
   1025 }
   1026 
   1027 void JobScheduler::OnUploadCompletionJobDone(
   1028     JobID job_id,
   1029     const ResumeUploadParams& resume_params,
   1030     const google_apis::GetResourceEntryCallback& callback,
   1031     google_apis::GDataErrorCode error,
   1032     const GURL& upload_location,
   1033     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
   1034   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1035   DCHECK(!callback.is_null());
   1036 
   1037   if (!upload_location.is_empty()) {
   1038     // If upload_location is available, update the task to resume the
   1039     // upload process from the terminated point.
   1040     // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
   1041     // so OnJobDone called below will be in charge to re-queue the job.
   1042     JobEntry* job_entry = job_map_.Lookup(job_id);
   1043     DCHECK(job_entry);
   1044 
   1045     ResumeUploadFileParams params;
   1046     params.upload_location = upload_location;
   1047     params.local_file_path = resume_params.local_file_path;
   1048     params.content_type = resume_params.content_type;
   1049     params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
   1050                                  weak_ptr_factory_.GetWeakPtr(),
   1051                                  job_id,
   1052                                  job_entry->task,
   1053                                  callback);
   1054     params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
   1055                                           weak_ptr_factory_.GetWeakPtr(),
   1056                                           job_id);
   1057     job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
   1058   }
   1059 
   1060   if (OnJobDone(job_id, error))
   1061     callback.Run(error, resource_entry.Pass());
   1062 }
   1063 
   1064 void JobScheduler::OnResumeUploadFileDone(
   1065     JobID job_id,
   1066     const base::Callback<google_apis::CancelCallback()>& original_task,
   1067     const google_apis::GetResourceEntryCallback& callback,
   1068     google_apis::GDataErrorCode error,
   1069     const GURL& upload_location,
   1070     scoped_ptr<google_apis::ResourceEntry> resource_entry) {
   1071   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1072   DCHECK(!original_task.is_null());
   1073   DCHECK(!callback.is_null());
   1074 
   1075   if (upload_location.is_empty()) {
   1076     // If upload_location is not available, we should discard it and stop trying
   1077     // to resume. Restore the original task.
   1078     JobEntry* job_entry = job_map_.Lookup(job_id);
   1079     DCHECK(job_entry);
   1080     job_entry->task = original_task;
   1081   }
   1082 
   1083   if (OnJobDone(job_id, error))
   1084     callback.Run(error, resource_entry.Pass());
   1085 }
   1086 
   1087 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
   1088   JobEntry* job_entry = job_map_.Lookup(job_id);
   1089   DCHECK(job_entry);
   1090 
   1091   job_entry->job_info.num_completed_bytes = progress;
   1092   if (total != -1)
   1093     job_entry->job_info.num_total_bytes = total;
   1094   NotifyJobUpdated(job_entry->job_info);
   1095 }
   1096 
   1097 void JobScheduler::OnConnectionTypeChanged(
   1098     net::NetworkChangeNotifier::ConnectionType type) {
   1099   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1100 
   1101   // Resume the job loop.
   1102   // Note that we don't need to check the network connection status as it will
   1103   // be checked in GetCurrentAcceptedPriority().
   1104   for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
   1105     DoJobLoop(static_cast<QueueType>(i));
   1106 }
   1107 
   1108 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
   1109   switch (type) {
   1110     case TYPE_GET_ABOUT_RESOURCE:
   1111     case TYPE_GET_APP_LIST:
   1112     case TYPE_GET_ALL_RESOURCE_LIST:
   1113     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
   1114     case TYPE_SEARCH:
   1115     case TYPE_GET_CHANGE_LIST:
   1116     case TYPE_GET_REMAINING_CHANGE_LIST:
   1117     case TYPE_GET_REMAINING_FILE_LIST:
   1118     case TYPE_GET_RESOURCE_ENTRY:
   1119     case TYPE_GET_SHARE_URL:
   1120     case TYPE_TRASH_RESOURCE:
   1121     case TYPE_COPY_RESOURCE:
   1122     case TYPE_UPDATE_RESOURCE:
   1123     case TYPE_RENAME_RESOURCE:
   1124     case TYPE_ADD_RESOURCE_TO_DIRECTORY:
   1125     case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
   1126     case TYPE_ADD_NEW_DIRECTORY:
   1127     case TYPE_CREATE_FILE:
   1128     case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
   1129     case TYPE_GET_REMAINING_RESOURCE_LIST:
   1130       return METADATA_QUEUE;
   1131 
   1132     case TYPE_DOWNLOAD_FILE:
   1133     case TYPE_UPLOAD_NEW_FILE:
   1134     case TYPE_UPLOAD_EXISTING_FILE:
   1135       return FILE_QUEUE;
   1136   }
   1137   NOTREACHED();
   1138   return FILE_QUEUE;
   1139 }
   1140 
   1141 void JobScheduler::AbortNotRunningJob(JobEntry* job,
   1142                                       google_apis::GDataErrorCode error) {
   1143   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1144 
   1145   const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
   1146   const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
   1147   util::Log(logging::LOG_INFO,
   1148             "Job aborted: %s => %s (elapsed time: %sms) - %s",
   1149             job->job_info.ToString().c_str(),
   1150             GDataErrorCodeToString(error).c_str(),
   1151             base::Int64ToString(elapsed.InMilliseconds()).c_str(),
   1152             GetQueueInfo(queue_type).c_str());
   1153 
   1154   base::Callback<void(google_apis::GDataErrorCode)> callback =
   1155       job->abort_callback;
   1156   queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
   1157   NotifyJobDone(job->job_info, error);
   1158   job_map_.Remove(job->job_info.job_id);
   1159   base::MessageLoopProxy::current()->PostTask(FROM_HERE,
   1160                                               base::Bind(callback, error));
   1161 }
   1162 
   1163 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
   1164   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1165   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
   1166 }
   1167 
   1168 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
   1169                                  google_apis::GDataErrorCode error) {
   1170   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1171   FOR_EACH_OBSERVER(JobListObserver, observer_list_,
   1172                     OnJobDone(job_info, GDataToFileError(error)));
   1173 }
   1174 
   1175 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
   1176   DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
   1177   FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
   1178 }
   1179 
   1180 std::string JobScheduler::GetQueueInfo(QueueType type) const {
   1181   return QueueTypeToString(type) + " " + queue_[type]->ToString();
   1182 }
   1183 
   1184 // static
   1185 std::string JobScheduler::QueueTypeToString(QueueType type) {
   1186   switch (type) {
   1187     case METADATA_QUEUE:
   1188       return "METADATA_QUEUE";
   1189     case FILE_QUEUE:
   1190       return "FILE_QUEUE";
   1191     case NUM_QUEUES:
   1192       break;  // This value is just a sentinel. Should never be used.
   1193   }
   1194   NOTREACHED();
   1195   return "";
   1196 }
   1197 
   1198 }  // namespace drive
   1199