1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/internal_api/sync_manager_impl.h" 6 7 #include <string> 8 9 #include "base/base64.h" 10 #include "base/bind.h" 11 #include "base/callback.h" 12 #include "base/compiler_specific.h" 13 #include "base/json/json_writer.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/metrics/histogram.h" 16 #include "base/observer_list.h" 17 #include "base/strings/string_number_conversions.h" 18 #include "base/thread_task_runner_handle.h" 19 #include "base/values.h" 20 #include "sync/engine/sync_scheduler.h" 21 #include "sync/engine/syncer_types.h" 22 #include "sync/internal_api/change_reorder_buffer.h" 23 #include "sync/internal_api/public/base/cancelation_signal.h" 24 #include "sync/internal_api/public/base/invalidation_interface.h" 25 #include "sync/internal_api/public/base/model_type.h" 26 #include "sync/internal_api/public/base_node.h" 27 #include "sync/internal_api/public/configure_reason.h" 28 #include "sync/internal_api/public/engine/polling_constants.h" 29 #include "sync/internal_api/public/http_post_provider_factory.h" 30 #include "sync/internal_api/public/internal_components_factory.h" 31 #include "sync/internal_api/public/read_node.h" 32 #include "sync/internal_api/public/read_transaction.h" 33 #include "sync/internal_api/public/sync_context.h" 34 #include "sync/internal_api/public/sync_context_proxy.h" 35 #include "sync/internal_api/public/user_share.h" 36 #include "sync/internal_api/public/util/experiments.h" 37 #include "sync/internal_api/public/write_node.h" 38 #include "sync/internal_api/public/write_transaction.h" 39 #include "sync/internal_api/sync_context_proxy_impl.h" 40 #include "sync/internal_api/syncapi_internal.h" 41 #include "sync/internal_api/syncapi_server_connection_manager.h" 42 #include "sync/protocol/proto_value_conversions.h" 43 #include "sync/protocol/sync.pb.h" 44 #include "sync/sessions/directory_type_debug_info_emitter.h" 45 #include "sync/syncable/directory.h" 46 #include "sync/syncable/entry.h" 47 #include "sync/syncable/in_memory_directory_backing_store.h" 48 #include "sync/syncable/on_disk_directory_backing_store.h" 49 50 using base::TimeDelta; 51 using sync_pb::GetUpdatesCallerInfo; 52 53 class GURL; 54 55 namespace syncer { 56 57 using sessions::SyncSessionContext; 58 using syncable::ImmutableWriteTransactionInfo; 59 using syncable::SPECIFICS; 60 using syncable::UNIQUE_POSITION; 61 62 namespace { 63 64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason( 65 ConfigureReason reason) { 66 switch (reason) { 67 case CONFIGURE_REASON_RECONFIGURATION: 68 return GetUpdatesCallerInfo::RECONFIGURATION; 69 case CONFIGURE_REASON_MIGRATION: 70 return GetUpdatesCallerInfo::MIGRATION; 71 case CONFIGURE_REASON_NEW_CLIENT: 72 return GetUpdatesCallerInfo::NEW_CLIENT; 73 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE: 74 case CONFIGURE_REASON_CRYPTO: 75 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE; 76 case CONFIGURE_REASON_PROGRAMMATIC: 77 return GetUpdatesCallerInfo::PROGRAMMATIC; 78 default: 79 NOTREACHED(); 80 } 81 return GetUpdatesCallerInfo::UNKNOWN; 82 } 83 84 } // namespace 85 86 SyncManagerImpl::SyncManagerImpl(const std::string& name) 87 : name_(name), 88 change_delegate_(NULL), 89 initialized_(false), 90 observing_network_connectivity_changes_(false), 91 report_unrecoverable_error_function_(NULL), 92 weak_ptr_factory_(this) { 93 // Pre-fill |notification_info_map_|. 94 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { 95 notification_info_map_.insert( 96 std::make_pair(ModelTypeFromInt(i), NotificationInfo())); 97 } 98 } 99 100 SyncManagerImpl::~SyncManagerImpl() { 101 DCHECK(thread_checker_.CalledOnValidThread()); 102 CHECK(!initialized_); 103 } 104 105 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {} 106 SyncManagerImpl::NotificationInfo::~NotificationInfo() {} 107 108 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const { 109 base::DictionaryValue* value = new base::DictionaryValue(); 110 value->SetInteger("totalCount", total_count); 111 value->SetString("payload", payload); 112 return value; 113 } 114 115 bool SyncManagerImpl::VisiblePositionsDiffer( 116 const syncable::EntryKernelMutation& mutation) const { 117 const syncable::EntryKernel& a = mutation.original; 118 const syncable::EntryKernel& b = mutation.mutated; 119 if (!b.ShouldMaintainPosition()) 120 return false; 121 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION))) 122 return true; 123 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID)) 124 return true; 125 return false; 126 } 127 128 bool SyncManagerImpl::VisiblePropertiesDiffer( 129 const syncable::EntryKernelMutation& mutation, 130 Cryptographer* cryptographer) const { 131 const syncable::EntryKernel& a = mutation.original; 132 const syncable::EntryKernel& b = mutation.mutated; 133 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS); 134 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS); 135 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics), 136 GetModelTypeFromSpecifics(b_specifics)); 137 ModelType model_type = GetModelTypeFromSpecifics(b_specifics); 138 // Suppress updates to items that aren't tracked by any browser model. 139 if (model_type < FIRST_REAL_MODEL_TYPE || 140 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) { 141 return false; 142 } 143 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR)) 144 return true; 145 if (!AreSpecificsEqual(cryptographer, 146 a.ref(syncable::SPECIFICS), 147 b.ref(syncable::SPECIFICS))) { 148 return true; 149 } 150 if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA), 151 b.ref(syncable::ATTACHMENT_METADATA))) { 152 return true; 153 } 154 // We only care if the name has changed if neither specifics is encrypted 155 // (encrypted nodes blow away the NON_UNIQUE_NAME). 156 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() && 157 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME)) 158 return true; 159 if (VisiblePositionsDiffer(mutation)) 160 return true; 161 return false; 162 } 163 164 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() { 165 return directory()->InitialSyncEndedTypes(); 166 } 167 168 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken( 169 ModelTypeSet types) { 170 ModelTypeSet result; 171 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) { 172 sync_pb::DataTypeProgressMarker marker; 173 directory()->GetDownloadProgress(i.Get(), &marker); 174 175 if (marker.token().empty()) 176 result.Put(i.Get()); 177 } 178 return result; 179 } 180 181 void SyncManagerImpl::ConfigureSyncer( 182 ConfigureReason reason, 183 ModelTypeSet to_download, 184 ModelTypeSet to_purge, 185 ModelTypeSet to_journal, 186 ModelTypeSet to_unapply, 187 const ModelSafeRoutingInfo& new_routing_info, 188 const base::Closure& ready_task, 189 const base::Closure& retry_task) { 190 DCHECK(thread_checker_.CalledOnValidThread()); 191 DCHECK(!ready_task.is_null()); 192 DCHECK(!retry_task.is_null()); 193 DCHECK(initialized_); 194 195 DVLOG(1) << "Configuring -" 196 << "\n\t" << "current types: " 197 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info)) 198 << "\n\t" << "types to download: " 199 << ModelTypeSetToString(to_download) 200 << "\n\t" << "types to purge: " 201 << ModelTypeSetToString(to_purge) 202 << "\n\t" << "types to journal: " 203 << ModelTypeSetToString(to_journal) 204 << "\n\t" << "types to unapply: " 205 << ModelTypeSetToString(to_unapply); 206 if (!PurgeDisabledTypes(to_purge, 207 to_journal, 208 to_unapply)) { 209 // We failed to cleanup the types. Invoke the ready task without actually 210 // configuring any types. The caller should detect this as a configuration 211 // failure and act appropriately. 212 ready_task.Run(); 213 return; 214 } 215 216 ConfigurationParams params(GetSourceFromReason(reason), 217 to_download, 218 new_routing_info, 219 ready_task, 220 retry_task); 221 222 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); 223 scheduler_->ScheduleConfiguration(params); 224 } 225 226 void SyncManagerImpl::Init(InitArgs* args) { 227 CHECK(!initialized_); 228 DCHECK(thread_checker_.CalledOnValidThread()); 229 DCHECK(args->post_factory.get()); 230 DCHECK(!args->credentials.email.empty()); 231 DCHECK(!args->credentials.sync_token.empty()); 232 DCHECK(!args->credentials.scope_set.empty()); 233 DCHECK(args->cancelation_signal); 234 DVLOG(1) << "SyncManager starting Init..."; 235 236 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()); 237 238 change_delegate_ = args->change_delegate; 239 240 AddObserver(&js_sync_manager_observer_); 241 SetJsEventHandler(args->event_handler); 242 243 AddObserver(&debug_info_event_listener_); 244 245 database_path_ = args->database_location.Append( 246 syncable::Directory::kSyncDatabaseFilename); 247 unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass(); 248 report_unrecoverable_error_function_ = 249 args->report_unrecoverable_error_function; 250 251 allstatus_.SetHasKeystoreKey( 252 !args->restored_keystore_key_for_bootstrapping.empty()); 253 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl( 254 &share_, 255 args->encryptor, 256 args->restored_key_for_bootstrapping, 257 args->restored_keystore_key_for_bootstrapping)); 258 sync_encryption_handler_->AddObserver(this); 259 sync_encryption_handler_->AddObserver(&debug_info_event_listener_); 260 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_); 261 262 base::FilePath absolute_db_path = database_path_; 263 DCHECK(absolute_db_path.IsAbsolute()); 264 265 scoped_ptr<syncable::DirectoryBackingStore> backing_store = 266 args->internal_components_factory->BuildDirectoryBackingStore( 267 InternalComponentsFactory::STORAGE_ON_DISK, 268 args->credentials.email, absolute_db_path).Pass(); 269 270 DCHECK(backing_store.get()); 271 share_.directory.reset( 272 new syncable::Directory( 273 backing_store.release(), 274 unrecoverable_error_handler_.get(), 275 report_unrecoverable_error_function_, 276 sync_encryption_handler_.get(), 277 sync_encryption_handler_->GetCryptographerUnsafe())); 278 share_.sync_credentials = args->credentials; 279 280 // UserShare is accessible to a lot of code that doesn't need access to the 281 // sync token so clear sync_token from the UserShare. 282 share_.sync_credentials.sync_token = ""; 283 284 const std::string& username = args->credentials.email; 285 DVLOG(1) << "Username: " << username; 286 if (!OpenDirectory(username)) { 287 NotifyInitializationFailure(); 288 LOG(ERROR) << "Sync manager initialization failed!"; 289 return; 290 } 291 292 connection_manager_.reset(new SyncAPIServerConnectionManager( 293 args->service_url.host() + args->service_url.path(), 294 args->service_url.EffectiveIntPort(), 295 args->service_url.SchemeIsSecure(), 296 args->post_factory.release(), 297 args->cancelation_signal)); 298 connection_manager_->set_client_id(directory()->cache_guid()); 299 connection_manager_->AddListener(this); 300 301 std::string sync_id = directory()->cache_guid(); 302 303 DVLOG(1) << "Setting sync client ID: " << sync_id; 304 allstatus_.SetSyncId(sync_id); 305 DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id; 306 allstatus_.SetInvalidatorClientId(args->invalidator_client_id); 307 308 model_type_registry_.reset( 309 new ModelTypeRegistry(args->workers, directory(), this)); 310 sync_encryption_handler_->AddObserver(model_type_registry_.get()); 311 312 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier 313 // if the pointer is misused in debug mode. 314 base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr(); 315 weak_core.get(); 316 317 sync_context_proxy_.reset( 318 new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core)); 319 320 // Build a SyncSessionContext and store the worker in it. 321 DVLOG(1) << "Sync is bringing up SyncSessionContext."; 322 std::vector<SyncEngineEventListener*> listeners; 323 listeners.push_back(&allstatus_); 324 listeners.push_back(this); 325 session_context_ = 326 args->internal_components_factory->BuildContext( 327 connection_manager_.get(), 328 directory(), 329 args->extensions_activity, 330 listeners, 331 &debug_info_event_listener_, 332 model_type_registry_.get(), 333 args->invalidator_client_id) 334 .Pass(); 335 session_context_->set_account_name(args->credentials.email); 336 scheduler_ = args->internal_components_factory->BuildScheduler( 337 name_, session_context_.get(), args->cancelation_signal).Pass(); 338 339 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); 340 341 initialized_ = true; 342 343 net::NetworkChangeNotifier::AddIPAddressObserver(this); 344 net::NetworkChangeNotifier::AddConnectionTypeObserver(this); 345 observing_network_connectivity_changes_ = true; 346 347 UpdateCredentials(args->credentials); 348 349 NotifyInitializationSuccess(); 350 } 351 352 void SyncManagerImpl::NotifyInitializationSuccess() { 353 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 354 OnInitializationComplete( 355 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 356 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), 357 true, InitialSyncEndedTypes())); 358 } 359 360 void SyncManagerImpl::NotifyInitializationFailure() { 361 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 362 OnInitializationComplete( 363 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 364 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), 365 false, ModelTypeSet())); 366 } 367 368 void SyncManagerImpl::OnPassphraseRequired( 369 PassphraseRequiredReason reason, 370 const sync_pb::EncryptedData& pending_keys) { 371 // Does nothing. 372 } 373 374 void SyncManagerImpl::OnPassphraseAccepted() { 375 // Does nothing. 376 } 377 378 void SyncManagerImpl::OnBootstrapTokenUpdated( 379 const std::string& bootstrap_token, 380 BootstrapTokenType type) { 381 if (type == KEYSTORE_BOOTSTRAP_TOKEN) 382 allstatus_.SetHasKeystoreKey(true); 383 } 384 385 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, 386 bool encrypt_everything) { 387 allstatus_.SetEncryptedTypes(encrypted_types); 388 } 389 390 void SyncManagerImpl::OnEncryptionComplete() { 391 // Does nothing. 392 } 393 394 void SyncManagerImpl::OnCryptographerStateChanged( 395 Cryptographer* cryptographer) { 396 allstatus_.SetCryptographerReady(cryptographer->is_ready()); 397 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys()); 398 allstatus_.SetKeystoreMigrationTime( 399 sync_encryption_handler_->migration_time()); 400 } 401 402 void SyncManagerImpl::OnPassphraseTypeChanged( 403 PassphraseType type, 404 base::Time explicit_passphrase_time) { 405 allstatus_.SetPassphraseType(type); 406 allstatus_.SetKeystoreMigrationTime( 407 sync_encryption_handler_->migration_time()); 408 } 409 410 void SyncManagerImpl::StartSyncingNormally( 411 const ModelSafeRoutingInfo& routing_info) { 412 // Start the sync scheduler. 413 // TODO(sync): We always want the newest set of routes when we switch back 414 // to normal mode. Figure out how to enforce set_routing_info is always 415 // appropriately set and that it's only modified when switching to normal 416 // mode. 417 DCHECK(thread_checker_.CalledOnValidThread()); 418 session_context_->SetRoutingInfo(routing_info); 419 scheduler_->Start(SyncScheduler::NORMAL_MODE); 420 } 421 422 syncable::Directory* SyncManagerImpl::directory() { 423 return share_.directory.get(); 424 } 425 426 const SyncScheduler* SyncManagerImpl::scheduler() const { 427 return scheduler_.get(); 428 } 429 430 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const { 431 return connection_manager_->HasInvalidAuthToken(); 432 } 433 434 bool SyncManagerImpl::OpenDirectory(const std::string& username) { 435 DCHECK(!initialized_) << "Should only happen once"; 436 437 // Set before Open(). 438 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()); 439 WeakHandle<syncable::TransactionObserver> transaction_observer( 440 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr())); 441 442 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED; 443 open_result = directory()->Open(username, this, transaction_observer); 444 if (open_result != syncable::OPENED) { 445 LOG(ERROR) << "Could not open share for:" << username; 446 return false; 447 } 448 449 // Unapplied datatypes (those that do not have initial sync ended set) get 450 // re-downloaded during any configuration. But, it's possible for a datatype 451 // to have a progress marker but not have initial sync ended yet, making 452 // it a candidate for migration. This is a problem, as the DataTypeManager 453 // does not support a migration while it's already in the middle of a 454 // configuration. As a result, any partially synced datatype can stall the 455 // DTM, waiting for the configuration to complete, which it never will due 456 // to the migration error. In addition, a partially synced nigori will 457 // trigger the migration logic before the backend is initialized, resulting 458 // in crashes. We therefore detect and purge any partially synced types as 459 // part of initialization. 460 if (!PurgePartiallySyncedTypes()) 461 return false; 462 463 return true; 464 } 465 466 bool SyncManagerImpl::PurgePartiallySyncedTypes() { 467 ModelTypeSet partially_synced_types = ModelTypeSet::All(); 468 partially_synced_types.RemoveAll(InitialSyncEndedTypes()); 469 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken( 470 ModelTypeSet::All())); 471 472 DVLOG(1) << "Purging partially synced types " 473 << ModelTypeSetToString(partially_synced_types); 474 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes", 475 partially_synced_types.Size()); 476 if (partially_synced_types.Empty()) 477 return true; 478 return directory()->PurgeEntriesWithTypeIn(partially_synced_types, 479 ModelTypeSet(), 480 ModelTypeSet()); 481 } 482 483 bool SyncManagerImpl::PurgeDisabledTypes( 484 ModelTypeSet to_purge, 485 ModelTypeSet to_journal, 486 ModelTypeSet to_unapply) { 487 if (to_purge.Empty()) 488 return true; 489 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge); 490 DCHECK(to_purge.HasAll(to_journal)); 491 DCHECK(to_purge.HasAll(to_unapply)); 492 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply); 493 } 494 495 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) { 496 DCHECK(thread_checker_.CalledOnValidThread()); 497 DCHECK(initialized_); 498 DCHECK(!credentials.email.empty()); 499 DCHECK(!credentials.sync_token.empty()); 500 DCHECK(!credentials.scope_set.empty()); 501 502 observing_network_connectivity_changes_ = true; 503 if (!connection_manager_->SetAuthToken(credentials.sync_token)) 504 return; // Auth token is known to be invalid, so exit early. 505 506 scheduler_->OnCredentialsUpdated(); 507 508 // TODO(zea): pass the credential age to the debug info event listener. 509 } 510 511 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) { 512 DCHECK(thread_checker_.CalledOnValidThread()); 513 observers_.AddObserver(observer); 514 } 515 516 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) { 517 DCHECK(thread_checker_.CalledOnValidThread()); 518 observers_.RemoveObserver(observer); 519 } 520 521 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) { 522 DCHECK(thread_checker_.CalledOnValidThread()); 523 524 // Prevent any in-flight method calls from running. Also 525 // invalidates |weak_handle_this_| and |change_observer_|. 526 weak_ptr_factory_.InvalidateWeakPtrs(); 527 js_mutation_event_observer_.InvalidateWeakPtrs(); 528 529 scheduler_.reset(); 530 session_context_.reset(); 531 532 if (model_type_registry_) 533 sync_encryption_handler_->RemoveObserver(model_type_registry_.get()); 534 535 model_type_registry_.reset(); 536 537 if (sync_encryption_handler_) { 538 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_); 539 sync_encryption_handler_->RemoveObserver(this); 540 } 541 542 SetJsEventHandler(WeakHandle<JsEventHandler>()); 543 RemoveObserver(&js_sync_manager_observer_); 544 545 RemoveObserver(&debug_info_event_listener_); 546 547 // |connection_manager_| may end up being NULL here in tests (in synchronous 548 // initialization mode). 549 // 550 // TODO(akalin): Fix this behavior. 551 if (connection_manager_) 552 connection_manager_->RemoveListener(this); 553 connection_manager_.reset(); 554 555 net::NetworkChangeNotifier::RemoveIPAddressObserver(this); 556 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this); 557 observing_network_connectivity_changes_ = false; 558 559 if (initialized_ && directory()) { 560 directory()->SaveChanges(); 561 } 562 563 share_.directory.reset(); 564 565 change_delegate_ = NULL; 566 567 initialized_ = false; 568 569 // We reset these here, since only now we know they will not be 570 // accessed from other threads (since we shut down everything). 571 change_observer_.Reset(); 572 weak_handle_this_.Reset(); 573 } 574 575 void SyncManagerImpl::OnIPAddressChanged() { 576 if (!observing_network_connectivity_changes_) { 577 DVLOG(1) << "IP address change dropped."; 578 return; 579 } 580 DVLOG(1) << "IP address change detected."; 581 OnNetworkConnectivityChangedImpl(); 582 } 583 584 void SyncManagerImpl::OnConnectionTypeChanged( 585 net::NetworkChangeNotifier::ConnectionType) { 586 if (!observing_network_connectivity_changes_) { 587 DVLOG(1) << "Connection type change dropped."; 588 return; 589 } 590 DVLOG(1) << "Connection type change detected."; 591 OnNetworkConnectivityChangedImpl(); 592 } 593 594 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() { 595 DCHECK(thread_checker_.CalledOnValidThread()); 596 scheduler_->OnConnectionStatusChange(); 597 } 598 599 void SyncManagerImpl::OnServerConnectionEvent( 600 const ServerConnectionEvent& event) { 601 DCHECK(thread_checker_.CalledOnValidThread()); 602 if (event.connection_code == 603 HttpResponse::SERVER_CONNECTION_OK) { 604 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 605 OnConnectionStatusChange(CONNECTION_OK)); 606 } 607 608 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) { 609 observing_network_connectivity_changes_ = false; 610 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 611 OnConnectionStatusChange(CONNECTION_AUTH_ERROR)); 612 } 613 614 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) { 615 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 616 OnConnectionStatusChange(CONNECTION_SERVER_ERROR)); 617 } 618 } 619 620 void SyncManagerImpl::HandleTransactionCompleteChangeEvent( 621 ModelTypeSet models_with_changes) { 622 // This notification happens immediately after the transaction mutex is 623 // released. This allows work to be performed without blocking other threads 624 // from acquiring a transaction. 625 if (!change_delegate_) 626 return; 627 628 // Call commit. 629 for (ModelTypeSet::Iterator it = models_with_changes.First(); 630 it.Good(); it.Inc()) { 631 change_delegate_->OnChangesComplete(it.Get()); 632 change_observer_.Call( 633 FROM_HERE, 634 &SyncManager::ChangeObserver::OnChangesComplete, 635 it.Get()); 636 } 637 } 638 639 ModelTypeSet 640 SyncManagerImpl::HandleTransactionEndingChangeEvent( 641 const ImmutableWriteTransactionInfo& write_transaction_info, 642 syncable::BaseTransaction* trans) { 643 // This notification happens immediately before a syncable WriteTransaction 644 // falls out of scope. It happens while the channel mutex is still held, 645 // and while the transaction mutex is held, so it cannot be re-entrant. 646 if (!change_delegate_ || change_records_.empty()) 647 return ModelTypeSet(); 648 649 // This will continue the WriteTransaction using a read only wrapper. 650 // This is the last chance for read to occur in the WriteTransaction 651 // that's closing. This special ReadTransaction will not close the 652 // underlying transaction. 653 ReadTransaction read_trans(GetUserShare(), trans); 654 655 ModelTypeSet models_with_changes; 656 for (ChangeRecordMap::const_iterator it = change_records_.begin(); 657 it != change_records_.end(); ++it) { 658 DCHECK(!it->second.Get().empty()); 659 ModelType type = ModelTypeFromInt(it->first); 660 change_delegate_-> 661 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type), 662 &read_trans, it->second); 663 change_observer_.Call(FROM_HERE, 664 &SyncManager::ChangeObserver::OnChangesApplied, 665 type, write_transaction_info.Get().id, it->second); 666 models_with_changes.Put(type); 667 } 668 change_records_.clear(); 669 return models_with_changes; 670 } 671 672 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi( 673 const ImmutableWriteTransactionInfo& write_transaction_info, 674 syncable::BaseTransaction* trans, 675 std::vector<int64>* entries_changed) { 676 // We have been notified about a user action changing a sync model. 677 LOG_IF(WARNING, !change_records_.empty()) << 678 "CALCULATE_CHANGES called with unapplied old changes."; 679 680 // The mutated model type, or UNSPECIFIED if nothing was mutated. 681 ModelTypeSet mutated_model_types; 682 683 const syncable::ImmutableEntryKernelMutationMap& mutations = 684 write_transaction_info.Get().mutations; 685 for (syncable::EntryKernelMutationMap::const_iterator it = 686 mutations.Get().begin(); it != mutations.Get().end(); ++it) { 687 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) { 688 continue; 689 } 690 691 ModelType model_type = 692 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); 693 if (model_type < FIRST_REAL_MODEL_TYPE) { 694 NOTREACHED() << "Permanent or underspecified item changed via syncapi."; 695 continue; 696 } 697 698 // Found real mutation. 699 if (model_type != UNSPECIFIED) { 700 mutated_model_types.Put(model_type); 701 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE)); 702 } 703 } 704 705 // Nudge if necessary. 706 if (!mutated_model_types.Empty()) { 707 if (weak_handle_this_.IsInitialized()) { 708 weak_handle_this_.Call(FROM_HERE, 709 &SyncManagerImpl::RequestNudgeForDataTypes, 710 FROM_HERE, 711 mutated_model_types); 712 } else { 713 NOTREACHED(); 714 } 715 } 716 } 717 718 void SyncManagerImpl::SetExtraChangeRecordData(int64 id, 719 ModelType type, ChangeReorderBuffer* buffer, 720 Cryptographer* cryptographer, const syncable::EntryKernel& original, 721 bool existed_before, bool exists_now) { 722 // If this is a deletion and the datatype was encrypted, we need to decrypt it 723 // and attach it to the buffer. 724 if (!exists_now && existed_before) { 725 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS)); 726 if (type == PASSWORDS) { 727 // Passwords must use their own legacy ExtraPasswordChangeRecordData. 728 scoped_ptr<sync_pb::PasswordSpecificsData> data( 729 DecryptPasswordSpecifics(original_specifics, cryptographer)); 730 if (!data) { 731 NOTREACHED(); 732 return; 733 } 734 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data)); 735 } else if (original_specifics.has_encrypted()) { 736 // All other datatypes can just create a new unencrypted specifics and 737 // attach it. 738 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted(); 739 if (!cryptographer->Decrypt(encrypted, &original_specifics)) { 740 NOTREACHED(); 741 return; 742 } 743 } 744 buffer->SetSpecificsForId(id, original_specifics); 745 } 746 } 747 748 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer( 749 const ImmutableWriteTransactionInfo& write_transaction_info, 750 syncable::BaseTransaction* trans, 751 std::vector<int64>* entries_changed) { 752 // We only expect one notification per sync step, so change_buffers_ should 753 // contain no pending entries. 754 LOG_IF(WARNING, !change_records_.empty()) << 755 "CALCULATE_CHANGES called with unapplied old changes."; 756 757 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT]; 758 759 Cryptographer* crypto = directory()->GetCryptographer(trans); 760 const syncable::ImmutableEntryKernelMutationMap& mutations = 761 write_transaction_info.Get().mutations; 762 for (syncable::EntryKernelMutationMap::const_iterator it = 763 mutations.Get().begin(); it != mutations.Get().end(); ++it) { 764 bool existed_before = !it->second.original.ref(syncable::IS_DEL); 765 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL); 766 767 // Omit items that aren't associated with a model. 768 ModelType type = 769 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); 770 if (type < FIRST_REAL_MODEL_TYPE) 771 continue; 772 773 int64 handle = it->first; 774 if (exists_now && !existed_before) 775 change_buffers[type].PushAddedItem(handle); 776 else if (!exists_now && existed_before) 777 change_buffers[type].PushDeletedItem(handle); 778 else if (exists_now && existed_before && 779 VisiblePropertiesDiffer(it->second, crypto)) { 780 change_buffers[type].PushUpdatedItem(handle); 781 } 782 783 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto, 784 it->second.original, existed_before, exists_now); 785 } 786 787 ReadTransaction read_trans(GetUserShare(), trans); 788 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { 789 if (!change_buffers[i].IsEmpty()) { 790 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans, 791 &(change_records_[i]))) { 792 for (size_t j = 0; j < change_records_[i].Get().size(); ++j) 793 entries_changed->push_back((change_records_[i].Get())[j].id); 794 } 795 if (change_records_[i].Get().empty()) 796 change_records_.erase(i); 797 } 798 } 799 } 800 801 void SyncManagerImpl::RequestNudgeForDataTypes( 802 const tracked_objects::Location& nudge_location, 803 ModelTypeSet types) { 804 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get()); 805 806 scheduler_->ScheduleLocalNudge(types, nudge_location); 807 } 808 809 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) { 810 DCHECK(thread_checker_.CalledOnValidThread()); 811 scheduler_->ScheduleInitialSyncNudge(type); 812 } 813 814 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) { 815 DCHECK(thread_checker_.CalledOnValidThread()); 816 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type)); 817 } 818 819 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) { 820 DCHECK(thread_checker_.CalledOnValidThread()); 821 RefreshTypes(ModelTypeSet(type)); 822 } 823 824 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) { 825 DCHECK(thread_checker_.CalledOnValidThread()); 826 // Only send an event if this is due to a cycle ending and this cycle 827 // concludes a canonical "sync" process; that is, based on what is known 828 // locally we are "all happy" and up-to-date. There may be new changes on 829 // the server, but we'll get them on a subsequent sync. 830 // 831 // Notifications are sent at the end of every sync cycle, regardless of 832 // whether we should sync again. 833 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) { 834 if (!initialized_) { 835 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not " 836 << "initialized"; 837 return; 838 } 839 840 DVLOG(1) << "Sending OnSyncCycleCompleted"; 841 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 842 OnSyncCycleCompleted(event.snapshot)); 843 } 844 } 845 846 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) { 847 FOR_EACH_OBSERVER( 848 SyncManager::Observer, observers_, 849 OnActionableError(error)); 850 } 851 852 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {} 853 854 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {} 855 856 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) { 857 FOR_EACH_OBSERVER( 858 SyncManager::Observer, observers_, 859 OnMigrationRequested(types)); 860 } 861 862 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) { 863 protocol_event_buffer_.RecordProtocolEvent(event); 864 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 865 OnProtocolEvent(event)); 866 } 867 868 void SyncManagerImpl::SetJsEventHandler( 869 const WeakHandle<JsEventHandler>& event_handler) { 870 js_sync_manager_observer_.SetJsEventHandler(event_handler); 871 js_mutation_event_observer_.SetJsEventHandler(event_handler); 872 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler); 873 } 874 875 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType( 876 syncer::ModelType type) { 877 DirectoryTypeDebugInfoEmitterMap* emitter_map = 878 model_type_registry_->directory_type_debug_info_emitter_map(); 879 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type); 880 881 if (it == emitter_map->end()) { 882 // This can happen in some cases. The UI thread makes requests of us 883 // when it doesn't really know which types are enabled or disabled. 884 DLOG(WARNING) << "Asked to return debug info for invalid type " 885 << ModelTypeToString(type); 886 return scoped_ptr<base::ListValue>(new base::ListValue()); 887 } 888 889 return it->second->GetAllNodes(); 890 } 891 892 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) { 893 DCHECK(thread_checker_.CalledOnValidThread()); 894 895 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled; 896 allstatus_.SetNotificationsEnabled(invalidator_enabled); 897 scheduler_->SetNotificationsEnabled(invalidator_enabled); 898 } 899 900 void SyncManagerImpl::OnIncomingInvalidation( 901 syncer::ModelType type, 902 scoped_ptr<InvalidationInterface> invalidation) { 903 DCHECK(thread_checker_.CalledOnValidThread()); 904 905 scheduler_->ScheduleInvalidationNudge( 906 type, 907 invalidation.Pass(), 908 FROM_HERE); 909 } 910 911 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) { 912 DCHECK(thread_checker_.CalledOnValidThread()); 913 if (types.Empty()) { 914 LOG(WARNING) << "Sync received refresh request with no types specified."; 915 } else { 916 scheduler_->ScheduleLocalRefreshRequest( 917 types, FROM_HERE); 918 } 919 } 920 921 SyncStatus SyncManagerImpl::GetDetailedStatus() const { 922 return allstatus_.status(); 923 } 924 925 void SyncManagerImpl::SaveChanges() { 926 directory()->SaveChanges(); 927 } 928 929 UserShare* SyncManagerImpl::GetUserShare() { 930 DCHECK(initialized_); 931 return &share_; 932 } 933 934 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() { 935 DCHECK(initialized_); 936 return sync_context_proxy_.get(); 937 } 938 939 const std::string SyncManagerImpl::cache_guid() { 940 DCHECK(initialized_); 941 return directory()->cache_guid(); 942 } 943 944 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) { 945 ReadTransaction trans(FROM_HERE, GetUserShare()); 946 ReadNode nigori_node(&trans); 947 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) { 948 DVLOG(1) << "Couldn't find Nigori node."; 949 return false; 950 } 951 bool found_experiment = false; 952 953 ReadNode favicon_sync_node(&trans); 954 if (favicon_sync_node.InitByClientTagLookup( 955 syncer::EXPERIMENTS, 956 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) { 957 experiments->favicon_sync_limit = 958 favicon_sync_node.GetExperimentsSpecifics().favicon_sync(). 959 favicon_sync_limit(); 960 found_experiment = true; 961 } 962 963 ReadNode pre_commit_update_avoidance_node(&trans); 964 if (pre_commit_update_avoidance_node.InitByClientTagLookup( 965 syncer::EXPERIMENTS, 966 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) { 967 session_context_->set_server_enabled_pre_commit_update_avoidance( 968 pre_commit_update_avoidance_node.GetExperimentsSpecifics(). 969 pre_commit_update_avoidance().enabled()); 970 // We don't bother setting found_experiment. The frontend doesn't need to 971 // know about this. 972 } 973 974 ReadNode gcm_channel_node(&trans); 975 if (gcm_channel_node.InitByClientTagLookup( 976 syncer::EXPERIMENTS, 977 syncer::kGCMChannelTag) == BaseNode::INIT_OK && 978 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) { 979 experiments->gcm_channel_state = 980 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ? 981 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED); 982 found_experiment = true; 983 } 984 985 ReadNode enhanced_bookmarks_node(&trans); 986 if (enhanced_bookmarks_node.InitByClientTagLookup( 987 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) == 988 BaseNode::INIT_OK && 989 enhanced_bookmarks_node.GetExperimentsSpecifics() 990 .has_enhanced_bookmarks()) { 991 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks = 992 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks(); 993 if (enhanced_bookmarks.has_enabled()) 994 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled(); 995 if (enhanced_bookmarks.has_extension_id()) { 996 experiments->enhanced_bookmarks_ext_id = 997 enhanced_bookmarks.extension_id(); 998 } 999 found_experiment = true; 1000 } 1001 1002 ReadNode gcm_invalidations_node(&trans); 1003 if (gcm_invalidations_node.InitByClientTagLookup( 1004 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) == 1005 BaseNode::INIT_OK) { 1006 const sync_pb::GcmInvalidationsFlags& gcm_invalidations = 1007 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations(); 1008 if (gcm_invalidations.has_enabled()) { 1009 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled(); 1010 found_experiment = true; 1011 } 1012 } 1013 1014 return found_experiment; 1015 } 1016 1017 bool SyncManagerImpl::HasUnsyncedItems() { 1018 ReadTransaction trans(FROM_HERE, GetUserShare()); 1019 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0); 1020 } 1021 1022 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() { 1023 return sync_encryption_handler_.get(); 1024 } 1025 1026 ScopedVector<syncer::ProtocolEvent> 1027 SyncManagerImpl::GetBufferedProtocolEvents() { 1028 return protocol_event_buffer_.GetBufferedProtocolEvents(); 1029 } 1030 1031 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver( 1032 syncer::TypeDebugInfoObserver* observer) { 1033 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer); 1034 } 1035 1036 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver( 1037 syncer::TypeDebugInfoObserver* observer) { 1038 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer); 1039 } 1040 1041 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver( 1042 syncer::TypeDebugInfoObserver* observer) { 1043 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer); 1044 } 1045 1046 void SyncManagerImpl::RequestEmitDebugInfo() { 1047 model_type_registry_->RequestEmitDebugInfo(); 1048 } 1049 1050 } // namespace syncer 1051