Home | History | Annotate | Download | only in internal_api
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/internal_api/sync_manager_impl.h"
      6 
      7 #include <string>
      8 
      9 #include "base/base64.h"
     10 #include "base/bind.h"
     11 #include "base/callback.h"
     12 #include "base/compiler_specific.h"
     13 #include "base/json/json_writer.h"
     14 #include "base/memory/ref_counted.h"
     15 #include "base/metrics/histogram.h"
     16 #include "base/observer_list.h"
     17 #include "base/strings/string_number_conversions.h"
     18 #include "base/thread_task_runner_handle.h"
     19 #include "base/values.h"
     20 #include "sync/engine/sync_scheduler.h"
     21 #include "sync/engine/syncer_types.h"
     22 #include "sync/internal_api/change_reorder_buffer.h"
     23 #include "sync/internal_api/public/base/cancelation_signal.h"
     24 #include "sync/internal_api/public/base/invalidation_interface.h"
     25 #include "sync/internal_api/public/base/model_type.h"
     26 #include "sync/internal_api/public/base_node.h"
     27 #include "sync/internal_api/public/configure_reason.h"
     28 #include "sync/internal_api/public/engine/polling_constants.h"
     29 #include "sync/internal_api/public/http_post_provider_factory.h"
     30 #include "sync/internal_api/public/internal_components_factory.h"
     31 #include "sync/internal_api/public/read_node.h"
     32 #include "sync/internal_api/public/read_transaction.h"
     33 #include "sync/internal_api/public/sync_context.h"
     34 #include "sync/internal_api/public/sync_context_proxy.h"
     35 #include "sync/internal_api/public/user_share.h"
     36 #include "sync/internal_api/public/util/experiments.h"
     37 #include "sync/internal_api/public/write_node.h"
     38 #include "sync/internal_api/public/write_transaction.h"
     39 #include "sync/internal_api/sync_context_proxy_impl.h"
     40 #include "sync/internal_api/syncapi_internal.h"
     41 #include "sync/internal_api/syncapi_server_connection_manager.h"
     42 #include "sync/protocol/proto_value_conversions.h"
     43 #include "sync/protocol/sync.pb.h"
     44 #include "sync/sessions/directory_type_debug_info_emitter.h"
     45 #include "sync/syncable/directory.h"
     46 #include "sync/syncable/entry.h"
     47 #include "sync/syncable/in_memory_directory_backing_store.h"
     48 #include "sync/syncable/on_disk_directory_backing_store.h"
     49 
     50 using base::TimeDelta;
     51 using sync_pb::GetUpdatesCallerInfo;
     52 
     53 class GURL;
     54 
     55 namespace syncer {
     56 
     57 using sessions::SyncSessionContext;
     58 using syncable::ImmutableWriteTransactionInfo;
     59 using syncable::SPECIFICS;
     60 using syncable::UNIQUE_POSITION;
     61 
     62 namespace {
     63 
     64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
     65     ConfigureReason reason) {
     66   switch (reason) {
     67     case CONFIGURE_REASON_RECONFIGURATION:
     68       return GetUpdatesCallerInfo::RECONFIGURATION;
     69     case CONFIGURE_REASON_MIGRATION:
     70       return GetUpdatesCallerInfo::MIGRATION;
     71     case CONFIGURE_REASON_NEW_CLIENT:
     72       return GetUpdatesCallerInfo::NEW_CLIENT;
     73     case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
     74     case CONFIGURE_REASON_CRYPTO:
     75       return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
     76     case CONFIGURE_REASON_PROGRAMMATIC:
     77       return GetUpdatesCallerInfo::PROGRAMMATIC;
     78     default:
     79       NOTREACHED();
     80   }
     81   return GetUpdatesCallerInfo::UNKNOWN;
     82 }
     83 
     84 }  // namespace
     85 
     86 SyncManagerImpl::SyncManagerImpl(const std::string& name)
     87     : name_(name),
     88       change_delegate_(NULL),
     89       initialized_(false),
     90       observing_network_connectivity_changes_(false),
     91       report_unrecoverable_error_function_(NULL),
     92       weak_ptr_factory_(this) {
     93   // Pre-fill |notification_info_map_|.
     94   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
     95     notification_info_map_.insert(
     96         std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
     97   }
     98 }
     99 
    100 SyncManagerImpl::~SyncManagerImpl() {
    101   DCHECK(thread_checker_.CalledOnValidThread());
    102   CHECK(!initialized_);
    103 }
    104 
    105 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
    106 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
    107 
    108 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
    109   base::DictionaryValue* value = new base::DictionaryValue();
    110   value->SetInteger("totalCount", total_count);
    111   value->SetString("payload", payload);
    112   return value;
    113 }
    114 
    115 bool SyncManagerImpl::VisiblePositionsDiffer(
    116     const syncable::EntryKernelMutation& mutation) const {
    117   const syncable::EntryKernel& a = mutation.original;
    118   const syncable::EntryKernel& b = mutation.mutated;
    119   if (!b.ShouldMaintainPosition())
    120     return false;
    121   if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
    122     return true;
    123   if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
    124     return true;
    125   return false;
    126 }
    127 
    128 bool SyncManagerImpl::VisiblePropertiesDiffer(
    129     const syncable::EntryKernelMutation& mutation,
    130     Cryptographer* cryptographer) const {
    131   const syncable::EntryKernel& a = mutation.original;
    132   const syncable::EntryKernel& b = mutation.mutated;
    133   const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
    134   const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
    135   DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
    136             GetModelTypeFromSpecifics(b_specifics));
    137   ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
    138   // Suppress updates to items that aren't tracked by any browser model.
    139   if (model_type < FIRST_REAL_MODEL_TYPE ||
    140       !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
    141     return false;
    142   }
    143   if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
    144     return true;
    145   if (!AreSpecificsEqual(cryptographer,
    146                          a.ref(syncable::SPECIFICS),
    147                          b.ref(syncable::SPECIFICS))) {
    148     return true;
    149   }
    150   if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
    151                                   b.ref(syncable::ATTACHMENT_METADATA))) {
    152     return true;
    153   }
    154   // We only care if the name has changed if neither specifics is encrypted
    155   // (encrypted nodes blow away the NON_UNIQUE_NAME).
    156   if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
    157       a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
    158     return true;
    159   if (VisiblePositionsDiffer(mutation))
    160     return true;
    161   return false;
    162 }
    163 
    164 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
    165   return directory()->InitialSyncEndedTypes();
    166 }
    167 
    168 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
    169     ModelTypeSet types) {
    170   ModelTypeSet result;
    171   for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
    172     sync_pb::DataTypeProgressMarker marker;
    173     directory()->GetDownloadProgress(i.Get(), &marker);
    174 
    175     if (marker.token().empty())
    176       result.Put(i.Get());
    177   }
    178   return result;
    179 }
    180 
    181 void SyncManagerImpl::ConfigureSyncer(
    182     ConfigureReason reason,
    183     ModelTypeSet to_download,
    184     ModelTypeSet to_purge,
    185     ModelTypeSet to_journal,
    186     ModelTypeSet to_unapply,
    187     const ModelSafeRoutingInfo& new_routing_info,
    188     const base::Closure& ready_task,
    189     const base::Closure& retry_task) {
    190   DCHECK(thread_checker_.CalledOnValidThread());
    191   DCHECK(!ready_task.is_null());
    192   DCHECK(!retry_task.is_null());
    193   DCHECK(initialized_);
    194 
    195   DVLOG(1) << "Configuring -"
    196            << "\n\t" << "current types: "
    197            << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
    198            << "\n\t" << "types to download: "
    199            << ModelTypeSetToString(to_download)
    200            << "\n\t" << "types to purge: "
    201            << ModelTypeSetToString(to_purge)
    202            << "\n\t" << "types to journal: "
    203            << ModelTypeSetToString(to_journal)
    204            << "\n\t" << "types to unapply: "
    205            << ModelTypeSetToString(to_unapply);
    206   if (!PurgeDisabledTypes(to_purge,
    207                           to_journal,
    208                           to_unapply)) {
    209     // We failed to cleanup the types. Invoke the ready task without actually
    210     // configuring any types. The caller should detect this as a configuration
    211     // failure and act appropriately.
    212     ready_task.Run();
    213     return;
    214   }
    215 
    216   ConfigurationParams params(GetSourceFromReason(reason),
    217                              to_download,
    218                              new_routing_info,
    219                              ready_task,
    220                              retry_task);
    221 
    222   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
    223   scheduler_->ScheduleConfiguration(params);
    224 }
    225 
    226 void SyncManagerImpl::Init(InitArgs* args) {
    227   CHECK(!initialized_);
    228   DCHECK(thread_checker_.CalledOnValidThread());
    229   DCHECK(args->post_factory.get());
    230   DCHECK(!args->credentials.email.empty());
    231   DCHECK(!args->credentials.sync_token.empty());
    232   DCHECK(!args->credentials.scope_set.empty());
    233   DCHECK(args->cancelation_signal);
    234   DVLOG(1) << "SyncManager starting Init...";
    235 
    236   weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
    237 
    238   change_delegate_ = args->change_delegate;
    239 
    240   AddObserver(&js_sync_manager_observer_);
    241   SetJsEventHandler(args->event_handler);
    242 
    243   AddObserver(&debug_info_event_listener_);
    244 
    245   database_path_ = args->database_location.Append(
    246       syncable::Directory::kSyncDatabaseFilename);
    247   unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
    248   report_unrecoverable_error_function_ =
    249       args->report_unrecoverable_error_function;
    250 
    251   allstatus_.SetHasKeystoreKey(
    252       !args->restored_keystore_key_for_bootstrapping.empty());
    253   sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
    254       &share_,
    255       args->encryptor,
    256       args->restored_key_for_bootstrapping,
    257       args->restored_keystore_key_for_bootstrapping));
    258   sync_encryption_handler_->AddObserver(this);
    259   sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
    260   sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
    261 
    262   base::FilePath absolute_db_path = database_path_;
    263   DCHECK(absolute_db_path.IsAbsolute());
    264 
    265   scoped_ptr<syncable::DirectoryBackingStore> backing_store =
    266       args->internal_components_factory->BuildDirectoryBackingStore(
    267           InternalComponentsFactory::STORAGE_ON_DISK,
    268           args->credentials.email, absolute_db_path).Pass();
    269 
    270   DCHECK(backing_store.get());
    271   share_.directory.reset(
    272       new syncable::Directory(
    273           backing_store.release(),
    274           unrecoverable_error_handler_.get(),
    275           report_unrecoverable_error_function_,
    276           sync_encryption_handler_.get(),
    277           sync_encryption_handler_->GetCryptographerUnsafe()));
    278   share_.sync_credentials = args->credentials;
    279 
    280   // UserShare is accessible to a lot of code that doesn't need access to the
    281   // sync token so clear sync_token from the UserShare.
    282   share_.sync_credentials.sync_token = "";
    283 
    284   const std::string& username = args->credentials.email;
    285   DVLOG(1) << "Username: " << username;
    286   if (!OpenDirectory(username)) {
    287     NotifyInitializationFailure();
    288     LOG(ERROR) << "Sync manager initialization failed!";
    289     return;
    290   }
    291 
    292   connection_manager_.reset(new SyncAPIServerConnectionManager(
    293       args->service_url.host() + args->service_url.path(),
    294       args->service_url.EffectiveIntPort(),
    295       args->service_url.SchemeIsSecure(),
    296       args->post_factory.release(),
    297       args->cancelation_signal));
    298   connection_manager_->set_client_id(directory()->cache_guid());
    299   connection_manager_->AddListener(this);
    300 
    301   std::string sync_id = directory()->cache_guid();
    302 
    303   DVLOG(1) << "Setting sync client ID: " << sync_id;
    304   allstatus_.SetSyncId(sync_id);
    305   DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
    306   allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
    307 
    308   model_type_registry_.reset(
    309       new ModelTypeRegistry(args->workers, directory(), this));
    310   sync_encryption_handler_->AddObserver(model_type_registry_.get());
    311 
    312   // Bind the SyncContext WeakPtr to this thread.  This helps us crash earlier
    313   // if the pointer is misused in debug mode.
    314   base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
    315   weak_core.get();
    316 
    317   sync_context_proxy_.reset(
    318       new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
    319 
    320   // Build a SyncSessionContext and store the worker in it.
    321   DVLOG(1) << "Sync is bringing up SyncSessionContext.";
    322   std::vector<SyncEngineEventListener*> listeners;
    323   listeners.push_back(&allstatus_);
    324   listeners.push_back(this);
    325   session_context_ =
    326       args->internal_components_factory->BuildContext(
    327                                              connection_manager_.get(),
    328                                              directory(),
    329                                              args->extensions_activity,
    330                                              listeners,
    331                                              &debug_info_event_listener_,
    332                                              model_type_registry_.get(),
    333                                              args->invalidator_client_id)
    334           .Pass();
    335   session_context_->set_account_name(args->credentials.email);
    336   scheduler_ = args->internal_components_factory->BuildScheduler(
    337       name_, session_context_.get(), args->cancelation_signal).Pass();
    338 
    339   scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
    340 
    341   initialized_ = true;
    342 
    343   net::NetworkChangeNotifier::AddIPAddressObserver(this);
    344   net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
    345   observing_network_connectivity_changes_ = true;
    346 
    347   UpdateCredentials(args->credentials);
    348 
    349   NotifyInitializationSuccess();
    350 }
    351 
    352 void SyncManagerImpl::NotifyInitializationSuccess() {
    353   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    354                     OnInitializationComplete(
    355                         MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
    356                         MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
    357                         true, InitialSyncEndedTypes()));
    358 }
    359 
    360 void SyncManagerImpl::NotifyInitializationFailure() {
    361   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    362                     OnInitializationComplete(
    363                         MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
    364                         MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
    365                         false, ModelTypeSet()));
    366 }
    367 
    368 void SyncManagerImpl::OnPassphraseRequired(
    369     PassphraseRequiredReason reason,
    370     const sync_pb::EncryptedData& pending_keys) {
    371   // Does nothing.
    372 }
    373 
    374 void SyncManagerImpl::OnPassphraseAccepted() {
    375   // Does nothing.
    376 }
    377 
    378 void SyncManagerImpl::OnBootstrapTokenUpdated(
    379     const std::string& bootstrap_token,
    380     BootstrapTokenType type) {
    381   if (type == KEYSTORE_BOOTSTRAP_TOKEN)
    382     allstatus_.SetHasKeystoreKey(true);
    383 }
    384 
    385 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
    386                                               bool encrypt_everything) {
    387   allstatus_.SetEncryptedTypes(encrypted_types);
    388 }
    389 
    390 void SyncManagerImpl::OnEncryptionComplete() {
    391   // Does nothing.
    392 }
    393 
    394 void SyncManagerImpl::OnCryptographerStateChanged(
    395     Cryptographer* cryptographer) {
    396   allstatus_.SetCryptographerReady(cryptographer->is_ready());
    397   allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
    398   allstatus_.SetKeystoreMigrationTime(
    399       sync_encryption_handler_->migration_time());
    400 }
    401 
    402 void SyncManagerImpl::OnPassphraseTypeChanged(
    403     PassphraseType type,
    404     base::Time explicit_passphrase_time) {
    405   allstatus_.SetPassphraseType(type);
    406   allstatus_.SetKeystoreMigrationTime(
    407       sync_encryption_handler_->migration_time());
    408 }
    409 
    410 void SyncManagerImpl::StartSyncingNormally(
    411     const ModelSafeRoutingInfo& routing_info) {
    412   // Start the sync scheduler.
    413   // TODO(sync): We always want the newest set of routes when we switch back
    414   // to normal mode. Figure out how to enforce set_routing_info is always
    415   // appropriately set and that it's only modified when switching to normal
    416   // mode.
    417   DCHECK(thread_checker_.CalledOnValidThread());
    418   session_context_->SetRoutingInfo(routing_info);
    419   scheduler_->Start(SyncScheduler::NORMAL_MODE);
    420 }
    421 
    422 syncable::Directory* SyncManagerImpl::directory() {
    423   return share_.directory.get();
    424 }
    425 
    426 const SyncScheduler* SyncManagerImpl::scheduler() const {
    427   return scheduler_.get();
    428 }
    429 
    430 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
    431   return connection_manager_->HasInvalidAuthToken();
    432 }
    433 
    434 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
    435   DCHECK(!initialized_) << "Should only happen once";
    436 
    437   // Set before Open().
    438   change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
    439   WeakHandle<syncable::TransactionObserver> transaction_observer(
    440       MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
    441 
    442   syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
    443   open_result = directory()->Open(username, this, transaction_observer);
    444   if (open_result != syncable::OPENED) {
    445     LOG(ERROR) << "Could not open share for:" << username;
    446     return false;
    447   }
    448 
    449   // Unapplied datatypes (those that do not have initial sync ended set) get
    450   // re-downloaded during any configuration. But, it's possible for a datatype
    451   // to have a progress marker but not have initial sync ended yet, making
    452   // it a candidate for migration. This is a problem, as the DataTypeManager
    453   // does not support a migration while it's already in the middle of a
    454   // configuration. As a result, any partially synced datatype can stall the
    455   // DTM, waiting for the configuration to complete, which it never will due
    456   // to the migration error. In addition, a partially synced nigori will
    457   // trigger the migration logic before the backend is initialized, resulting
    458   // in crashes. We therefore detect and purge any partially synced types as
    459   // part of initialization.
    460   if (!PurgePartiallySyncedTypes())
    461     return false;
    462 
    463   return true;
    464 }
    465 
    466 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
    467   ModelTypeSet partially_synced_types = ModelTypeSet::All();
    468   partially_synced_types.RemoveAll(InitialSyncEndedTypes());
    469   partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
    470       ModelTypeSet::All()));
    471 
    472   DVLOG(1) << "Purging partially synced types "
    473            << ModelTypeSetToString(partially_synced_types);
    474   UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
    475                        partially_synced_types.Size());
    476   if (partially_synced_types.Empty())
    477     return true;
    478   return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
    479                                              ModelTypeSet(),
    480                                              ModelTypeSet());
    481 }
    482 
    483 bool SyncManagerImpl::PurgeDisabledTypes(
    484     ModelTypeSet to_purge,
    485     ModelTypeSet to_journal,
    486     ModelTypeSet to_unapply) {
    487   if (to_purge.Empty())
    488     return true;
    489   DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
    490   DCHECK(to_purge.HasAll(to_journal));
    491   DCHECK(to_purge.HasAll(to_unapply));
    492   return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
    493 }
    494 
    495 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
    496   DCHECK(thread_checker_.CalledOnValidThread());
    497   DCHECK(initialized_);
    498   DCHECK(!credentials.email.empty());
    499   DCHECK(!credentials.sync_token.empty());
    500   DCHECK(!credentials.scope_set.empty());
    501 
    502   observing_network_connectivity_changes_ = true;
    503   if (!connection_manager_->SetAuthToken(credentials.sync_token))
    504     return;  // Auth token is known to be invalid, so exit early.
    505 
    506   scheduler_->OnCredentialsUpdated();
    507 
    508   // TODO(zea): pass the credential age to the debug info event listener.
    509 }
    510 
    511 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
    512   DCHECK(thread_checker_.CalledOnValidThread());
    513   observers_.AddObserver(observer);
    514 }
    515 
    516 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
    517   DCHECK(thread_checker_.CalledOnValidThread());
    518   observers_.RemoveObserver(observer);
    519 }
    520 
    521 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
    522   DCHECK(thread_checker_.CalledOnValidThread());
    523 
    524   // Prevent any in-flight method calls from running.  Also
    525   // invalidates |weak_handle_this_| and |change_observer_|.
    526   weak_ptr_factory_.InvalidateWeakPtrs();
    527   js_mutation_event_observer_.InvalidateWeakPtrs();
    528 
    529   scheduler_.reset();
    530   session_context_.reset();
    531 
    532   if (model_type_registry_)
    533     sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
    534 
    535   model_type_registry_.reset();
    536 
    537   if (sync_encryption_handler_) {
    538     sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
    539     sync_encryption_handler_->RemoveObserver(this);
    540   }
    541 
    542   SetJsEventHandler(WeakHandle<JsEventHandler>());
    543   RemoveObserver(&js_sync_manager_observer_);
    544 
    545   RemoveObserver(&debug_info_event_listener_);
    546 
    547   // |connection_manager_| may end up being NULL here in tests (in synchronous
    548   // initialization mode).
    549   //
    550   // TODO(akalin): Fix this behavior.
    551   if (connection_manager_)
    552     connection_manager_->RemoveListener(this);
    553   connection_manager_.reset();
    554 
    555   net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
    556   net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
    557   observing_network_connectivity_changes_ = false;
    558 
    559   if (initialized_ && directory()) {
    560     directory()->SaveChanges();
    561   }
    562 
    563   share_.directory.reset();
    564 
    565   change_delegate_ = NULL;
    566 
    567   initialized_ = false;
    568 
    569   // We reset these here, since only now we know they will not be
    570   // accessed from other threads (since we shut down everything).
    571   change_observer_.Reset();
    572   weak_handle_this_.Reset();
    573 }
    574 
    575 void SyncManagerImpl::OnIPAddressChanged() {
    576   if (!observing_network_connectivity_changes_) {
    577     DVLOG(1) << "IP address change dropped.";
    578     return;
    579   }
    580   DVLOG(1) << "IP address change detected.";
    581   OnNetworkConnectivityChangedImpl();
    582 }
    583 
    584 void SyncManagerImpl::OnConnectionTypeChanged(
    585   net::NetworkChangeNotifier::ConnectionType) {
    586   if (!observing_network_connectivity_changes_) {
    587     DVLOG(1) << "Connection type change dropped.";
    588     return;
    589   }
    590   DVLOG(1) << "Connection type change detected.";
    591   OnNetworkConnectivityChangedImpl();
    592 }
    593 
    594 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
    595   DCHECK(thread_checker_.CalledOnValidThread());
    596   scheduler_->OnConnectionStatusChange();
    597 }
    598 
    599 void SyncManagerImpl::OnServerConnectionEvent(
    600     const ServerConnectionEvent& event) {
    601   DCHECK(thread_checker_.CalledOnValidThread());
    602   if (event.connection_code ==
    603       HttpResponse::SERVER_CONNECTION_OK) {
    604     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    605                       OnConnectionStatusChange(CONNECTION_OK));
    606   }
    607 
    608   if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
    609     observing_network_connectivity_changes_ = false;
    610     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    611                       OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
    612   }
    613 
    614   if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
    615     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    616                       OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
    617   }
    618 }
    619 
    620 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
    621     ModelTypeSet models_with_changes) {
    622   // This notification happens immediately after the transaction mutex is
    623   // released. This allows work to be performed without blocking other threads
    624   // from acquiring a transaction.
    625   if (!change_delegate_)
    626     return;
    627 
    628   // Call commit.
    629   for (ModelTypeSet::Iterator it = models_with_changes.First();
    630        it.Good(); it.Inc()) {
    631     change_delegate_->OnChangesComplete(it.Get());
    632     change_observer_.Call(
    633         FROM_HERE,
    634         &SyncManager::ChangeObserver::OnChangesComplete,
    635         it.Get());
    636   }
    637 }
    638 
    639 ModelTypeSet
    640 SyncManagerImpl::HandleTransactionEndingChangeEvent(
    641     const ImmutableWriteTransactionInfo& write_transaction_info,
    642     syncable::BaseTransaction* trans) {
    643   // This notification happens immediately before a syncable WriteTransaction
    644   // falls out of scope. It happens while the channel mutex is still held,
    645   // and while the transaction mutex is held, so it cannot be re-entrant.
    646   if (!change_delegate_ || change_records_.empty())
    647     return ModelTypeSet();
    648 
    649   // This will continue the WriteTransaction using a read only wrapper.
    650   // This is the last chance for read to occur in the WriteTransaction
    651   // that's closing. This special ReadTransaction will not close the
    652   // underlying transaction.
    653   ReadTransaction read_trans(GetUserShare(), trans);
    654 
    655   ModelTypeSet models_with_changes;
    656   for (ChangeRecordMap::const_iterator it = change_records_.begin();
    657       it != change_records_.end(); ++it) {
    658     DCHECK(!it->second.Get().empty());
    659     ModelType type = ModelTypeFromInt(it->first);
    660     change_delegate_->
    661         OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
    662                          &read_trans, it->second);
    663     change_observer_.Call(FROM_HERE,
    664         &SyncManager::ChangeObserver::OnChangesApplied,
    665         type, write_transaction_info.Get().id, it->second);
    666     models_with_changes.Put(type);
    667   }
    668   change_records_.clear();
    669   return models_with_changes;
    670 }
    671 
    672 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
    673     const ImmutableWriteTransactionInfo& write_transaction_info,
    674     syncable::BaseTransaction* trans,
    675     std::vector<int64>* entries_changed) {
    676   // We have been notified about a user action changing a sync model.
    677   LOG_IF(WARNING, !change_records_.empty()) <<
    678       "CALCULATE_CHANGES called with unapplied old changes.";
    679 
    680   // The mutated model type, or UNSPECIFIED if nothing was mutated.
    681   ModelTypeSet mutated_model_types;
    682 
    683   const syncable::ImmutableEntryKernelMutationMap& mutations =
    684       write_transaction_info.Get().mutations;
    685   for (syncable::EntryKernelMutationMap::const_iterator it =
    686            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
    687     if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
    688       continue;
    689     }
    690 
    691     ModelType model_type =
    692         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
    693     if (model_type < FIRST_REAL_MODEL_TYPE) {
    694       NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
    695       continue;
    696     }
    697 
    698     // Found real mutation.
    699     if (model_type != UNSPECIFIED) {
    700       mutated_model_types.Put(model_type);
    701       entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
    702     }
    703   }
    704 
    705   // Nudge if necessary.
    706   if (!mutated_model_types.Empty()) {
    707     if (weak_handle_this_.IsInitialized()) {
    708       weak_handle_this_.Call(FROM_HERE,
    709                              &SyncManagerImpl::RequestNudgeForDataTypes,
    710                              FROM_HERE,
    711                              mutated_model_types);
    712     } else {
    713       NOTREACHED();
    714     }
    715   }
    716 }
    717 
    718 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
    719     ModelType type, ChangeReorderBuffer* buffer,
    720     Cryptographer* cryptographer, const syncable::EntryKernel& original,
    721     bool existed_before, bool exists_now) {
    722   // If this is a deletion and the datatype was encrypted, we need to decrypt it
    723   // and attach it to the buffer.
    724   if (!exists_now && existed_before) {
    725     sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
    726     if (type == PASSWORDS) {
    727       // Passwords must use their own legacy ExtraPasswordChangeRecordData.
    728       scoped_ptr<sync_pb::PasswordSpecificsData> data(
    729           DecryptPasswordSpecifics(original_specifics, cryptographer));
    730       if (!data) {
    731         NOTREACHED();
    732         return;
    733       }
    734       buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
    735     } else if (original_specifics.has_encrypted()) {
    736       // All other datatypes can just create a new unencrypted specifics and
    737       // attach it.
    738       const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
    739       if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
    740         NOTREACHED();
    741         return;
    742       }
    743     }
    744     buffer->SetSpecificsForId(id, original_specifics);
    745   }
    746 }
    747 
    748 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
    749     const ImmutableWriteTransactionInfo& write_transaction_info,
    750     syncable::BaseTransaction* trans,
    751     std::vector<int64>* entries_changed) {
    752   // We only expect one notification per sync step, so change_buffers_ should
    753   // contain no pending entries.
    754   LOG_IF(WARNING, !change_records_.empty()) <<
    755       "CALCULATE_CHANGES called with unapplied old changes.";
    756 
    757   ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
    758 
    759   Cryptographer* crypto = directory()->GetCryptographer(trans);
    760   const syncable::ImmutableEntryKernelMutationMap& mutations =
    761       write_transaction_info.Get().mutations;
    762   for (syncable::EntryKernelMutationMap::const_iterator it =
    763            mutations.Get().begin(); it != mutations.Get().end(); ++it) {
    764     bool existed_before = !it->second.original.ref(syncable::IS_DEL);
    765     bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
    766 
    767     // Omit items that aren't associated with a model.
    768     ModelType type =
    769         GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
    770     if (type < FIRST_REAL_MODEL_TYPE)
    771       continue;
    772 
    773     int64 handle = it->first;
    774     if (exists_now && !existed_before)
    775       change_buffers[type].PushAddedItem(handle);
    776     else if (!exists_now && existed_before)
    777       change_buffers[type].PushDeletedItem(handle);
    778     else if (exists_now && existed_before &&
    779              VisiblePropertiesDiffer(it->second, crypto)) {
    780       change_buffers[type].PushUpdatedItem(handle);
    781     }
    782 
    783     SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
    784                              it->second.original, existed_before, exists_now);
    785   }
    786 
    787   ReadTransaction read_trans(GetUserShare(), trans);
    788   for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
    789     if (!change_buffers[i].IsEmpty()) {
    790       if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
    791                                                      &(change_records_[i]))) {
    792         for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
    793           entries_changed->push_back((change_records_[i].Get())[j].id);
    794       }
    795       if (change_records_[i].Get().empty())
    796         change_records_.erase(i);
    797     }
    798   }
    799 }
    800 
    801 void SyncManagerImpl::RequestNudgeForDataTypes(
    802     const tracked_objects::Location& nudge_location,
    803     ModelTypeSet types) {
    804   debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
    805 
    806   scheduler_->ScheduleLocalNudge(types, nudge_location);
    807 }
    808 
    809 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
    810   DCHECK(thread_checker_.CalledOnValidThread());
    811   scheduler_->ScheduleInitialSyncNudge(type);
    812 }
    813 
    814 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
    815   DCHECK(thread_checker_.CalledOnValidThread());
    816   RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
    817 }
    818 
    819 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
    820   DCHECK(thread_checker_.CalledOnValidThread());
    821   RefreshTypes(ModelTypeSet(type));
    822 }
    823 
    824 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
    825   DCHECK(thread_checker_.CalledOnValidThread());
    826   // Only send an event if this is due to a cycle ending and this cycle
    827   // concludes a canonical "sync" process; that is, based on what is known
    828   // locally we are "all happy" and up-to-date.  There may be new changes on
    829   // the server, but we'll get them on a subsequent sync.
    830   //
    831   // Notifications are sent at the end of every sync cycle, regardless of
    832   // whether we should sync again.
    833   if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
    834     if (!initialized_) {
    835       DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
    836                << "initialized";
    837       return;
    838     }
    839 
    840     DVLOG(1) << "Sending OnSyncCycleCompleted";
    841     FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    842                       OnSyncCycleCompleted(event.snapshot));
    843   }
    844 }
    845 
    846 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
    847   FOR_EACH_OBSERVER(
    848       SyncManager::Observer, observers_,
    849       OnActionableError(error));
    850 }
    851 
    852 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
    853 
    854 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
    855 
    856 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
    857   FOR_EACH_OBSERVER(
    858       SyncManager::Observer, observers_,
    859       OnMigrationRequested(types));
    860 }
    861 
    862 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
    863   protocol_event_buffer_.RecordProtocolEvent(event);
    864   FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
    865                     OnProtocolEvent(event));
    866 }
    867 
    868 void SyncManagerImpl::SetJsEventHandler(
    869     const WeakHandle<JsEventHandler>& event_handler) {
    870   js_sync_manager_observer_.SetJsEventHandler(event_handler);
    871   js_mutation_event_observer_.SetJsEventHandler(event_handler);
    872   js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
    873 }
    874 
    875 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
    876     syncer::ModelType type) {
    877   DirectoryTypeDebugInfoEmitterMap* emitter_map =
    878       model_type_registry_->directory_type_debug_info_emitter_map();
    879   DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
    880 
    881   if (it == emitter_map->end()) {
    882     // This can happen in some cases.  The UI thread makes requests of us
    883     // when it doesn't really know which types are enabled or disabled.
    884     DLOG(WARNING) << "Asked to return debug info for invalid type "
    885                   << ModelTypeToString(type);
    886     return scoped_ptr<base::ListValue>(new base::ListValue());
    887   }
    888 
    889   return it->second->GetAllNodes();
    890 }
    891 
    892 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
    893   DCHECK(thread_checker_.CalledOnValidThread());
    894 
    895   DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
    896   allstatus_.SetNotificationsEnabled(invalidator_enabled);
    897   scheduler_->SetNotificationsEnabled(invalidator_enabled);
    898 }
    899 
    900 void SyncManagerImpl::OnIncomingInvalidation(
    901     syncer::ModelType type,
    902     scoped_ptr<InvalidationInterface> invalidation) {
    903   DCHECK(thread_checker_.CalledOnValidThread());
    904 
    905   scheduler_->ScheduleInvalidationNudge(
    906       type,
    907       invalidation.Pass(),
    908       FROM_HERE);
    909 }
    910 
    911 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
    912   DCHECK(thread_checker_.CalledOnValidThread());
    913   if (types.Empty()) {
    914     LOG(WARNING) << "Sync received refresh request with no types specified.";
    915   } else {
    916     scheduler_->ScheduleLocalRefreshRequest(
    917         types, FROM_HERE);
    918   }
    919 }
    920 
    921 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
    922   return allstatus_.status();
    923 }
    924 
    925 void SyncManagerImpl::SaveChanges() {
    926   directory()->SaveChanges();
    927 }
    928 
    929 UserShare* SyncManagerImpl::GetUserShare() {
    930   DCHECK(initialized_);
    931   return &share_;
    932 }
    933 
    934 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
    935   DCHECK(initialized_);
    936   return sync_context_proxy_.get();
    937 }
    938 
    939 const std::string SyncManagerImpl::cache_guid() {
    940   DCHECK(initialized_);
    941   return directory()->cache_guid();
    942 }
    943 
    944 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
    945   ReadTransaction trans(FROM_HERE, GetUserShare());
    946   ReadNode nigori_node(&trans);
    947   if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
    948     DVLOG(1) << "Couldn't find Nigori node.";
    949     return false;
    950   }
    951   bool found_experiment = false;
    952 
    953   ReadNode favicon_sync_node(&trans);
    954   if (favicon_sync_node.InitByClientTagLookup(
    955           syncer::EXPERIMENTS,
    956           syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
    957     experiments->favicon_sync_limit =
    958         favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
    959             favicon_sync_limit();
    960     found_experiment = true;
    961   }
    962 
    963   ReadNode pre_commit_update_avoidance_node(&trans);
    964   if (pre_commit_update_avoidance_node.InitByClientTagLookup(
    965           syncer::EXPERIMENTS,
    966           syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
    967     session_context_->set_server_enabled_pre_commit_update_avoidance(
    968         pre_commit_update_avoidance_node.GetExperimentsSpecifics().
    969             pre_commit_update_avoidance().enabled());
    970     // We don't bother setting found_experiment.  The frontend doesn't need to
    971     // know about this.
    972   }
    973 
    974   ReadNode gcm_channel_node(&trans);
    975   if (gcm_channel_node.InitByClientTagLookup(
    976           syncer::EXPERIMENTS,
    977           syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
    978       gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
    979     experiments->gcm_channel_state =
    980         (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
    981          syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
    982     found_experiment = true;
    983   }
    984 
    985   ReadNode enhanced_bookmarks_node(&trans);
    986   if (enhanced_bookmarks_node.InitByClientTagLookup(
    987           syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
    988           BaseNode::INIT_OK &&
    989       enhanced_bookmarks_node.GetExperimentsSpecifics()
    990           .has_enhanced_bookmarks()) {
    991     const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
    992         enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
    993     if (enhanced_bookmarks.has_enabled())
    994       experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
    995     if (enhanced_bookmarks.has_extension_id()) {
    996       experiments->enhanced_bookmarks_ext_id =
    997           enhanced_bookmarks.extension_id();
    998     }
    999     found_experiment = true;
   1000   }
   1001 
   1002   ReadNode gcm_invalidations_node(&trans);
   1003   if (gcm_invalidations_node.InitByClientTagLookup(
   1004           syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
   1005       BaseNode::INIT_OK) {
   1006     const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
   1007         gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
   1008     if (gcm_invalidations.has_enabled()) {
   1009       experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
   1010       found_experiment = true;
   1011     }
   1012   }
   1013 
   1014   return found_experiment;
   1015 }
   1016 
   1017 bool SyncManagerImpl::HasUnsyncedItems() {
   1018   ReadTransaction trans(FROM_HERE, GetUserShare());
   1019   return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
   1020 }
   1021 
   1022 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
   1023   return sync_encryption_handler_.get();
   1024 }
   1025 
   1026 ScopedVector<syncer::ProtocolEvent>
   1027     SyncManagerImpl::GetBufferedProtocolEvents() {
   1028   return protocol_event_buffer_.GetBufferedProtocolEvents();
   1029 }
   1030 
   1031 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
   1032     syncer::TypeDebugInfoObserver* observer) {
   1033   model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
   1034 }
   1035 
   1036 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
   1037     syncer::TypeDebugInfoObserver* observer) {
   1038   model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
   1039 }
   1040 
   1041 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
   1042     syncer::TypeDebugInfoObserver* observer) {
   1043   return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
   1044 }
   1045 
   1046 void SyncManagerImpl::RequestEmitDebugInfo() {
   1047   model_type_registry_->RequestEmitDebugInfo();
   1048 }
   1049 
   1050 }  // namespace syncer
   1051