1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/internal_api/sync_manager_impl.h" 6 7 #include <string> 8 9 #include "base/base64.h" 10 #include "base/bind.h" 11 #include "base/callback.h" 12 #include "base/compiler_specific.h" 13 #include "base/json/json_writer.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/metrics/histogram.h" 16 #include "base/observer_list.h" 17 #include "base/strings/string_number_conversions.h" 18 #include "base/values.h" 19 #include "sync/engine/sync_scheduler.h" 20 #include "sync/engine/syncer_types.h" 21 #include "sync/internal_api/change_reorder_buffer.h" 22 #include "sync/internal_api/public/base/cancelation_signal.h" 23 #include "sync/internal_api/public/base/model_type.h" 24 #include "sync/internal_api/public/base_node.h" 25 #include "sync/internal_api/public/configure_reason.h" 26 #include "sync/internal_api/public/engine/polling_constants.h" 27 #include "sync/internal_api/public/http_post_provider_factory.h" 28 #include "sync/internal_api/public/internal_components_factory.h" 29 #include "sync/internal_api/public/read_node.h" 30 #include "sync/internal_api/public/read_transaction.h" 31 #include "sync/internal_api/public/sync_core_proxy.h" 32 #include "sync/internal_api/public/user_share.h" 33 #include "sync/internal_api/public/util/experiments.h" 34 #include "sync/internal_api/public/write_node.h" 35 #include "sync/internal_api/public/write_transaction.h" 36 #include "sync/internal_api/sync_core.h" 37 #include "sync/internal_api/sync_core_proxy_impl.h" 38 #include "sync/internal_api/syncapi_internal.h" 39 #include "sync/internal_api/syncapi_server_connection_manager.h" 40 #include "sync/notifier/invalidation_util.h" 41 #include "sync/notifier/invalidator.h" 42 #include "sync/notifier/object_id_invalidation_map.h" 43 #include "sync/protocol/proto_value_conversions.h" 44 #include "sync/protocol/sync.pb.h" 45 #include "sync/sessions/directory_type_debug_info_emitter.h" 46 #include "sync/syncable/directory.h" 47 #include "sync/syncable/entry.h" 48 #include "sync/syncable/in_memory_directory_backing_store.h" 49 #include "sync/syncable/on_disk_directory_backing_store.h" 50 51 using base::TimeDelta; 52 using sync_pb::GetUpdatesCallerInfo; 53 54 namespace syncer { 55 56 using sessions::SyncSessionContext; 57 using syncable::ImmutableWriteTransactionInfo; 58 using syncable::SPECIFICS; 59 using syncable::UNIQUE_POSITION; 60 61 namespace { 62 63 // Delays for syncer nudges. 64 static const int kDefaultNudgeDelayMilliseconds = 200; 65 static const int kPreferencesNudgeDelayMilliseconds = 2000; 66 static const int kSyncRefreshDelayMsec = 500; 67 static const int kSyncSchedulerDelayMsec = 250; 68 69 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason( 70 ConfigureReason reason) { 71 switch (reason) { 72 case CONFIGURE_REASON_RECONFIGURATION: 73 return GetUpdatesCallerInfo::RECONFIGURATION; 74 case CONFIGURE_REASON_MIGRATION: 75 return GetUpdatesCallerInfo::MIGRATION; 76 case CONFIGURE_REASON_NEW_CLIENT: 77 return GetUpdatesCallerInfo::NEW_CLIENT; 78 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE: 79 case CONFIGURE_REASON_CRYPTO: 80 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE; 81 default: 82 NOTREACHED(); 83 } 84 return GetUpdatesCallerInfo::UNKNOWN; 85 } 86 87 } // namespace 88 89 // A class to calculate nudge delays for types. 90 class NudgeStrategy { 91 public: 92 static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type, 93 SyncManagerImpl* core) { 94 NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type); 95 return GetNudgeDelayTimeDeltaFromType(delay_type, 96 model_type, 97 core); 98 } 99 100 private: 101 // Possible types of nudge delay for datatypes. 102 // Note: These are just hints. If a sync happens then all dirty entries 103 // would be committed as part of the sync. 104 enum NudgeDelayStrategy { 105 // Sync right away. 106 IMMEDIATE, 107 108 // Sync this change while syncing another change. 109 ACCOMPANY_ONLY, 110 111 // The datatype does not use one of the predefined wait times but defines 112 // its own wait time logic for nudge. 113 CUSTOM, 114 }; 115 116 static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) { 117 switch (type) { 118 case AUTOFILL: 119 return ACCOMPANY_ONLY; 120 case PREFERENCES: 121 case SESSIONS: 122 case FAVICON_IMAGES: 123 case FAVICON_TRACKING: 124 return CUSTOM; 125 default: 126 return IMMEDIATE; 127 } 128 } 129 130 static TimeDelta GetNudgeDelayTimeDeltaFromType( 131 const NudgeDelayStrategy& delay_type, const ModelType& model_type, 132 const SyncManagerImpl* core) { 133 CHECK(core); 134 TimeDelta delay = TimeDelta::FromMilliseconds( 135 kDefaultNudgeDelayMilliseconds); 136 switch (delay_type) { 137 case IMMEDIATE: 138 delay = TimeDelta::FromMilliseconds( 139 kDefaultNudgeDelayMilliseconds); 140 break; 141 case ACCOMPANY_ONLY: 142 delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds); 143 break; 144 case CUSTOM: 145 switch (model_type) { 146 case PREFERENCES: 147 delay = TimeDelta::FromMilliseconds( 148 kPreferencesNudgeDelayMilliseconds); 149 break; 150 case SESSIONS: 151 case FAVICON_IMAGES: 152 case FAVICON_TRACKING: 153 delay = core->scheduler()->GetSessionsCommitDelay(); 154 break; 155 default: 156 NOTREACHED(); 157 } 158 break; 159 default: 160 NOTREACHED(); 161 } 162 return delay; 163 } 164 }; 165 166 SyncManagerImpl::SyncManagerImpl(const std::string& name) 167 : name_(name), 168 change_delegate_(NULL), 169 initialized_(false), 170 observing_network_connectivity_changes_(false), 171 invalidator_state_(DEFAULT_INVALIDATION_ERROR), 172 report_unrecoverable_error_function_(NULL), 173 weak_ptr_factory_(this) { 174 // Pre-fill |notification_info_map_|. 175 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { 176 notification_info_map_.insert( 177 std::make_pair(ModelTypeFromInt(i), NotificationInfo())); 178 } 179 } 180 181 SyncManagerImpl::~SyncManagerImpl() { 182 DCHECK(thread_checker_.CalledOnValidThread()); 183 CHECK(!initialized_); 184 } 185 186 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {} 187 SyncManagerImpl::NotificationInfo::~NotificationInfo() {} 188 189 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const { 190 base::DictionaryValue* value = new base::DictionaryValue(); 191 value->SetInteger("totalCount", total_count); 192 value->SetString("payload", payload); 193 return value; 194 } 195 196 bool SyncManagerImpl::VisiblePositionsDiffer( 197 const syncable::EntryKernelMutation& mutation) const { 198 const syncable::EntryKernel& a = mutation.original; 199 const syncable::EntryKernel& b = mutation.mutated; 200 if (!b.ShouldMaintainPosition()) 201 return false; 202 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION))) 203 return true; 204 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID)) 205 return true; 206 return false; 207 } 208 209 bool SyncManagerImpl::VisiblePropertiesDiffer( 210 const syncable::EntryKernelMutation& mutation, 211 Cryptographer* cryptographer) const { 212 const syncable::EntryKernel& a = mutation.original; 213 const syncable::EntryKernel& b = mutation.mutated; 214 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS); 215 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS); 216 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics), 217 GetModelTypeFromSpecifics(b_specifics)); 218 ModelType model_type = GetModelTypeFromSpecifics(b_specifics); 219 // Suppress updates to items that aren't tracked by any browser model. 220 if (model_type < FIRST_REAL_MODEL_TYPE || 221 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) { 222 return false; 223 } 224 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR)) 225 return true; 226 if (!AreSpecificsEqual(cryptographer, 227 a.ref(syncable::SPECIFICS), 228 b.ref(syncable::SPECIFICS))) { 229 return true; 230 } 231 // We only care if the name has changed if neither specifics is encrypted 232 // (encrypted nodes blow away the NON_UNIQUE_NAME). 233 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() && 234 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME)) 235 return true; 236 if (VisiblePositionsDiffer(mutation)) 237 return true; 238 return false; 239 } 240 241 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() { 242 return directory()->InitialSyncEndedTypes(); 243 } 244 245 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken( 246 ModelTypeSet types) { 247 ModelTypeSet result; 248 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) { 249 sync_pb::DataTypeProgressMarker marker; 250 directory()->GetDownloadProgress(i.Get(), &marker); 251 252 if (marker.token().empty()) 253 result.Put(i.Get()); 254 } 255 return result; 256 } 257 258 void SyncManagerImpl::ConfigureSyncer( 259 ConfigureReason reason, 260 ModelTypeSet to_download, 261 ModelTypeSet to_purge, 262 ModelTypeSet to_journal, 263 ModelTypeSet to_unapply, 264 const ModelSafeRoutingInfo& new_routing_info, 265 const base::Closure& ready_task, 266 const base::Closure& retry_task) { 267 DCHECK(thread_checker_.CalledOnValidThread()); 268 DCHECK(!ready_task.is_null()); 269 DCHECK(!retry_task.is_null()); 270 271 DVLOG(1) << "Configuring -" 272 << "\n\t" << "current types: " 273 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info)) 274 << "\n\t" << "types to download: " 275 << ModelTypeSetToString(to_download) 276 << "\n\t" << "types to purge: " 277 << ModelTypeSetToString(to_purge) 278 << "\n\t" << "types to journal: " 279 << ModelTypeSetToString(to_journal) 280 << "\n\t" << "types to unapply: " 281 << ModelTypeSetToString(to_unapply); 282 if (!PurgeDisabledTypes(to_purge, 283 to_journal, 284 to_unapply)) { 285 // We failed to cleanup the types. Invoke the ready task without actually 286 // configuring any types. The caller should detect this as a configuration 287 // failure and act appropriately. 288 ready_task.Run(); 289 return; 290 } 291 292 ConfigurationParams params(GetSourceFromReason(reason), 293 to_download, 294 new_routing_info, 295 ready_task, 296 retry_task); 297 298 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); 299 scheduler_->ScheduleConfiguration(params); 300 } 301 302 void SyncManagerImpl::Init( 303 const base::FilePath& database_location, 304 const WeakHandle<JsEventHandler>& event_handler, 305 const std::string& sync_server_and_path, 306 int port, 307 bool use_ssl, 308 scoped_ptr<HttpPostProviderFactory> post_factory, 309 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 310 ExtensionsActivity* extensions_activity, 311 SyncManager::ChangeDelegate* change_delegate, 312 const SyncCredentials& credentials, 313 const std::string& invalidator_client_id, 314 const std::string& restored_key_for_bootstrapping, 315 const std::string& restored_keystore_key_for_bootstrapping, 316 InternalComponentsFactory* internal_components_factory, 317 Encryptor* encryptor, 318 scoped_ptr<UnrecoverableErrorHandler> unrecoverable_error_handler, 319 ReportUnrecoverableErrorFunction report_unrecoverable_error_function, 320 CancelationSignal* cancelation_signal) { 321 CHECK(!initialized_); 322 DCHECK(thread_checker_.CalledOnValidThread()); 323 DCHECK(post_factory.get()); 324 DCHECK(!credentials.email.empty()); 325 DCHECK(!credentials.sync_token.empty()); 326 DCHECK(cancelation_signal); 327 DVLOG(1) << "SyncManager starting Init..."; 328 329 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()); 330 331 change_delegate_ = change_delegate; 332 333 AddObserver(&js_sync_manager_observer_); 334 SetJsEventHandler(event_handler); 335 336 AddObserver(&debug_info_event_listener_); 337 338 database_path_ = database_location.Append( 339 syncable::Directory::kSyncDatabaseFilename); 340 unrecoverable_error_handler_ = unrecoverable_error_handler.Pass(); 341 report_unrecoverable_error_function_ = report_unrecoverable_error_function; 342 343 allstatus_.SetHasKeystoreKey( 344 !restored_keystore_key_for_bootstrapping.empty()); 345 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl( 346 &share_, 347 encryptor, 348 restored_key_for_bootstrapping, 349 restored_keystore_key_for_bootstrapping)); 350 sync_encryption_handler_->AddObserver(this); 351 sync_encryption_handler_->AddObserver(&debug_info_event_listener_); 352 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_); 353 354 base::FilePath absolute_db_path = database_path_; 355 DCHECK(absolute_db_path.IsAbsolute()); 356 357 scoped_ptr<syncable::DirectoryBackingStore> backing_store = 358 internal_components_factory->BuildDirectoryBackingStore( 359 credentials.email, absolute_db_path).Pass(); 360 361 DCHECK(backing_store.get()); 362 const std::string& username = credentials.email; 363 share_.directory.reset( 364 new syncable::Directory( 365 backing_store.release(), 366 unrecoverable_error_handler_.get(), 367 report_unrecoverable_error_function_, 368 sync_encryption_handler_.get(), 369 sync_encryption_handler_->GetCryptographerUnsafe())); 370 371 DVLOG(1) << "Username: " << username; 372 if (!OpenDirectory(username)) { 373 NotifyInitializationFailure(); 374 LOG(ERROR) << "Sync manager initialization failed!"; 375 return; 376 } 377 378 connection_manager_.reset(new SyncAPIServerConnectionManager( 379 sync_server_and_path, port, use_ssl, 380 post_factory.release(), cancelation_signal)); 381 connection_manager_->set_client_id(directory()->cache_guid()); 382 connection_manager_->AddListener(this); 383 384 std::string sync_id = directory()->cache_guid(); 385 386 DVLOG(1) << "Setting sync client ID: " << sync_id; 387 allstatus_.SetSyncId(sync_id); 388 DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id; 389 allstatus_.SetInvalidatorClientId(invalidator_client_id); 390 391 model_type_registry_.reset(new ModelTypeRegistry(workers, directory())); 392 393 sync_core_.reset(new SyncCore(model_type_registry_.get())); 394 395 // Bind the SyncCore WeakPtr to this thread. This helps us crash earlier if 396 // the pointer is misused in debug mode. 397 base::WeakPtr<SyncCore> weak_core = sync_core_->AsWeakPtr(); 398 weak_core.get(); 399 400 sync_core_proxy_.reset( 401 new SyncCoreProxyImpl(base::MessageLoopProxy::current(), weak_core)); 402 403 // Build a SyncSessionContext and store the worker in it. 404 DVLOG(1) << "Sync is bringing up SyncSessionContext."; 405 std::vector<SyncEngineEventListener*> listeners; 406 listeners.push_back(&allstatus_); 407 listeners.push_back(this); 408 session_context_ = internal_components_factory->BuildContext( 409 connection_manager_.get(), 410 directory(), 411 extensions_activity, 412 listeners, 413 &debug_info_event_listener_, 414 model_type_registry_.get(), 415 invalidator_client_id).Pass(); 416 session_context_->set_account_name(credentials.email); 417 scheduler_ = internal_components_factory->BuildScheduler( 418 name_, session_context_.get(), cancelation_signal).Pass(); 419 420 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); 421 422 initialized_ = true; 423 424 net::NetworkChangeNotifier::AddIPAddressObserver(this); 425 net::NetworkChangeNotifier::AddConnectionTypeObserver(this); 426 observing_network_connectivity_changes_ = true; 427 428 UpdateCredentials(credentials); 429 430 NotifyInitializationSuccess(); 431 } 432 433 void SyncManagerImpl::NotifyInitializationSuccess() { 434 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 435 OnInitializationComplete( 436 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 437 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), 438 true, InitialSyncEndedTypes())); 439 } 440 441 void SyncManagerImpl::NotifyInitializationFailure() { 442 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 443 OnInitializationComplete( 444 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 445 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), 446 false, ModelTypeSet())); 447 } 448 449 void SyncManagerImpl::OnPassphraseRequired( 450 PassphraseRequiredReason reason, 451 const sync_pb::EncryptedData& pending_keys) { 452 // Does nothing. 453 } 454 455 void SyncManagerImpl::OnPassphraseAccepted() { 456 // Does nothing. 457 } 458 459 void SyncManagerImpl::OnBootstrapTokenUpdated( 460 const std::string& bootstrap_token, 461 BootstrapTokenType type) { 462 if (type == KEYSTORE_BOOTSTRAP_TOKEN) 463 allstatus_.SetHasKeystoreKey(true); 464 } 465 466 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, 467 bool encrypt_everything) { 468 allstatus_.SetEncryptedTypes(encrypted_types); 469 } 470 471 void SyncManagerImpl::OnEncryptionComplete() { 472 // Does nothing. 473 } 474 475 void SyncManagerImpl::OnCryptographerStateChanged( 476 Cryptographer* cryptographer) { 477 allstatus_.SetCryptographerReady(cryptographer->is_ready()); 478 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys()); 479 allstatus_.SetKeystoreMigrationTime( 480 sync_encryption_handler_->migration_time()); 481 } 482 483 void SyncManagerImpl::OnPassphraseTypeChanged( 484 PassphraseType type, 485 base::Time explicit_passphrase_time) { 486 allstatus_.SetPassphraseType(type); 487 allstatus_.SetKeystoreMigrationTime( 488 sync_encryption_handler_->migration_time()); 489 } 490 491 void SyncManagerImpl::StartSyncingNormally( 492 const ModelSafeRoutingInfo& routing_info) { 493 // Start the sync scheduler. 494 // TODO(sync): We always want the newest set of routes when we switch back 495 // to normal mode. Figure out how to enforce set_routing_info is always 496 // appropriately set and that it's only modified when switching to normal 497 // mode. 498 DCHECK(thread_checker_.CalledOnValidThread()); 499 session_context_->SetRoutingInfo(routing_info); 500 scheduler_->Start(SyncScheduler::NORMAL_MODE); 501 } 502 503 syncable::Directory* SyncManagerImpl::directory() { 504 return share_.directory.get(); 505 } 506 507 const SyncScheduler* SyncManagerImpl::scheduler() const { 508 return scheduler_.get(); 509 } 510 511 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const { 512 return connection_manager_->HasInvalidAuthToken(); 513 } 514 515 bool SyncManagerImpl::OpenDirectory(const std::string& username) { 516 DCHECK(!initialized_) << "Should only happen once"; 517 518 // Set before Open(). 519 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()); 520 WeakHandle<syncable::TransactionObserver> transaction_observer( 521 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr())); 522 523 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED; 524 open_result = directory()->Open(username, this, transaction_observer); 525 if (open_result != syncable::OPENED) { 526 LOG(ERROR) << "Could not open share for:" << username; 527 return false; 528 } 529 530 // Unapplied datatypes (those that do not have initial sync ended set) get 531 // re-downloaded during any configuration. But, it's possible for a datatype 532 // to have a progress marker but not have initial sync ended yet, making 533 // it a candidate for migration. This is a problem, as the DataTypeManager 534 // does not support a migration while it's already in the middle of a 535 // configuration. As a result, any partially synced datatype can stall the 536 // DTM, waiting for the configuration to complete, which it never will due 537 // to the migration error. In addition, a partially synced nigori will 538 // trigger the migration logic before the backend is initialized, resulting 539 // in crashes. We therefore detect and purge any partially synced types as 540 // part of initialization. 541 if (!PurgePartiallySyncedTypes()) 542 return false; 543 544 return true; 545 } 546 547 bool SyncManagerImpl::PurgePartiallySyncedTypes() { 548 ModelTypeSet partially_synced_types = ModelTypeSet::All(); 549 partially_synced_types.RemoveAll(InitialSyncEndedTypes()); 550 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken( 551 ModelTypeSet::All())); 552 553 DVLOG(1) << "Purging partially synced types " 554 << ModelTypeSetToString(partially_synced_types); 555 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes", 556 partially_synced_types.Size()); 557 if (partially_synced_types.Empty()) 558 return true; 559 return directory()->PurgeEntriesWithTypeIn(partially_synced_types, 560 ModelTypeSet(), 561 ModelTypeSet()); 562 } 563 564 bool SyncManagerImpl::PurgeDisabledTypes( 565 ModelTypeSet to_purge, 566 ModelTypeSet to_journal, 567 ModelTypeSet to_unapply) { 568 if (to_purge.Empty()) 569 return true; 570 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge); 571 DCHECK(to_purge.HasAll(to_journal)); 572 DCHECK(to_purge.HasAll(to_unapply)); 573 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply); 574 } 575 576 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) { 577 DCHECK(thread_checker_.CalledOnValidThread()); 578 DCHECK(initialized_); 579 DCHECK(!credentials.email.empty()); 580 DCHECK(!credentials.sync_token.empty()); 581 582 observing_network_connectivity_changes_ = true; 583 if (!connection_manager_->SetAuthToken(credentials.sync_token)) 584 return; // Auth token is known to be invalid, so exit early. 585 586 scheduler_->OnCredentialsUpdated(); 587 588 // TODO(zea): pass the credential age to the debug info event listener. 589 } 590 591 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) { 592 DCHECK(thread_checker_.CalledOnValidThread()); 593 observers_.AddObserver(observer); 594 } 595 596 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) { 597 DCHECK(thread_checker_.CalledOnValidThread()); 598 observers_.RemoveObserver(observer); 599 } 600 601 void SyncManagerImpl::ShutdownOnSyncThread() { 602 DCHECK(thread_checker_.CalledOnValidThread()); 603 604 // Prevent any in-flight method calls from running. Also 605 // invalidates |weak_handle_this_| and |change_observer_|. 606 weak_ptr_factory_.InvalidateWeakPtrs(); 607 js_mutation_event_observer_.InvalidateWeakPtrs(); 608 609 scheduler_.reset(); 610 session_context_.reset(); 611 model_type_registry_.reset(); 612 613 if (sync_encryption_handler_) { 614 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_); 615 sync_encryption_handler_->RemoveObserver(this); 616 } 617 618 SetJsEventHandler(WeakHandle<JsEventHandler>()); 619 RemoveObserver(&js_sync_manager_observer_); 620 621 RemoveObserver(&debug_info_event_listener_); 622 623 // |connection_manager_| may end up being NULL here in tests (in synchronous 624 // initialization mode). 625 // 626 // TODO(akalin): Fix this behavior. 627 if (connection_manager_) 628 connection_manager_->RemoveListener(this); 629 connection_manager_.reset(); 630 631 net::NetworkChangeNotifier::RemoveIPAddressObserver(this); 632 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this); 633 observing_network_connectivity_changes_ = false; 634 635 if (initialized_ && directory()) { 636 directory()->SaveChanges(); 637 } 638 639 share_.directory.reset(); 640 641 change_delegate_ = NULL; 642 643 initialized_ = false; 644 645 // We reset these here, since only now we know they will not be 646 // accessed from other threads (since we shut down everything). 647 change_observer_.Reset(); 648 weak_handle_this_.Reset(); 649 } 650 651 void SyncManagerImpl::OnIPAddressChanged() { 652 if (!observing_network_connectivity_changes_) { 653 DVLOG(1) << "IP address change dropped."; 654 return; 655 } 656 DVLOG(1) << "IP address change detected."; 657 OnNetworkConnectivityChangedImpl(); 658 } 659 660 void SyncManagerImpl::OnConnectionTypeChanged( 661 net::NetworkChangeNotifier::ConnectionType) { 662 if (!observing_network_connectivity_changes_) { 663 DVLOG(1) << "Connection type change dropped."; 664 return; 665 } 666 DVLOG(1) << "Connection type change detected."; 667 OnNetworkConnectivityChangedImpl(); 668 } 669 670 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() { 671 DCHECK(thread_checker_.CalledOnValidThread()); 672 scheduler_->OnConnectionStatusChange(); 673 } 674 675 void SyncManagerImpl::OnServerConnectionEvent( 676 const ServerConnectionEvent& event) { 677 DCHECK(thread_checker_.CalledOnValidThread()); 678 if (event.connection_code == 679 HttpResponse::SERVER_CONNECTION_OK) { 680 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 681 OnConnectionStatusChange(CONNECTION_OK)); 682 } 683 684 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) { 685 observing_network_connectivity_changes_ = false; 686 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 687 OnConnectionStatusChange(CONNECTION_AUTH_ERROR)); 688 } 689 690 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) { 691 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 692 OnConnectionStatusChange(CONNECTION_SERVER_ERROR)); 693 } 694 } 695 696 void SyncManagerImpl::HandleTransactionCompleteChangeEvent( 697 ModelTypeSet models_with_changes) { 698 // This notification happens immediately after the transaction mutex is 699 // released. This allows work to be performed without blocking other threads 700 // from acquiring a transaction. 701 if (!change_delegate_) 702 return; 703 704 // Call commit. 705 for (ModelTypeSet::Iterator it = models_with_changes.First(); 706 it.Good(); it.Inc()) { 707 change_delegate_->OnChangesComplete(it.Get()); 708 change_observer_.Call( 709 FROM_HERE, 710 &SyncManager::ChangeObserver::OnChangesComplete, 711 it.Get()); 712 } 713 } 714 715 ModelTypeSet 716 SyncManagerImpl::HandleTransactionEndingChangeEvent( 717 const ImmutableWriteTransactionInfo& write_transaction_info, 718 syncable::BaseTransaction* trans) { 719 // This notification happens immediately before a syncable WriteTransaction 720 // falls out of scope. It happens while the channel mutex is still held, 721 // and while the transaction mutex is held, so it cannot be re-entrant. 722 if (!change_delegate_ || change_records_.empty()) 723 return ModelTypeSet(); 724 725 // This will continue the WriteTransaction using a read only wrapper. 726 // This is the last chance for read to occur in the WriteTransaction 727 // that's closing. This special ReadTransaction will not close the 728 // underlying transaction. 729 ReadTransaction read_trans(GetUserShare(), trans); 730 731 ModelTypeSet models_with_changes; 732 for (ChangeRecordMap::const_iterator it = change_records_.begin(); 733 it != change_records_.end(); ++it) { 734 DCHECK(!it->second.Get().empty()); 735 ModelType type = ModelTypeFromInt(it->first); 736 change_delegate_-> 737 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type), 738 &read_trans, it->second); 739 change_observer_.Call(FROM_HERE, 740 &SyncManager::ChangeObserver::OnChangesApplied, 741 type, write_transaction_info.Get().id, it->second); 742 models_with_changes.Put(type); 743 } 744 change_records_.clear(); 745 return models_with_changes; 746 } 747 748 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi( 749 const ImmutableWriteTransactionInfo& write_transaction_info, 750 syncable::BaseTransaction* trans, 751 std::vector<int64>* entries_changed) { 752 // We have been notified about a user action changing a sync model. 753 LOG_IF(WARNING, !change_records_.empty()) << 754 "CALCULATE_CHANGES called with unapplied old changes."; 755 756 // The mutated model type, or UNSPECIFIED if nothing was mutated. 757 ModelTypeSet mutated_model_types; 758 759 const syncable::ImmutableEntryKernelMutationMap& mutations = 760 write_transaction_info.Get().mutations; 761 for (syncable::EntryKernelMutationMap::const_iterator it = 762 mutations.Get().begin(); it != mutations.Get().end(); ++it) { 763 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) { 764 continue; 765 } 766 767 ModelType model_type = 768 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); 769 if (model_type < FIRST_REAL_MODEL_TYPE) { 770 NOTREACHED() << "Permanent or underspecified item changed via syncapi."; 771 continue; 772 } 773 774 // Found real mutation. 775 if (model_type != UNSPECIFIED) { 776 mutated_model_types.Put(model_type); 777 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE)); 778 } 779 } 780 781 // Nudge if necessary. 782 if (!mutated_model_types.Empty()) { 783 if (weak_handle_this_.IsInitialized()) { 784 weak_handle_this_.Call(FROM_HERE, 785 &SyncManagerImpl::RequestNudgeForDataTypes, 786 FROM_HERE, 787 mutated_model_types); 788 } else { 789 NOTREACHED(); 790 } 791 } 792 } 793 794 void SyncManagerImpl::SetExtraChangeRecordData(int64 id, 795 ModelType type, ChangeReorderBuffer* buffer, 796 Cryptographer* cryptographer, const syncable::EntryKernel& original, 797 bool existed_before, bool exists_now) { 798 // If this is a deletion and the datatype was encrypted, we need to decrypt it 799 // and attach it to the buffer. 800 if (!exists_now && existed_before) { 801 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS)); 802 if (type == PASSWORDS) { 803 // Passwords must use their own legacy ExtraPasswordChangeRecordData. 804 scoped_ptr<sync_pb::PasswordSpecificsData> data( 805 DecryptPasswordSpecifics(original_specifics, cryptographer)); 806 if (!data) { 807 NOTREACHED(); 808 return; 809 } 810 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data)); 811 } else if (original_specifics.has_encrypted()) { 812 // All other datatypes can just create a new unencrypted specifics and 813 // attach it. 814 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted(); 815 if (!cryptographer->Decrypt(encrypted, &original_specifics)) { 816 NOTREACHED(); 817 return; 818 } 819 } 820 buffer->SetSpecificsForId(id, original_specifics); 821 } 822 } 823 824 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer( 825 const ImmutableWriteTransactionInfo& write_transaction_info, 826 syncable::BaseTransaction* trans, 827 std::vector<int64>* entries_changed) { 828 // We only expect one notification per sync step, so change_buffers_ should 829 // contain no pending entries. 830 LOG_IF(WARNING, !change_records_.empty()) << 831 "CALCULATE_CHANGES called with unapplied old changes."; 832 833 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT]; 834 835 Cryptographer* crypto = directory()->GetCryptographer(trans); 836 const syncable::ImmutableEntryKernelMutationMap& mutations = 837 write_transaction_info.Get().mutations; 838 for (syncable::EntryKernelMutationMap::const_iterator it = 839 mutations.Get().begin(); it != mutations.Get().end(); ++it) { 840 bool existed_before = !it->second.original.ref(syncable::IS_DEL); 841 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL); 842 843 // Omit items that aren't associated with a model. 844 ModelType type = 845 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); 846 if (type < FIRST_REAL_MODEL_TYPE) 847 continue; 848 849 int64 handle = it->first; 850 if (exists_now && !existed_before) 851 change_buffers[type].PushAddedItem(handle); 852 else if (!exists_now && existed_before) 853 change_buffers[type].PushDeletedItem(handle); 854 else if (exists_now && existed_before && 855 VisiblePropertiesDiffer(it->second, crypto)) { 856 change_buffers[type].PushUpdatedItem(handle); 857 } 858 859 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto, 860 it->second.original, existed_before, exists_now); 861 } 862 863 ReadTransaction read_trans(GetUserShare(), trans); 864 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { 865 if (!change_buffers[i].IsEmpty()) { 866 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans, 867 &(change_records_[i]))) { 868 for (size_t j = 0; j < change_records_[i].Get().size(); ++j) 869 entries_changed->push_back((change_records_[i].Get())[j].id); 870 } 871 if (change_records_[i].Get().empty()) 872 change_records_.erase(i); 873 } 874 } 875 } 876 877 TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta( 878 const ModelType& model_type) { 879 return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this); 880 } 881 882 void SyncManagerImpl::RequestNudgeForDataTypes( 883 const tracked_objects::Location& nudge_location, 884 ModelTypeSet types) { 885 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get()); 886 887 // TODO(lipalani) : Calculate the nudge delay based on all types. 888 base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta( 889 types.First().Get(), 890 this); 891 scheduler_->ScheduleLocalNudge(nudge_delay, 892 types, 893 nudge_location); 894 } 895 896 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) { 897 DCHECK(thread_checker_.CalledOnValidThread()); 898 // Only send an event if this is due to a cycle ending and this cycle 899 // concludes a canonical "sync" process; that is, based on what is known 900 // locally we are "all happy" and up-to-date. There may be new changes on 901 // the server, but we'll get them on a subsequent sync. 902 // 903 // Notifications are sent at the end of every sync cycle, regardless of 904 // whether we should sync again. 905 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) { 906 if (!initialized_) { 907 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not " 908 << "initialized"; 909 return; 910 } 911 912 DVLOG(1) << "Sending OnSyncCycleCompleted"; 913 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 914 OnSyncCycleCompleted(event.snapshot)); 915 } 916 } 917 918 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) { 919 FOR_EACH_OBSERVER( 920 SyncManager::Observer, observers_, 921 OnActionableError(error)); 922 } 923 924 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {} 925 926 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {} 927 928 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) { 929 FOR_EACH_OBSERVER( 930 SyncManager::Observer, observers_, 931 OnMigrationRequested(types)); 932 } 933 934 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) { 935 protocol_event_buffer_.RecordProtocolEvent(event); 936 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 937 OnProtocolEvent(event)); 938 } 939 940 void SyncManagerImpl::SetJsEventHandler( 941 const WeakHandle<JsEventHandler>& event_handler) { 942 js_sync_manager_observer_.SetJsEventHandler(event_handler); 943 js_mutation_event_observer_.SetJsEventHandler(event_handler); 944 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler); 945 } 946 947 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType( 948 syncer::ModelType type) { 949 DirectoryTypeDebugInfoEmitterMap* emitter_map = 950 model_type_registry_->directory_type_debug_info_emitter_map(); 951 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type); 952 953 if (it == emitter_map->end()) { 954 // This can happen in some cases. The UI thread makes requests of us 955 // when it doesn't really know which types are enabled or disabled. 956 DLOG(WARNING) << "Asked to return debug info for invalid type " 957 << ModelTypeToString(type); 958 return scoped_ptr<base::ListValue>(); 959 } 960 961 return it->second->GetAllNodes(); 962 } 963 964 void SyncManagerImpl::OnInvalidatorStateChange(InvalidatorState state) { 965 DCHECK(thread_checker_.CalledOnValidThread()); 966 967 const std::string& state_str = InvalidatorStateToString(state); 968 invalidator_state_ = state; 969 DVLOG(1) << "Invalidator state changed to: " << state_str; 970 const bool notifications_enabled = 971 (invalidator_state_ == INVALIDATIONS_ENABLED); 972 allstatus_.SetNotificationsEnabled(notifications_enabled); 973 scheduler_->SetNotificationsEnabled(notifications_enabled); 974 } 975 976 void SyncManagerImpl::OnIncomingInvalidation( 977 const ObjectIdInvalidationMap& invalidation_map) { 978 DCHECK(thread_checker_.CalledOnValidThread()); 979 980 // We should never receive IDs from non-sync objects. 981 ObjectIdSet ids = invalidation_map.GetObjectIds(); 982 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { 983 ModelType type; 984 if (!ObjectIdToRealModelType(*it, &type)) { 985 DLOG(WARNING) << "Notification has invalid id: " << ObjectIdToString(*it); 986 } 987 } 988 989 if (invalidation_map.Empty()) { 990 LOG(WARNING) << "Sync received invalidation without any type information."; 991 } else { 992 scheduler_->ScheduleInvalidationNudge( 993 TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec), 994 invalidation_map, FROM_HERE); 995 debug_info_event_listener_.OnIncomingNotification(invalidation_map); 996 } 997 } 998 999 std::string SyncManagerImpl::GetOwnerName() const { return "SyncManagerImpl"; } 1000 1001 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) { 1002 DCHECK(thread_checker_.CalledOnValidThread()); 1003 if (types.Empty()) { 1004 LOG(WARNING) << "Sync received refresh request with no types specified."; 1005 } else { 1006 scheduler_->ScheduleLocalRefreshRequest( 1007 TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec), 1008 types, FROM_HERE); 1009 } 1010 } 1011 1012 SyncStatus SyncManagerImpl::GetDetailedStatus() const { 1013 return allstatus_.status(); 1014 } 1015 1016 void SyncManagerImpl::SaveChanges() { 1017 directory()->SaveChanges(); 1018 } 1019 1020 UserShare* SyncManagerImpl::GetUserShare() { 1021 DCHECK(initialized_); 1022 return &share_; 1023 } 1024 1025 syncer::SyncCoreProxy* SyncManagerImpl::GetSyncCoreProxy() { 1026 DCHECK(initialized_); 1027 return sync_core_proxy_.get(); 1028 } 1029 1030 const std::string SyncManagerImpl::cache_guid() { 1031 DCHECK(initialized_); 1032 return directory()->cache_guid(); 1033 } 1034 1035 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) { 1036 ReadTransaction trans(FROM_HERE, GetUserShare()); 1037 ReadNode nigori_node(&trans); 1038 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) { 1039 DVLOG(1) << "Couldn't find Nigori node."; 1040 return false; 1041 } 1042 bool found_experiment = false; 1043 1044 ReadNode favicon_sync_node(&trans); 1045 if (favicon_sync_node.InitByClientTagLookup( 1046 syncer::EXPERIMENTS, 1047 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) { 1048 experiments->favicon_sync_limit = 1049 favicon_sync_node.GetExperimentsSpecifics().favicon_sync(). 1050 favicon_sync_limit(); 1051 found_experiment = true; 1052 } 1053 1054 ReadNode pre_commit_update_avoidance_node(&trans); 1055 if (pre_commit_update_avoidance_node.InitByClientTagLookup( 1056 syncer::EXPERIMENTS, 1057 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) { 1058 session_context_->set_server_enabled_pre_commit_update_avoidance( 1059 pre_commit_update_avoidance_node.GetExperimentsSpecifics(). 1060 pre_commit_update_avoidance().enabled()); 1061 // We don't bother setting found_experiment. The frontend doesn't need to 1062 // know about this. 1063 } 1064 1065 ReadNode gcm_channel_node(&trans); 1066 if (gcm_channel_node.InitByClientTagLookup( 1067 syncer::EXPERIMENTS, 1068 syncer::kGCMChannelTag) == BaseNode::INIT_OK && 1069 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) { 1070 experiments->gcm_channel_state = 1071 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ? 1072 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED); 1073 found_experiment = true; 1074 } 1075 1076 ReadNode enhanced_bookmarks_node(&trans); 1077 if (enhanced_bookmarks_node.InitByClientTagLookup( 1078 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) == 1079 BaseNode::INIT_OK && 1080 enhanced_bookmarks_node.GetExperimentsSpecifics() 1081 .has_enhanced_bookmarks()) { 1082 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks = 1083 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks(); 1084 if (enhanced_bookmarks.has_enabled()) 1085 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled(); 1086 if (enhanced_bookmarks.has_extension_id()) { 1087 experiments->enhanced_bookmarks_ext_id = 1088 enhanced_bookmarks.extension_id(); 1089 } 1090 found_experiment = true; 1091 } 1092 1093 ReadNode gcm_invalidations_node(&trans); 1094 if (gcm_invalidations_node.InitByClientTagLookup( 1095 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) == 1096 BaseNode::INIT_OK) { 1097 const sync_pb::GcmInvalidationsFlags& gcm_invalidations = 1098 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations(); 1099 if (gcm_invalidations.has_enabled()) { 1100 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled(); 1101 found_experiment = true; 1102 } 1103 } 1104 1105 return found_experiment; 1106 } 1107 1108 bool SyncManagerImpl::HasUnsyncedItems() { 1109 ReadTransaction trans(FROM_HERE, GetUserShare()); 1110 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0); 1111 } 1112 1113 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() { 1114 return sync_encryption_handler_.get(); 1115 } 1116 1117 ScopedVector<syncer::ProtocolEvent> 1118 SyncManagerImpl::GetBufferedProtocolEvents() { 1119 return protocol_event_buffer_.GetBufferedProtocolEvents(); 1120 } 1121 1122 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver( 1123 syncer::TypeDebugInfoObserver* observer) { 1124 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer); 1125 } 1126 1127 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver( 1128 syncer::TypeDebugInfoObserver* observer) { 1129 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer); 1130 } 1131 1132 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver( 1133 syncer::TypeDebugInfoObserver* observer) { 1134 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer); 1135 } 1136 1137 void SyncManagerImpl::RequestEmitDebugInfo() { 1138 model_type_registry_->RequestEmitDebugInfo(); 1139 } 1140 1141 // static. 1142 int SyncManagerImpl::GetDefaultNudgeDelay() { 1143 return kDefaultNudgeDelayMilliseconds; 1144 } 1145 1146 // static. 1147 int SyncManagerImpl::GetPreferencesNudgeDelay() { 1148 return kPreferencesNudgeDelayMilliseconds; 1149 } 1150 1151 } // namespace syncer 1152