1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/notifier/sync_invalidation_listener.h" 6 7 #include <vector> 8 9 #include "base/bind.h" 10 #include "base/callback.h" 11 #include "base/compiler_specific.h" 12 #include "base/logging.h" 13 #include "base/tracked_objects.h" 14 #include "google/cacheinvalidation/include/invalidation-client.h" 15 #include "google/cacheinvalidation/include/types.h" 16 #include "google/cacheinvalidation/types.pb.h" 17 #include "jingle/notifier/listener/push_client.h" 18 #include "sync/notifier/invalidation_util.h" 19 #include "sync/notifier/object_id_invalidation_map.h" 20 #include "sync/notifier/registration_manager.h" 21 22 namespace { 23 24 const char kApplicationName[] = "chrome-sync"; 25 26 } // namespace 27 28 namespace syncer { 29 30 SyncInvalidationListener::Delegate::~Delegate() {} 31 32 SyncInvalidationListener::SyncInvalidationListener( 33 scoped_ptr<notifier::PushClient> push_client) 34 : push_client_channel_(push_client.Pass()), 35 sync_system_resources_(&push_client_channel_, this), 36 delegate_(NULL), 37 ticl_state_(DEFAULT_INVALIDATION_ERROR), 38 push_client_state_(DEFAULT_INVALIDATION_ERROR), 39 weak_ptr_factory_(this) { 40 DCHECK(CalledOnValidThread()); 41 push_client_channel_.AddObserver(this); 42 } 43 44 SyncInvalidationListener::~SyncInvalidationListener() { 45 DCHECK(CalledOnValidThread()); 46 push_client_channel_.RemoveObserver(this); 47 Stop(); 48 DCHECK(!delegate_); 49 } 50 51 void SyncInvalidationListener::Start( 52 const CreateInvalidationClientCallback& 53 create_invalidation_client_callback, 54 const std::string& client_id, const std::string& client_info, 55 const std::string& invalidation_bootstrap_data, 56 const UnackedInvalidationsMap& initial_unacked_invalidations, 57 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 58 Delegate* delegate) { 59 DCHECK(CalledOnValidThread()); 60 Stop(); 61 62 sync_system_resources_.set_platform(client_info); 63 sync_system_resources_.Start(); 64 65 // The Storage resource is implemented as a write-through cache. We populate 66 // it with the initial state on startup, so subsequent writes go to disk and 67 // update the in-memory cache, while reads just return the cached state. 68 sync_system_resources_.storage()->SetInitialState( 69 invalidation_bootstrap_data); 70 71 unacked_invalidations_map_ = initial_unacked_invalidations; 72 invalidation_state_tracker_ = invalidation_state_tracker; 73 DCHECK(invalidation_state_tracker_.IsInitialized()); 74 75 DCHECK(!delegate_); 76 DCHECK(delegate); 77 delegate_ = delegate; 78 79 #if defined(OS_IOS) 80 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; 81 #else 82 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; 83 #endif 84 invalidation_client_.reset( 85 create_invalidation_client_callback.Run( 86 &sync_system_resources_, client_type, client_id, 87 kApplicationName, this)); 88 invalidation_client_->Start(); 89 90 registration_manager_.reset( 91 new RegistrationManager(invalidation_client_.get())); 92 } 93 94 void SyncInvalidationListener::UpdateCredentials( 95 const std::string& email, const std::string& token) { 96 DCHECK(CalledOnValidThread()); 97 push_client_channel_.UpdateCredentials(email, token); 98 } 99 100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 101 DCHECK(CalledOnValidThread()); 102 registered_ids_ = ids; 103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 104 // working XMPP connection (as observed by us), so check it instead 105 // of GetState() (see http://crbug.com/139424). 106 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { 107 DoRegistrationUpdate(); 108 } 109 } 110 111 void SyncInvalidationListener::Ready( 112 invalidation::InvalidationClient* client) { 113 DCHECK(CalledOnValidThread()); 114 DCHECK_EQ(client, invalidation_client_.get()); 115 ticl_state_ = INVALIDATIONS_ENABLED; 116 EmitStateChange(); 117 DoRegistrationUpdate(); 118 } 119 120 void SyncInvalidationListener::Invalidate( 121 invalidation::InvalidationClient* client, 122 const invalidation::Invalidation& invalidation, 123 const invalidation::AckHandle& ack_handle) { 124 DCHECK(CalledOnValidThread()); 125 DCHECK_EQ(client, invalidation_client_.get()); 126 client->Acknowledge(ack_handle); 127 128 const invalidation::ObjectId& id = invalidation.object_id(); 129 130 std::string payload; 131 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 132 if (invalidation.has_payload()) 133 payload = invalidation.payload(); 134 135 DVLOG(2) << "Received invalidation with version " << invalidation.version() 136 << " for " << ObjectIdToString(id); 137 138 ObjectIdInvalidationMap invalidations; 139 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); 140 inv.set_ack_handler(GetThisAsAckHandler()); 141 invalidations.Insert(inv); 142 143 DispatchInvalidations(invalidations); 144 } 145 146 void SyncInvalidationListener::InvalidateUnknownVersion( 147 invalidation::InvalidationClient* client, 148 const invalidation::ObjectId& object_id, 149 const invalidation::AckHandle& ack_handle) { 150 DCHECK(CalledOnValidThread()); 151 DCHECK_EQ(client, invalidation_client_.get()); 152 DVLOG(1) << "InvalidateUnknownVersion"; 153 client->Acknowledge(ack_handle); 154 155 ObjectIdInvalidationMap invalidations; 156 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); 157 unknown_version.set_ack_handler(GetThisAsAckHandler()); 158 invalidations.Insert(unknown_version); 159 160 DispatchInvalidations(invalidations); 161 } 162 163 // This should behave as if we got an invalidation with version 164 // UNKNOWN_OBJECT_VERSION for all known data types. 165 void SyncInvalidationListener::InvalidateAll( 166 invalidation::InvalidationClient* client, 167 const invalidation::AckHandle& ack_handle) { 168 DCHECK(CalledOnValidThread()); 169 DCHECK_EQ(client, invalidation_client_.get()); 170 DVLOG(1) << "InvalidateAll"; 171 client->Acknowledge(ack_handle); 172 173 ObjectIdInvalidationMap invalidations; 174 for (ObjectIdSet::iterator it = registered_ids_.begin(); 175 it != registered_ids_.end(); ++it) { 176 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); 177 unknown_version.set_ack_handler(GetThisAsAckHandler()); 178 invalidations.Insert(unknown_version); 179 } 180 181 DispatchInvalidations(invalidations); 182 } 183 184 // If a handler is registered, emit right away. Otherwise, save it for later. 185 void SyncInvalidationListener::DispatchInvalidations( 186 const ObjectIdInvalidationMap& invalidations) { 187 DCHECK(CalledOnValidThread()); 188 189 ObjectIdInvalidationMap to_save = invalidations; 190 ObjectIdInvalidationMap to_emit = 191 invalidations.GetSubsetWithObjectIds(registered_ids_); 192 193 SaveInvalidations(to_save); 194 EmitSavedInvalidations(to_emit); 195 } 196 197 void SyncInvalidationListener::SaveInvalidations( 198 const ObjectIdInvalidationMap& to_save) { 199 ObjectIdSet objects_to_save = to_save.GetObjectIds(); 200 for (ObjectIdSet::const_iterator it = objects_to_save.begin(); 201 it != objects_to_save.end(); ++it) { 202 UnackedInvalidationsMap::iterator lookup = 203 unacked_invalidations_map_.find(*it); 204 if (lookup == unacked_invalidations_map_.end()) { 205 lookup = unacked_invalidations_map_.insert( 206 std::make_pair(*it, UnackedInvalidationSet(*it))).first; 207 } 208 lookup->second.AddSet(to_save.ForObject(*it)); 209 } 210 211 invalidation_state_tracker_.Call( 212 FROM_HERE, 213 &InvalidationStateTracker::SetSavedInvalidations, 214 unacked_invalidations_map_); 215 } 216 217 void SyncInvalidationListener::EmitSavedInvalidations( 218 const ObjectIdInvalidationMap& to_emit) { 219 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); 220 delegate_->OnInvalidate(to_emit); 221 } 222 223 void SyncInvalidationListener::InformRegistrationStatus( 224 invalidation::InvalidationClient* client, 225 const invalidation::ObjectId& object_id, 226 InvalidationListener::RegistrationState new_state) { 227 DCHECK(CalledOnValidThread()); 228 DCHECK_EQ(client, invalidation_client_.get()); 229 DVLOG(1) << "InformRegistrationStatus: " 230 << ObjectIdToString(object_id) << " " << new_state; 231 232 if (new_state != InvalidationListener::REGISTERED) { 233 // Let |registration_manager_| handle the registration backoff policy. 234 registration_manager_->MarkRegistrationLost(object_id); 235 } 236 } 237 238 void SyncInvalidationListener::InformRegistrationFailure( 239 invalidation::InvalidationClient* client, 240 const invalidation::ObjectId& object_id, 241 bool is_transient, 242 const std::string& error_message) { 243 DCHECK(CalledOnValidThread()); 244 DCHECK_EQ(client, invalidation_client_.get()); 245 DVLOG(1) << "InformRegistrationFailure: " 246 << ObjectIdToString(object_id) 247 << "is_transient=" << is_transient 248 << ", message=" << error_message; 249 250 if (is_transient) { 251 // We don't care about |unknown_hint|; we let 252 // |registration_manager_| handle the registration backoff policy. 253 registration_manager_->MarkRegistrationLost(object_id); 254 } else { 255 // Non-transient failures require an action to resolve. This could happen 256 // because: 257 // - the server doesn't yet recognize the data type, which could happen for 258 // brand-new data types. 259 // - the user has changed his password and hasn't updated it yet locally. 260 // Either way, block future registration attempts for |object_id|. However, 261 // we don't forget any saved invalidation state since we may use it once the 262 // error is addressed. 263 registration_manager_->DisableId(object_id); 264 } 265 } 266 267 void SyncInvalidationListener::ReissueRegistrations( 268 invalidation::InvalidationClient* client, 269 const std::string& prefix, 270 int prefix_length) { 271 DCHECK(CalledOnValidThread()); 272 DCHECK_EQ(client, invalidation_client_.get()); 273 DVLOG(1) << "AllRegistrationsLost"; 274 registration_manager_->MarkAllRegistrationsLost(); 275 } 276 277 void SyncInvalidationListener::InformError( 278 invalidation::InvalidationClient* client, 279 const invalidation::ErrorInfo& error_info) { 280 DCHECK(CalledOnValidThread()); 281 DCHECK_EQ(client, invalidation_client_.get()); 282 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " 283 << error_info.error_message() 284 << " (transient = " << error_info.is_transient() << ")"; 285 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { 286 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; 287 } else { 288 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; 289 } 290 EmitStateChange(); 291 } 292 293 void SyncInvalidationListener::Acknowledge( 294 const invalidation::ObjectId& id, 295 const syncer::AckHandle& handle) { 296 UnackedInvalidationsMap::iterator lookup = 297 unacked_invalidations_map_.find(id); 298 if (lookup == unacked_invalidations_map_.end()) { 299 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; 300 return; 301 } 302 lookup->second.Acknowledge(handle); 303 invalidation_state_tracker_.Call( 304 FROM_HERE, 305 &InvalidationStateTracker::SetSavedInvalidations, 306 unacked_invalidations_map_); 307 } 308 309 void SyncInvalidationListener::Drop( 310 const invalidation::ObjectId& id, 311 const syncer::AckHandle& handle) { 312 UnackedInvalidationsMap::iterator lookup = 313 unacked_invalidations_map_.find(id); 314 if (lookup == unacked_invalidations_map_.end()) { 315 DLOG(WARNING) << "Received drop for untracked object ID"; 316 return; 317 } 318 lookup->second.Drop(handle); 319 invalidation_state_tracker_.Call( 320 FROM_HERE, 321 &InvalidationStateTracker::SetSavedInvalidations, 322 unacked_invalidations_map_); 323 } 324 325 void SyncInvalidationListener::WriteState(const std::string& state) { 326 DCHECK(CalledOnValidThread()); 327 DVLOG(1) << "WriteState"; 328 invalidation_state_tracker_.Call( 329 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); 330 } 331 332 void SyncInvalidationListener::DoRegistrationUpdate() { 333 DCHECK(CalledOnValidThread()); 334 const ObjectIdSet& unregistered_ids = 335 registration_manager_->UpdateRegisteredIds(registered_ids_); 336 for (ObjectIdSet::iterator it = unregistered_ids.begin(); 337 it != unregistered_ids.end(); ++it) { 338 unacked_invalidations_map_.erase(*it); 339 } 340 invalidation_state_tracker_.Call( 341 FROM_HERE, 342 &InvalidationStateTracker::SetSavedInvalidations, 343 unacked_invalidations_map_); 344 345 ObjectIdInvalidationMap object_id_invalidation_map; 346 for (UnackedInvalidationsMap::iterator map_it = 347 unacked_invalidations_map_.begin(); 348 map_it != unacked_invalidations_map_.end(); ++map_it) { 349 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { 350 continue; 351 } 352 map_it->second.ExportInvalidations( 353 GetThisAsAckHandler(), 354 &object_id_invalidation_map); 355 } 356 357 // There's no need to run these through DispatchInvalidations(); they've 358 // already been saved to storage (that's where we found them) so all we need 359 // to do now is emit them. 360 EmitSavedInvalidations(object_id_invalidation_map); 361 } 362 363 void SyncInvalidationListener::StopForTest() { 364 DCHECK(CalledOnValidThread()); 365 Stop(); 366 } 367 368 void SyncInvalidationListener::Stop() { 369 DCHECK(CalledOnValidThread()); 370 if (!invalidation_client_) { 371 return; 372 } 373 374 registration_manager_.reset(); 375 sync_system_resources_.Stop(); 376 invalidation_client_->Stop(); 377 378 invalidation_client_.reset(); 379 delegate_ = NULL; 380 381 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 382 push_client_state_ = DEFAULT_INVALIDATION_ERROR; 383 } 384 385 InvalidatorState SyncInvalidationListener::GetState() const { 386 DCHECK(CalledOnValidThread()); 387 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || 388 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { 389 // If either the ticl or the push client rejected our credentials, 390 // return INVALIDATION_CREDENTIALS_REJECTED. 391 return INVALIDATION_CREDENTIALS_REJECTED; 392 } 393 if (ticl_state_ == INVALIDATIONS_ENABLED && 394 push_client_state_ == INVALIDATIONS_ENABLED) { 395 // If the ticl is ready and the push client notifications are 396 // enabled, return INVALIDATIONS_ENABLED. 397 return INVALIDATIONS_ENABLED; 398 } 399 // Otherwise, we have a transient error. 400 return TRANSIENT_INVALIDATION_ERROR; 401 } 402 403 void SyncInvalidationListener::EmitStateChange() { 404 DCHECK(CalledOnValidThread()); 405 delegate_->OnInvalidatorStateChange(GetState()); 406 } 407 408 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { 409 DCHECK(CalledOnValidThread()); 410 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); 411 } 412 413 void SyncInvalidationListener::OnNetworkChannelStateChanged( 414 InvalidatorState invalidator_state) { 415 DCHECK(CalledOnValidThread()); 416 push_client_state_ = invalidator_state; 417 EmitStateChange(); 418 } 419 420 } // namespace syncer 421