Home | History | Annotate | Download | only in drive_backend
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/metrics/histogram.h"
     11 #include "base/threading/sequenced_worker_pool.h"
     12 #include "base/time/time.h"
     13 #include "base/values.h"
     14 #include "chrome/browser/drive/drive_api_service.h"
     15 #include "chrome/browser/drive/drive_notification_manager.h"
     16 #include "chrome/browser/drive/drive_notification_manager_factory.h"
     17 #include "chrome/browser/drive/drive_service_interface.h"
     18 #include "chrome/browser/drive/drive_uploader.h"
     19 #include "chrome/browser/extensions/extension_service.h"
     20 #include "chrome/browser/profiles/profile.h"
     21 #include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
     22 #include "chrome/browser/signin/signin_manager_factory.h"
     23 #include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
     24 #include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
     25 #include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
     26 #include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
     27 #include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
     28 #include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
     29 #include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
     30 #include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
     31 #include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
     32 #include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
     33 #include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
     34 #include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
     35 #include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
     36 #include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
     37 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
     38 #include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
     39 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
     40 #include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
     41 #include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h"
     42 #include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
     43 #include "chrome/browser/sync_file_system/file_status_observer.h"
     44 #include "chrome/browser/sync_file_system/logger.h"
     45 #include "chrome/browser/sync_file_system/syncable_file_system_util.h"
     46 #include "components/signin/core/browser/profile_oauth2_token_service.h"
     47 #include "components/signin/core/browser/signin_manager.h"
     48 #include "content/public/browser/browser_thread.h"
     49 #include "extensions/browser/extension_system.h"
     50 #include "extensions/browser/extension_system_provider.h"
     51 #include "extensions/browser/extensions_browser_client.h"
     52 #include "extensions/common/extension.h"
     53 #include "google_apis/drive/drive_api_url_generator.h"
     54 #include "google_apis/drive/gdata_wapi_url_generator.h"
     55 #include "net/url_request/url_request_context_getter.h"
     56 #include "webkit/common/blob/scoped_file.h"
     57 #include "webkit/common/fileapi/file_system_util.h"
     58 
     59 namespace sync_file_system {
     60 
     61 class RemoteChangeProcessor;
     62 
     63 namespace drive_backend {
     64 
     65 class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer {
     66  public:
     67   WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
     68                  base::WeakPtr<SyncEngine> sync_engine)
     69       : ui_task_runner_(ui_task_runner),
     70         sync_engine_(sync_engine) {
     71     sequence_checker_.DetachFromSequence();
     72   }
     73 
     74   virtual ~WorkerObserver() {
     75     DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     76   }
     77 
     78   virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
     79     if (ui_task_runner_->RunsTasksOnCurrentThread()) {
     80       if (sync_engine_)
     81         sync_engine_->OnPendingFileListUpdated(item_count);
     82       return;
     83     }
     84 
     85     DCHECK(sequence_checker_.CalledOnValidSequencedThread());
     86     ui_task_runner_->PostTask(
     87         FROM_HERE,
     88         base::Bind(&SyncEngine::OnPendingFileListUpdated,
     89                    sync_engine_,
     90                    item_count));
     91   }
     92 
     93   virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
     94                                    SyncFileStatus file_status,
     95                                    SyncAction sync_action,
     96                                    SyncDirection direction) OVERRIDE {
     97     if (ui_task_runner_->RunsTasksOnCurrentThread()) {
     98       if (sync_engine_)
     99         sync_engine_->OnFileStatusChanged(
    100             url, file_status, sync_action, direction);
    101       return;
    102     }
    103 
    104     DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    105     ui_task_runner_->PostTask(
    106         FROM_HERE,
    107         base::Bind(&SyncEngine::OnFileStatusChanged,
    108                    sync_engine_,
    109                    url, file_status, sync_action, direction));
    110   }
    111 
    112   virtual void UpdateServiceState(RemoteServiceState state,
    113                                   const std::string& description) OVERRIDE {
    114     if (ui_task_runner_->RunsTasksOnCurrentThread()) {
    115       if (sync_engine_)
    116         sync_engine_->UpdateServiceState(state, description);
    117       return;
    118     }
    119 
    120     DCHECK(sequence_checker_.CalledOnValidSequencedThread());
    121     ui_task_runner_->PostTask(
    122         FROM_HERE,
    123         base::Bind(&SyncEngine::UpdateServiceState,
    124                    sync_engine_, state, description));
    125   }
    126 
    127   void DetachFromSequence() {
    128     sequence_checker_.DetachFromSequence();
    129   }
    130 
    131  private:
    132   scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
    133   base::WeakPtr<SyncEngine> sync_engine_;
    134 
    135   base::SequenceChecker sequence_checker_;
    136 
    137   DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
    138 };
    139 
    140 namespace {
    141 
    142 void DidRegisterOrigin(const base::TimeTicks& start_time,
    143                        const SyncStatusCallback& callback,
    144                        SyncStatusCode status) {
    145   base::TimeDelta delta(base::TimeTicks::Now() - start_time);
    146   HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta);
    147   callback.Run(status);
    148 }
    149 
    150 template <typename T>
    151 void DeleteSoonHelper(scoped_ptr<T>) {}
    152 
    153 template <typename T>
    154 void DeleteSoon(const tracked_objects::Location& from_here,
    155                 base::TaskRunner* task_runner,
    156                 scoped_ptr<T> obj) {
    157   if (!obj)
    158     return;
    159 
    160   T* obj_ptr = obj.get();
    161   base::Closure deleter =
    162       base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj));
    163   if (!task_runner->PostTask(from_here, deleter)) {
    164     obj_ptr->DetachFromSequence();
    165     deleter.Run();
    166   }
    167 }
    168 
    169 }  // namespace
    170 
    171 scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
    172     content::BrowserContext* context,
    173     TaskLogger* task_logger) {
    174   scoped_refptr<base::SequencedWorkerPool> worker_pool =
    175       content::BrowserThread::GetBlockingPool();
    176 
    177   scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner =
    178       base::MessageLoopProxy::current();
    179   scoped_refptr<base::SequencedTaskRunner> worker_task_runner =
    180       worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
    181           worker_pool->GetSequenceToken(),
    182           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    183   scoped_refptr<base::SequencedTaskRunner> file_task_runner =
    184       worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
    185           worker_pool->GetSequenceToken(),
    186           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    187   scoped_refptr<base::SequencedTaskRunner> drive_task_runner =
    188       worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
    189           worker_pool->GetSequenceToken(),
    190           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
    191 
    192   Profile* profile = Profile::FromBrowserContext(context);
    193   drive::DriveNotificationManager* notification_manager =
    194       drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
    195   ExtensionService* extension_service =
    196       extensions::ExtensionSystem::Get(context)->extension_service();
    197   SigninManagerBase* signin_manager =
    198       SigninManagerFactory::GetForProfile(profile);
    199   ProfileOAuth2TokenService* token_service =
    200       ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
    201   scoped_refptr<net::URLRequestContextGetter> request_context =
    202       context->GetRequestContext();
    203 
    204   scoped_ptr<drive_backend::SyncEngine> sync_engine(
    205       new SyncEngine(ui_task_runner,
    206                      worker_task_runner,
    207                      file_task_runner,
    208                      drive_task_runner,
    209                      GetSyncFileSystemDir(context->GetPath()),
    210                      task_logger,
    211                      notification_manager,
    212                      extension_service,
    213                      signin_manager,
    214                      token_service,
    215                      request_context,
    216                      NULL  /* env_override */));
    217 
    218   sync_engine->Initialize();
    219   return sync_engine.Pass();
    220 }
    221 
    222 void SyncEngine::AppendDependsOnFactories(
    223     std::set<BrowserContextKeyedServiceFactory*>* factories) {
    224   DCHECK(factories);
    225   factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
    226   factories->insert(SigninManagerFactory::GetInstance());
    227   factories->insert(
    228       extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
    229   factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance());
    230 }
    231 
    232 SyncEngine::~SyncEngine() {
    233   Reset();
    234 
    235   net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
    236   if (signin_manager_)
    237     signin_manager_->RemoveObserver(this);
    238   if (notification_manager_)
    239     notification_manager_->RemoveObserver(this);
    240 }
    241 
    242 void SyncEngine::Reset() {
    243   if (drive_service_)
    244     drive_service_->RemoveObserver(this);
    245 
    246   DeleteSoon(FROM_HERE, worker_task_runner_, sync_worker_.Pass());
    247   DeleteSoon(FROM_HERE, worker_task_runner_, worker_observer_.Pass());
    248   DeleteSoon(FROM_HERE, worker_task_runner_,
    249              remote_change_processor_on_worker_.Pass());
    250 
    251   drive_service_wrapper_.reset();
    252   drive_service_.reset();
    253   drive_uploader_wrapper_.reset();
    254   drive_uploader_.reset();
    255   remote_change_processor_wrapper_.reset();
    256   callback_tracker_.AbortAll();
    257 }
    258 
    259 void SyncEngine::Initialize() {
    260   Reset();
    261 
    262   if (!signin_manager_ ||
    263       signin_manager_->GetAuthenticatedAccountId().empty())
    264     return;
    265 
    266   scoped_ptr<drive::DriveServiceInterface> drive_service(
    267       new drive::DriveAPIService(
    268           token_service_,
    269           request_context_,
    270           drive_task_runner_,
    271           GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
    272           GURL(google_apis::DriveApiUrlGenerator::
    273                kBaseDownloadUrlForProduction),
    274           GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
    275           std::string() /* custom_user_agent */));
    276   scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
    277       new drive::DriveUploader(drive_service.get(), drive_task_runner_));
    278 
    279   InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
    280                      scoped_ptr<SyncWorkerInterface>());
    281 }
    282 
    283 void SyncEngine::InitializeForTesting(
    284     scoped_ptr<drive::DriveServiceInterface> drive_service,
    285     scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
    286     scoped_ptr<SyncWorkerInterface> sync_worker) {
    287   Reset();
    288   InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
    289                      sync_worker.Pass());
    290 }
    291 
    292 void SyncEngine::InitializeInternal(
    293     scoped_ptr<drive::DriveServiceInterface> drive_service,
    294     scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
    295     scoped_ptr<SyncWorkerInterface> sync_worker) {
    296   drive_service_ = drive_service.Pass();
    297   drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get()));
    298 
    299   std::string account_id;
    300   if (signin_manager_)
    301     account_id = signin_manager_->GetAuthenticatedAccountId();
    302   drive_service_->Initialize(account_id);
    303 
    304   drive_uploader_ = drive_uploader.Pass();
    305   drive_uploader_wrapper_.reset(
    306       new DriveUploaderWrapper(drive_uploader_.get()));
    307 
    308   // DriveServiceWrapper and DriveServiceOnWorker relay communications
    309   // between DriveService and syncers in SyncWorker.
    310   scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker(
    311       new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
    312                                ui_task_runner_,
    313                                worker_task_runner_));
    314   scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker(
    315       new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
    316                                 ui_task_runner_,
    317                                 worker_task_runner_));
    318   scoped_ptr<SyncEngineContext> sync_engine_context(
    319       new SyncEngineContext(drive_service_on_worker.Pass(),
    320                             drive_uploader_on_worker.Pass(),
    321                             task_logger_,
    322                             ui_task_runner_,
    323                             worker_task_runner_,
    324                             file_task_runner_));
    325 
    326   worker_observer_.reset(
    327       new WorkerObserver(ui_task_runner_, weak_ptr_factory_.GetWeakPtr()));
    328 
    329   base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
    330   if (extension_service_)
    331     extension_service_weak_ptr = extension_service_->AsWeakPtr();
    332 
    333   if (!sync_worker) {
    334     sync_worker.reset(new SyncWorker(
    335         sync_file_system_dir_,
    336         extension_service_weak_ptr,
    337         env_override_));
    338   }
    339 
    340   sync_worker_ = sync_worker.Pass();
    341   sync_worker_->AddObserver(worker_observer_.get());
    342 
    343   worker_task_runner_->PostTask(
    344       FROM_HERE,
    345       base::Bind(&SyncWorkerInterface::Initialize,
    346                  base::Unretained(sync_worker_.get()),
    347                  base::Passed(&sync_engine_context)));
    348   if (remote_change_processor_)
    349     SetRemoteChangeProcessor(remote_change_processor_);
    350 
    351   drive_service_->AddObserver(this);
    352 
    353   service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE;
    354   SetSyncEnabled(sync_enabled_);
    355   OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType());
    356 }
    357 
    358 void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
    359   service_observers_.AddObserver(observer);
    360 }
    361 
    362 void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
    363   file_status_observers_.AddObserver(observer);
    364 }
    365 
    366 void SyncEngine::RegisterOrigin(const GURL& origin,
    367                                 const SyncStatusCallback& callback) {
    368   if (!sync_worker_) {
    369     // TODO(tzik): Record |origin| and retry the registration after late
    370     // sign-in.  Then, return SYNC_STATUS_OK.
    371     if (!signin_manager_ ||
    372         signin_manager_->GetAuthenticatedAccountId().empty())
    373       callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED);
    374     else
    375       callback.Run(SYNC_STATUS_ABORT);
    376     return;
    377   }
    378 
    379   SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
    380       FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(),
    381                             TrackCallback(callback)));
    382 
    383   worker_task_runner_->PostTask(
    384       FROM_HERE,
    385       base::Bind(&SyncWorkerInterface::RegisterOrigin,
    386                  base::Unretained(sync_worker_.get()),
    387                  origin, relayed_callback));
    388 }
    389 
    390 void SyncEngine::EnableOrigin(
    391     const GURL& origin, const SyncStatusCallback& callback) {
    392   if (!sync_worker_) {
    393     // It's safe to return OK immediately since this is also checked in
    394     // SyncWorker initialization.
    395     callback.Run(SYNC_STATUS_OK);
    396     return;
    397   }
    398 
    399   SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
    400       FROM_HERE, TrackCallback(callback));
    401 
    402   worker_task_runner_->PostTask(
    403       FROM_HERE,
    404       base::Bind(&SyncWorkerInterface::EnableOrigin,
    405                  base::Unretained(sync_worker_.get()),
    406                  origin, relayed_callback));
    407 }
    408 
    409 void SyncEngine::DisableOrigin(
    410     const GURL& origin, const SyncStatusCallback& callback) {
    411   if (!sync_worker_) {
    412     // It's safe to return OK immediately since this is also checked in
    413     // SyncWorker initialization.
    414     callback.Run(SYNC_STATUS_OK);
    415     return;
    416   }
    417 
    418   SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
    419       FROM_HERE, TrackCallback(callback));
    420 
    421   worker_task_runner_->PostTask(
    422       FROM_HERE,
    423       base::Bind(&SyncWorkerInterface::DisableOrigin,
    424                  base::Unretained(sync_worker_.get()),
    425                  origin,
    426                  relayed_callback));
    427 }
    428 
    429 void SyncEngine::UninstallOrigin(
    430     const GURL& origin,
    431     UninstallFlag flag,
    432     const SyncStatusCallback& callback) {
    433   if (!sync_worker_) {
    434     // It's safe to return OK immediately since this is also checked in
    435     // SyncWorker initialization.
    436     callback.Run(SYNC_STATUS_OK);
    437     return;
    438   }
    439 
    440   SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
    441       FROM_HERE, TrackCallback(callback));
    442   worker_task_runner_->PostTask(
    443       FROM_HERE,
    444       base::Bind(&SyncWorkerInterface::UninstallOrigin,
    445                  base::Unretained(sync_worker_.get()),
    446                  origin, flag, relayed_callback));
    447 }
    448 
    449 void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
    450   base::Closure abort_closure =
    451       base::Bind(callback, SYNC_STATUS_ABORT, fileapi::FileSystemURL());
    452 
    453   if (!sync_worker_) {
    454     abort_closure.Run();
    455     return;
    456   }
    457 
    458   SyncFileCallback tracked_callback = callback_tracker_.Register(
    459       abort_closure, callback);
    460   SyncFileCallback relayed_callback = RelayCallbackToCurrentThread(
    461       FROM_HERE, tracked_callback);
    462   worker_task_runner_->PostTask(
    463       FROM_HERE,
    464       base::Bind(&SyncWorkerInterface::ProcessRemoteChange,
    465                  base::Unretained(sync_worker_.get()),
    466                  relayed_callback));
    467 }
    468 
    469 void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
    470   remote_change_processor_ = processor;
    471 
    472   if (!sync_worker_)
    473     return;
    474 
    475   remote_change_processor_wrapper_.reset(
    476       new RemoteChangeProcessorWrapper(processor));
    477 
    478   remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
    479       remote_change_processor_wrapper_->AsWeakPtr(),
    480       ui_task_runner_, worker_task_runner_));
    481 
    482   worker_task_runner_->PostTask(
    483       FROM_HERE,
    484       base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor,
    485                  base::Unretained(sync_worker_.get()),
    486                  remote_change_processor_on_worker_.get()));
    487 }
    488 
    489 LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
    490   return this;
    491 }
    492 
    493 RemoteServiceState SyncEngine::GetCurrentState() const {
    494   return service_state_;
    495 }
    496 
    497 void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) {
    498   base::Closure abort_closure =
    499       base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>()));
    500 
    501   if (!sync_worker_) {
    502     abort_closure.Run();
    503     return;
    504   }
    505 
    506   StatusMapCallback tracked_callback =
    507       callback_tracker_.Register(abort_closure, callback);
    508   StatusMapCallback relayed_callback =
    509       RelayCallbackToCurrentThread(FROM_HERE, tracked_callback);
    510 
    511   worker_task_runner_->PostTask(
    512       FROM_HERE,
    513       base::Bind(&SyncWorkerInterface::GetOriginStatusMap,
    514                  base::Unretained(sync_worker_.get()),
    515                  relayed_callback));
    516 }
    517 
    518 void SyncEngine::DumpFiles(const GURL& origin,
    519                            const ListCallback& callback) {
    520   base::Closure abort_closure =
    521       base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
    522 
    523   if (!sync_worker_) {
    524     abort_closure.Run();
    525     return;
    526   }
    527 
    528   ListCallback tracked_callback =
    529       callback_tracker_.Register(abort_closure, callback);
    530 
    531   PostTaskAndReplyWithResult(
    532       worker_task_runner_,
    533       FROM_HERE,
    534       base::Bind(&SyncWorkerInterface::DumpFiles,
    535                  base::Unretained(sync_worker_.get()),
    536                  origin),
    537       tracked_callback);
    538 }
    539 
    540 void SyncEngine::DumpDatabase(const ListCallback& callback) {
    541   base::Closure abort_closure =
    542       base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
    543 
    544   if (!sync_worker_) {
    545     abort_closure.Run();
    546     return;
    547   }
    548 
    549   ListCallback tracked_callback =
    550       callback_tracker_.Register(abort_closure, callback);
    551 
    552   PostTaskAndReplyWithResult(
    553       worker_task_runner_,
    554       FROM_HERE,
    555       base::Bind(&SyncWorkerInterface::DumpDatabase,
    556                  base::Unretained(sync_worker_.get())),
    557       tracked_callback);
    558 }
    559 
    560 void SyncEngine::SetSyncEnabled(bool sync_enabled) {
    561   sync_enabled_ = sync_enabled;
    562 
    563   if (!sync_worker_)
    564     return;
    565 
    566   worker_task_runner_->PostTask(
    567       FROM_HERE,
    568       base::Bind(&SyncWorkerInterface::SetSyncEnabled,
    569                  base::Unretained(sync_worker_.get()),
    570                  sync_enabled));
    571 }
    572 
    573 void SyncEngine::PromoteDemotedChanges() {
    574   if (!sync_worker_)
    575     return;
    576 
    577   worker_task_runner_->PostTask(
    578       FROM_HERE,
    579       base::Bind(&SyncWorkerInterface::PromoteDemotedChanges,
    580                  base::Unretained(sync_worker_.get())));
    581 }
    582 
    583 void SyncEngine::ApplyLocalChange(
    584     const FileChange& local_change,
    585     const base::FilePath& local_path,
    586     const SyncFileMetadata& local_metadata,
    587     const fileapi::FileSystemURL& url,
    588     const SyncStatusCallback& callback) {
    589   if (!sync_worker_) {
    590     callback.Run(SYNC_STATUS_ABORT);
    591     return;
    592   }
    593 
    594   SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
    595       FROM_HERE, TrackCallback(callback));
    596   worker_task_runner_->PostTask(
    597       FROM_HERE,
    598       base::Bind(&SyncWorkerInterface::ApplyLocalChange,
    599                  base::Unretained(sync_worker_.get()),
    600                  local_change,
    601                  local_path,
    602                  local_metadata,
    603                  url,
    604                  relayed_callback));
    605 }
    606 
    607 void SyncEngine::OnNotificationReceived() {
    608   if (!sync_worker_)
    609     return;
    610 
    611   worker_task_runner_->PostTask(
    612       FROM_HERE,
    613       base::Bind(&SyncWorkerInterface::OnNotificationReceived,
    614                  base::Unretained(sync_worker_.get())));
    615 }
    616 
    617 void SyncEngine::OnPushNotificationEnabled(bool) {}
    618 
    619 void SyncEngine::OnReadyToSendRequests() {
    620   if (!sync_worker_)
    621     return;
    622 
    623   worker_task_runner_->PostTask(
    624       FROM_HERE,
    625       base::Bind(&SyncWorkerInterface::OnReadyToSendRequests,
    626                  base::Unretained(sync_worker_.get())));
    627 }
    628 
    629 void SyncEngine::OnRefreshTokenInvalid() {
    630   if (!sync_worker_)
    631     return;
    632 
    633   worker_task_runner_->PostTask(
    634       FROM_HERE,
    635       base::Bind(&SyncWorkerInterface::OnRefreshTokenInvalid,
    636                  base::Unretained(sync_worker_.get())));
    637 }
    638 
    639 void SyncEngine::OnNetworkChanged(
    640     net::NetworkChangeNotifier::ConnectionType type) {
    641   if (!sync_worker_)
    642     return;
    643 
    644   worker_task_runner_->PostTask(
    645       FROM_HERE,
    646       base::Bind(&SyncWorkerInterface::OnNetworkChanged,
    647                  base::Unretained(sync_worker_.get()),
    648                  type));
    649 }
    650 
    651 void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) {
    652   Reset();
    653   UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
    654                      "Failed to sign in.");
    655 }
    656 
    657 void SyncEngine::GoogleSigninSucceeded(const std::string& username,
    658                                        const std::string& password) {
    659   Initialize();
    660 }
    661 
    662 void SyncEngine::GoogleSignedOut(const std::string& username) {
    663   Reset();
    664   UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
    665                      "User signed out.");
    666 }
    667 
    668 SyncEngine::SyncEngine(
    669     base::SingleThreadTaskRunner* ui_task_runner,
    670     base::SequencedTaskRunner* worker_task_runner,
    671     base::SequencedTaskRunner* file_task_runner,
    672     base::SequencedTaskRunner* drive_task_runner,
    673     const base::FilePath& sync_file_system_dir,
    674     TaskLogger* task_logger,
    675     drive::DriveNotificationManager* notification_manager,
    676     ExtensionServiceInterface* extension_service,
    677     SigninManagerBase* signin_manager,
    678     ProfileOAuth2TokenService* token_service,
    679     net::URLRequestContextGetter* request_context,
    680     leveldb::Env* env_override)
    681     : ui_task_runner_(ui_task_runner),
    682       worker_task_runner_(worker_task_runner),
    683       file_task_runner_(file_task_runner),
    684       drive_task_runner_(drive_task_runner),
    685       sync_file_system_dir_(sync_file_system_dir),
    686       task_logger_(task_logger),
    687       notification_manager_(notification_manager),
    688       extension_service_(extension_service),
    689       signin_manager_(signin_manager),
    690       token_service_(token_service),
    691       request_context_(request_context),
    692       remote_change_processor_(NULL),
    693       service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
    694       sync_enabled_(false),
    695       env_override_(env_override),
    696       weak_ptr_factory_(this) {
    697   DCHECK(sync_file_system_dir_.IsAbsolute());
    698   if (notification_manager_)
    699     notification_manager_->AddObserver(this);
    700   if (signin_manager_)
    701     signin_manager_->AddObserver(this);
    702   net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
    703 }
    704 
    705 void SyncEngine::OnPendingFileListUpdated(int item_count) {
    706   FOR_EACH_OBSERVER(
    707       SyncServiceObserver,
    708       service_observers_,
    709       OnRemoteChangeQueueUpdated(item_count));
    710 }
    711 
    712 void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
    713                                      SyncFileStatus file_status,
    714                                      SyncAction sync_action,
    715                                      SyncDirection direction) {
    716   FOR_EACH_OBSERVER(FileStatusObserver,
    717                     file_status_observers_,
    718                     OnFileStatusChanged(
    719                         url, file_status, sync_action, direction));
    720 }
    721 
    722 void SyncEngine::UpdateServiceState(RemoteServiceState state,
    723                                     const std::string& description) {
    724   service_state_ = state;
    725 
    726   FOR_EACH_OBSERVER(
    727       SyncServiceObserver, service_observers_,
    728       OnRemoteServiceStateUpdated(state, description));
    729 }
    730 
    731 SyncStatusCallback SyncEngine::TrackCallback(
    732     const SyncStatusCallback& callback) {
    733   return callback_tracker_.Register(
    734       base::Bind(callback, SYNC_STATUS_ABORT),
    735       callback);
    736 }
    737 
    738 }  // namespace drive_backend
    739 }  // namespace sync_file_system
    740