1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "components/invalidation/sync_invalidation_listener.h" 6 7 #include <vector> 8 9 #include "base/bind.h" 10 #include "base/callback.h" 11 #include "base/compiler_specific.h" 12 #include "base/logging.h" 13 #include "base/tracked_objects.h" 14 #include "google/cacheinvalidation/include/invalidation-client.h" 15 #include "google/cacheinvalidation/include/types.h" 16 #include "jingle/notifier/listener/push_client.h" 17 #include "sync/notifier/invalidation_util.h" 18 #include "sync/notifier/object_id_invalidation_map.h" 19 #include "sync/notifier/registration_manager.h" 20 21 namespace { 22 23 const char kApplicationName[] = "chrome-sync"; 24 25 } // namespace 26 27 namespace syncer { 28 29 SyncInvalidationListener::Delegate::~Delegate() {} 30 31 SyncInvalidationListener::SyncInvalidationListener( 32 scoped_ptr<SyncNetworkChannel> network_channel) 33 : sync_network_channel_(network_channel.Pass()), 34 sync_system_resources_(sync_network_channel_.get(), this), 35 delegate_(NULL), 36 ticl_state_(DEFAULT_INVALIDATION_ERROR), 37 push_client_state_(DEFAULT_INVALIDATION_ERROR), 38 weak_ptr_factory_(this) { 39 DCHECK(CalledOnValidThread()); 40 sync_network_channel_->AddObserver(this); 41 } 42 43 SyncInvalidationListener::~SyncInvalidationListener() { 44 DCHECK(CalledOnValidThread()); 45 sync_network_channel_->RemoveObserver(this); 46 Stop(); 47 DCHECK(!delegate_); 48 } 49 50 void SyncInvalidationListener::Start( 51 const CreateInvalidationClientCallback& 52 create_invalidation_client_callback, 53 const std::string& client_id, const std::string& client_info, 54 const std::string& invalidation_bootstrap_data, 55 const UnackedInvalidationsMap& initial_unacked_invalidations, 56 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 57 Delegate* delegate) { 58 DCHECK(CalledOnValidThread()); 59 Stop(); 60 61 sync_system_resources_.set_platform(client_info); 62 sync_system_resources_.Start(); 63 64 // The Storage resource is implemented as a write-through cache. We populate 65 // it with the initial state on startup, so subsequent writes go to disk and 66 // update the in-memory cache, while reads just return the cached state. 67 sync_system_resources_.storage()->SetInitialState( 68 invalidation_bootstrap_data); 69 70 unacked_invalidations_map_ = initial_unacked_invalidations; 71 invalidation_state_tracker_ = invalidation_state_tracker; 72 DCHECK(invalidation_state_tracker_.IsInitialized()); 73 74 DCHECK(!delegate_); 75 DCHECK(delegate); 76 delegate_ = delegate; 77 78 invalidation_client_.reset(create_invalidation_client_callback.Run( 79 &sync_system_resources_, 80 sync_network_channel_->GetInvalidationClientType(), 81 client_id, 82 kApplicationName, 83 this)); 84 invalidation_client_->Start(); 85 86 registration_manager_.reset( 87 new RegistrationManager(invalidation_client_.get())); 88 } 89 90 void SyncInvalidationListener::UpdateCredentials( 91 const std::string& email, const std::string& token) { 92 DCHECK(CalledOnValidThread()); 93 sync_network_channel_->UpdateCredentials(email, token); 94 } 95 96 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 97 DCHECK(CalledOnValidThread()); 98 registered_ids_ = ids; 99 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 100 // working XMPP connection (as observed by us), so check it instead 101 // of GetState() (see http://crbug.com/139424). 102 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { 103 DoRegistrationUpdate(); 104 } 105 } 106 107 void SyncInvalidationListener::Ready( 108 invalidation::InvalidationClient* client) { 109 DCHECK(CalledOnValidThread()); 110 DCHECK_EQ(client, invalidation_client_.get()); 111 ticl_state_ = INVALIDATIONS_ENABLED; 112 EmitStateChange(); 113 DoRegistrationUpdate(); 114 } 115 116 void SyncInvalidationListener::Invalidate( 117 invalidation::InvalidationClient* client, 118 const invalidation::Invalidation& invalidation, 119 const invalidation::AckHandle& ack_handle) { 120 DCHECK(CalledOnValidThread()); 121 DCHECK_EQ(client, invalidation_client_.get()); 122 client->Acknowledge(ack_handle); 123 124 const invalidation::ObjectId& id = invalidation.object_id(); 125 126 std::string payload; 127 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 128 if (invalidation.has_payload()) 129 payload = invalidation.payload(); 130 131 DVLOG(2) << "Received invalidation with version " << invalidation.version() 132 << " for " << ObjectIdToString(id); 133 134 ObjectIdInvalidationMap invalidations; 135 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); 136 inv.set_ack_handler(GetThisAsAckHandler()); 137 invalidations.Insert(inv); 138 139 DispatchInvalidations(invalidations); 140 } 141 142 void SyncInvalidationListener::InvalidateUnknownVersion( 143 invalidation::InvalidationClient* client, 144 const invalidation::ObjectId& object_id, 145 const invalidation::AckHandle& ack_handle) { 146 DCHECK(CalledOnValidThread()); 147 DCHECK_EQ(client, invalidation_client_.get()); 148 DVLOG(1) << "InvalidateUnknownVersion"; 149 client->Acknowledge(ack_handle); 150 151 ObjectIdInvalidationMap invalidations; 152 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); 153 unknown_version.set_ack_handler(GetThisAsAckHandler()); 154 invalidations.Insert(unknown_version); 155 156 DispatchInvalidations(invalidations); 157 } 158 159 // This should behave as if we got an invalidation with version 160 // UNKNOWN_OBJECT_VERSION for all known data types. 161 void SyncInvalidationListener::InvalidateAll( 162 invalidation::InvalidationClient* client, 163 const invalidation::AckHandle& ack_handle) { 164 DCHECK(CalledOnValidThread()); 165 DCHECK_EQ(client, invalidation_client_.get()); 166 DVLOG(1) << "InvalidateAll"; 167 client->Acknowledge(ack_handle); 168 169 ObjectIdInvalidationMap invalidations; 170 for (ObjectIdSet::iterator it = registered_ids_.begin(); 171 it != registered_ids_.end(); ++it) { 172 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); 173 unknown_version.set_ack_handler(GetThisAsAckHandler()); 174 invalidations.Insert(unknown_version); 175 } 176 177 DispatchInvalidations(invalidations); 178 } 179 180 // If a handler is registered, emit right away. Otherwise, save it for later. 181 void SyncInvalidationListener::DispatchInvalidations( 182 const ObjectIdInvalidationMap& invalidations) { 183 DCHECK(CalledOnValidThread()); 184 185 ObjectIdInvalidationMap to_save = invalidations; 186 ObjectIdInvalidationMap to_emit = 187 invalidations.GetSubsetWithObjectIds(registered_ids_); 188 189 SaveInvalidations(to_save); 190 EmitSavedInvalidations(to_emit); 191 } 192 193 void SyncInvalidationListener::SaveInvalidations( 194 const ObjectIdInvalidationMap& to_save) { 195 ObjectIdSet objects_to_save = to_save.GetObjectIds(); 196 for (ObjectIdSet::const_iterator it = objects_to_save.begin(); 197 it != objects_to_save.end(); ++it) { 198 UnackedInvalidationsMap::iterator lookup = 199 unacked_invalidations_map_.find(*it); 200 if (lookup == unacked_invalidations_map_.end()) { 201 lookup = unacked_invalidations_map_.insert( 202 std::make_pair(*it, UnackedInvalidationSet(*it))).first; 203 } 204 lookup->second.AddSet(to_save.ForObject(*it)); 205 } 206 207 invalidation_state_tracker_.Call( 208 FROM_HERE, 209 &InvalidationStateTracker::SetSavedInvalidations, 210 unacked_invalidations_map_); 211 } 212 213 void SyncInvalidationListener::EmitSavedInvalidations( 214 const ObjectIdInvalidationMap& to_emit) { 215 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); 216 delegate_->OnInvalidate(to_emit); 217 } 218 219 void SyncInvalidationListener::InformRegistrationStatus( 220 invalidation::InvalidationClient* client, 221 const invalidation::ObjectId& object_id, 222 InvalidationListener::RegistrationState new_state) { 223 DCHECK(CalledOnValidThread()); 224 DCHECK_EQ(client, invalidation_client_.get()); 225 DVLOG(1) << "InformRegistrationStatus: " 226 << ObjectIdToString(object_id) << " " << new_state; 227 228 if (new_state != InvalidationListener::REGISTERED) { 229 // Let |registration_manager_| handle the registration backoff policy. 230 registration_manager_->MarkRegistrationLost(object_id); 231 } 232 } 233 234 void SyncInvalidationListener::InformRegistrationFailure( 235 invalidation::InvalidationClient* client, 236 const invalidation::ObjectId& object_id, 237 bool is_transient, 238 const std::string& error_message) { 239 DCHECK(CalledOnValidThread()); 240 DCHECK_EQ(client, invalidation_client_.get()); 241 DVLOG(1) << "InformRegistrationFailure: " 242 << ObjectIdToString(object_id) 243 << "is_transient=" << is_transient 244 << ", message=" << error_message; 245 246 if (is_transient) { 247 // We don't care about |unknown_hint|; we let 248 // |registration_manager_| handle the registration backoff policy. 249 registration_manager_->MarkRegistrationLost(object_id); 250 } else { 251 // Non-transient failures require an action to resolve. This could happen 252 // because: 253 // - the server doesn't yet recognize the data type, which could happen for 254 // brand-new data types. 255 // - the user has changed his password and hasn't updated it yet locally. 256 // Either way, block future registration attempts for |object_id|. However, 257 // we don't forget any saved invalidation state since we may use it once the 258 // error is addressed. 259 registration_manager_->DisableId(object_id); 260 } 261 } 262 263 void SyncInvalidationListener::ReissueRegistrations( 264 invalidation::InvalidationClient* client, 265 const std::string& prefix, 266 int prefix_length) { 267 DCHECK(CalledOnValidThread()); 268 DCHECK_EQ(client, invalidation_client_.get()); 269 DVLOG(1) << "AllRegistrationsLost"; 270 registration_manager_->MarkAllRegistrationsLost(); 271 } 272 273 void SyncInvalidationListener::InformError( 274 invalidation::InvalidationClient* client, 275 const invalidation::ErrorInfo& error_info) { 276 DCHECK(CalledOnValidThread()); 277 DCHECK_EQ(client, invalidation_client_.get()); 278 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " 279 << error_info.error_message() 280 << " (transient = " << error_info.is_transient() << ")"; 281 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { 282 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; 283 } else { 284 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; 285 } 286 EmitStateChange(); 287 } 288 289 void SyncInvalidationListener::Acknowledge( 290 const invalidation::ObjectId& id, 291 const syncer::AckHandle& handle) { 292 UnackedInvalidationsMap::iterator lookup = 293 unacked_invalidations_map_.find(id); 294 if (lookup == unacked_invalidations_map_.end()) { 295 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; 296 return; 297 } 298 lookup->second.Acknowledge(handle); 299 invalidation_state_tracker_.Call( 300 FROM_HERE, 301 &InvalidationStateTracker::SetSavedInvalidations, 302 unacked_invalidations_map_); 303 } 304 305 void SyncInvalidationListener::Drop( 306 const invalidation::ObjectId& id, 307 const syncer::AckHandle& handle) { 308 UnackedInvalidationsMap::iterator lookup = 309 unacked_invalidations_map_.find(id); 310 if (lookup == unacked_invalidations_map_.end()) { 311 DLOG(WARNING) << "Received drop for untracked object ID"; 312 return; 313 } 314 lookup->second.Drop(handle); 315 invalidation_state_tracker_.Call( 316 FROM_HERE, 317 &InvalidationStateTracker::SetSavedInvalidations, 318 unacked_invalidations_map_); 319 } 320 321 void SyncInvalidationListener::WriteState(const std::string& state) { 322 DCHECK(CalledOnValidThread()); 323 DVLOG(1) << "WriteState"; 324 invalidation_state_tracker_.Call( 325 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); 326 } 327 328 void SyncInvalidationListener::DoRegistrationUpdate() { 329 DCHECK(CalledOnValidThread()); 330 const ObjectIdSet& unregistered_ids = 331 registration_manager_->UpdateRegisteredIds(registered_ids_); 332 for (ObjectIdSet::iterator it = unregistered_ids.begin(); 333 it != unregistered_ids.end(); ++it) { 334 unacked_invalidations_map_.erase(*it); 335 } 336 invalidation_state_tracker_.Call( 337 FROM_HERE, 338 &InvalidationStateTracker::SetSavedInvalidations, 339 unacked_invalidations_map_); 340 341 ObjectIdInvalidationMap object_id_invalidation_map; 342 for (UnackedInvalidationsMap::iterator map_it = 343 unacked_invalidations_map_.begin(); 344 map_it != unacked_invalidations_map_.end(); ++map_it) { 345 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { 346 continue; 347 } 348 map_it->second.ExportInvalidations( 349 GetThisAsAckHandler(), 350 &object_id_invalidation_map); 351 } 352 353 // There's no need to run these through DispatchInvalidations(); they've 354 // already been saved to storage (that's where we found them) so all we need 355 // to do now is emit them. 356 EmitSavedInvalidations(object_id_invalidation_map); 357 } 358 359 void SyncInvalidationListener::RequestDetailedStatus( 360 base::Callback<void(const base::DictionaryValue&)> callback) const { 361 DCHECK(CalledOnValidThread()); 362 sync_network_channel_->RequestDetailedStatus(callback); 363 callback.Run(*CollectDebugData()); 364 } 365 366 scoped_ptr<base::DictionaryValue> 367 SyncInvalidationListener::CollectDebugData() const { 368 scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue()); 369 return_value->SetString( 370 "SyncInvalidationListener.PushClientState", 371 std::string(InvalidatorStateToString(push_client_state_))); 372 return_value->SetString("SyncInvalidationListener.TiclState", 373 std::string(InvalidatorStateToString(ticl_state_))); 374 scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue()); 375 for (UnackedInvalidationsMap::const_iterator it = 376 unacked_invalidations_map_.begin(); 377 it != unacked_invalidations_map_.end(); 378 ++it) { 379 unacked_map->Set((it->first).name(), (it->second).ToValue().release()); 380 } 381 return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap", 382 unacked_map.release()); 383 return return_value.Pass(); 384 } 385 386 void SyncInvalidationListener::StopForTest() { 387 DCHECK(CalledOnValidThread()); 388 Stop(); 389 } 390 391 void SyncInvalidationListener::Stop() { 392 DCHECK(CalledOnValidThread()); 393 if (!invalidation_client_) { 394 return; 395 } 396 397 registration_manager_.reset(); 398 sync_system_resources_.Stop(); 399 invalidation_client_->Stop(); 400 401 invalidation_client_.reset(); 402 delegate_ = NULL; 403 404 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 405 push_client_state_ = DEFAULT_INVALIDATION_ERROR; 406 } 407 408 InvalidatorState SyncInvalidationListener::GetState() const { 409 DCHECK(CalledOnValidThread()); 410 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || 411 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { 412 // If either the ticl or the push client rejected our credentials, 413 // return INVALIDATION_CREDENTIALS_REJECTED. 414 return INVALIDATION_CREDENTIALS_REJECTED; 415 } 416 if (ticl_state_ == INVALIDATIONS_ENABLED && 417 push_client_state_ == INVALIDATIONS_ENABLED) { 418 // If the ticl is ready and the push client notifications are 419 // enabled, return INVALIDATIONS_ENABLED. 420 return INVALIDATIONS_ENABLED; 421 } 422 // Otherwise, we have a transient error. 423 return TRANSIENT_INVALIDATION_ERROR; 424 } 425 426 void SyncInvalidationListener::EmitStateChange() { 427 DCHECK(CalledOnValidThread()); 428 delegate_->OnInvalidatorStateChange(GetState()); 429 } 430 431 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { 432 DCHECK(CalledOnValidThread()); 433 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); 434 } 435 436 void SyncInvalidationListener::OnNetworkChannelStateChanged( 437 InvalidatorState invalidator_state) { 438 DCHECK(CalledOnValidThread()); 439 push_client_state_ = invalidator_state; 440 EmitStateChange(); 441 } 442 443 } // namespace syncer 444