1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/notifier/sync_invalidation_listener.h" 6 7 #include <vector> 8 9 #include "base/bind.h" 10 #include "base/callback.h" 11 #include "base/compiler_specific.h" 12 #include "base/logging.h" 13 #include "base/tracked_objects.h" 14 #include "google/cacheinvalidation/include/invalidation-client.h" 15 #include "google/cacheinvalidation/include/types.h" 16 #include "google/cacheinvalidation/types.pb.h" 17 #include "jingle/notifier/listener/push_client.h" 18 #include "sync/notifier/invalidation_util.h" 19 #include "sync/notifier/registration_manager.h" 20 21 namespace { 22 23 const char kApplicationName[] = "chrome-sync"; 24 25 } // namespace 26 27 namespace syncer { 28 29 SyncInvalidationListener::Delegate::~Delegate() {} 30 31 SyncInvalidationListener::SyncInvalidationListener( 32 base::TickClock* tick_clock, 33 scoped_ptr<notifier::PushClient> push_client) 34 : weak_ptr_factory_(this), 35 ack_tracker_(tick_clock, this), 36 push_client_(push_client.get()), 37 sync_system_resources_(push_client.Pass(), this), 38 delegate_(NULL), 39 ticl_state_(DEFAULT_INVALIDATION_ERROR), 40 push_client_state_(DEFAULT_INVALIDATION_ERROR) { 41 DCHECK(CalledOnValidThread()); 42 push_client_->AddObserver(this); 43 } 44 45 SyncInvalidationListener::~SyncInvalidationListener() { 46 DCHECK(CalledOnValidThread()); 47 push_client_->RemoveObserver(this); 48 Stop(); 49 DCHECK(!delegate_); 50 } 51 52 void SyncInvalidationListener::Start( 53 const CreateInvalidationClientCallback& 54 create_invalidation_client_callback, 55 const std::string& client_id, const std::string& client_info, 56 const std::string& invalidation_bootstrap_data, 57 const InvalidationStateMap& initial_invalidation_state_map, 58 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 59 Delegate* delegate) { 60 DCHECK(CalledOnValidThread()); 61 Stop(); 62 63 sync_system_resources_.set_platform(client_info); 64 sync_system_resources_.Start(); 65 66 // The Storage resource is implemented as a write-through cache. We populate 67 // it with the initial state on startup, so subsequent writes go to disk and 68 // update the in-memory cache, while reads just return the cached state. 69 sync_system_resources_.storage()->SetInitialState( 70 invalidation_bootstrap_data); 71 72 invalidation_state_map_ = initial_invalidation_state_map; 73 if (invalidation_state_map_.empty()) { 74 DVLOG(2) << "No initial max invalidation versions for any id"; 75 } else { 76 for (InvalidationStateMap::const_iterator it = 77 invalidation_state_map_.begin(); 78 it != invalidation_state_map_.end(); ++it) { 79 DVLOG(2) << "Initial max invalidation version for " 80 << ObjectIdToString(it->first) << " is " 81 << it->second.version; 82 } 83 } 84 invalidation_state_tracker_ = invalidation_state_tracker; 85 DCHECK(invalidation_state_tracker_.IsInitialized()); 86 87 DCHECK(!delegate_); 88 DCHECK(delegate); 89 delegate_ = delegate; 90 91 #if defined(OS_IOS) 92 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; 93 #else 94 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; 95 #endif 96 invalidation_client_.reset( 97 create_invalidation_client_callback.Run( 98 &sync_system_resources_, client_type, client_id, 99 kApplicationName, this)); 100 invalidation_client_->Start(); 101 102 registration_manager_.reset( 103 new RegistrationManager(invalidation_client_.get())); 104 105 // Set up reminders for any invalidations that have not been locally 106 // acknowledged. 107 ObjectIdSet unacknowledged_ids; 108 for (InvalidationStateMap::const_iterator it = 109 invalidation_state_map_.begin(); 110 it != invalidation_state_map_.end(); ++it) { 111 if (it->second.expected.Equals(it->second.current)) 112 continue; 113 unacknowledged_ids.insert(it->first); 114 } 115 if (!unacknowledged_ids.empty()) 116 ack_tracker_.Track(unacknowledged_ids); 117 } 118 119 void SyncInvalidationListener::UpdateCredentials( 120 const std::string& email, const std::string& token) { 121 DCHECK(CalledOnValidThread()); 122 sync_system_resources_.network()->UpdateCredentials(email, token); 123 } 124 125 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 126 DCHECK(CalledOnValidThread()); 127 registered_ids_ = ids; 128 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 129 // working XMPP connection (as observed by us), so check it instead 130 // of GetState() (see http://crbug.com/139424). 131 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { 132 DoRegistrationUpdate(); 133 } 134 } 135 136 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, 137 const AckHandle& ack_handle) { 138 DCHECK(CalledOnValidThread()); 139 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); 140 if (state_it == invalidation_state_map_.end()) 141 return; 142 invalidation_state_tracker_.Call( 143 FROM_HERE, 144 &InvalidationStateTracker::Acknowledge, 145 id, 146 ack_handle); 147 state_it->second.current = ack_handle; 148 if (state_it->second.expected.Equals(ack_handle)) { 149 // If the received ack matches the expected ack, then we no longer need to 150 // keep track of |id| since it is up-to-date. 151 ObjectIdSet ids; 152 ids.insert(id); 153 ack_tracker_.Ack(ids); 154 } 155 } 156 157 void SyncInvalidationListener::Ready( 158 invalidation::InvalidationClient* client) { 159 DCHECK(CalledOnValidThread()); 160 DCHECK_EQ(client, invalidation_client_.get()); 161 ticl_state_ = INVALIDATIONS_ENABLED; 162 EmitStateChange(); 163 DoRegistrationUpdate(); 164 } 165 166 void SyncInvalidationListener::Invalidate( 167 invalidation::InvalidationClient* client, 168 const invalidation::Invalidation& invalidation, 169 const invalidation::AckHandle& ack_handle) { 170 DCHECK(CalledOnValidThread()); 171 DCHECK_EQ(client, invalidation_client_.get()); 172 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); 173 174 const invalidation::ObjectId& id = invalidation.object_id(); 175 176 // The invalidation API spec allows for the possibility of redundant 177 // invalidations, so keep track of the max versions and drop 178 // invalidations with old versions. 179 // 180 // TODO(akalin): Now that we keep track of registered ids, we 181 // should drop invalidations for unregistered ids. We may also 182 // have to filter it at a higher level, as invalidations for 183 // newly-unregistered ids may already be in flight. 184 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); 185 if ((it != invalidation_state_map_.end()) && 186 (invalidation.version() <= it->second.version)) { 187 // Drop redundant invalidations. 188 client->Acknowledge(ack_handle); 189 return; 190 } 191 192 std::string payload; 193 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 194 if (invalidation.has_payload()) 195 payload = invalidation.payload(); 196 197 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) 198 << " to " << invalidation.version(); 199 invalidation_state_map_[id].version = invalidation.version(); 200 invalidation_state_map_[id].payload = payload; 201 invalidation_state_tracker_.Call( 202 FROM_HERE, 203 &InvalidationStateTracker::SetMaxVersionAndPayload, 204 id, invalidation.version(), payload); 205 206 ObjectIdSet ids; 207 ids.insert(id); 208 PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle); 209 } 210 211 void SyncInvalidationListener::InvalidateUnknownVersion( 212 invalidation::InvalidationClient* client, 213 const invalidation::ObjectId& object_id, 214 const invalidation::AckHandle& ack_handle) { 215 DCHECK(CalledOnValidThread()); 216 DCHECK_EQ(client, invalidation_client_.get()); 217 DVLOG(1) << "InvalidateUnknownVersion"; 218 219 ObjectIdSet ids; 220 ids.insert(object_id); 221 PrepareInvalidation( 222 ids, 223 Invalidation::kUnknownVersion, 224 std::string(), 225 client, 226 ack_handle); 227 } 228 229 // This should behave as if we got an invalidation with version 230 // UNKNOWN_OBJECT_VERSION for all known data types. 231 void SyncInvalidationListener::InvalidateAll( 232 invalidation::InvalidationClient* client, 233 const invalidation::AckHandle& ack_handle) { 234 DCHECK(CalledOnValidThread()); 235 DCHECK_EQ(client, invalidation_client_.get()); 236 DVLOG(1) << "InvalidateAll"; 237 238 PrepareInvalidation( 239 registered_ids_, 240 Invalidation::kUnknownVersion, 241 std::string(), 242 client, 243 ack_handle); 244 } 245 246 void SyncInvalidationListener::PrepareInvalidation( 247 const ObjectIdSet& ids, 248 int64 version, 249 const std::string& payload, 250 invalidation::InvalidationClient* client, 251 const invalidation::AckHandle& ack_handle) { 252 DCHECK(CalledOnValidThread()); 253 254 // A server invalidation resets the local retry count. 255 ack_tracker_.Ack(ids); 256 invalidation_state_tracker_.Call( 257 FROM_HERE, 258 &InvalidationStateTracker::GenerateAckHandles, 259 ids, 260 base::MessageLoopProxy::current(), 261 base::Bind(&SyncInvalidationListener::EmitInvalidation, 262 weak_ptr_factory_.GetWeakPtr(), 263 ids, 264 version, 265 payload, 266 client, 267 ack_handle)); 268 } 269 270 void SyncInvalidationListener::EmitInvalidation( 271 const ObjectIdSet& ids, 272 int64 version, 273 const std::string& payload, 274 invalidation::InvalidationClient* client, 275 const invalidation::AckHandle& ack_handle, 276 const AckHandleMap& local_ack_handles) { 277 DCHECK(CalledOnValidThread()); 278 ObjectIdInvalidationMap invalidation_map = 279 ObjectIdSetToInvalidationMap(ids, version, payload); 280 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); 281 it != local_ack_handles.end(); ++it) { 282 // Update in-memory copy of the invalidation state. 283 invalidation_state_map_[it->first].expected = it->second; 284 invalidation_map[it->first].ack_handle = it->second; 285 } 286 ack_tracker_.Track(ids); 287 delegate_->OnInvalidate(invalidation_map); 288 client->Acknowledge(ack_handle); 289 } 290 291 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { 292 ObjectIdInvalidationMap invalidation_map; 293 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { 294 Invalidation invalidation; 295 invalidation.ack_handle = invalidation_state_map_[*it].expected; 296 invalidation.version = invalidation_state_map_[*it].version; 297 invalidation.payload = invalidation_state_map_[*it].payload; 298 invalidation_map.insert(std::make_pair(*it, invalidation)); 299 } 300 301 delegate_->OnInvalidate(invalidation_map); 302 } 303 304 void SyncInvalidationListener::InformRegistrationStatus( 305 invalidation::InvalidationClient* client, 306 const invalidation::ObjectId& object_id, 307 InvalidationListener::RegistrationState new_state) { 308 DCHECK(CalledOnValidThread()); 309 DCHECK_EQ(client, invalidation_client_.get()); 310 DVLOG(1) << "InformRegistrationStatus: " 311 << ObjectIdToString(object_id) << " " << new_state; 312 313 if (new_state != InvalidationListener::REGISTERED) { 314 // Let |registration_manager_| handle the registration backoff policy. 315 registration_manager_->MarkRegistrationLost(object_id); 316 } 317 } 318 319 void SyncInvalidationListener::InformRegistrationFailure( 320 invalidation::InvalidationClient* client, 321 const invalidation::ObjectId& object_id, 322 bool is_transient, 323 const std::string& error_message) { 324 DCHECK(CalledOnValidThread()); 325 DCHECK_EQ(client, invalidation_client_.get()); 326 DVLOG(1) << "InformRegistrationFailure: " 327 << ObjectIdToString(object_id) 328 << "is_transient=" << is_transient 329 << ", message=" << error_message; 330 331 if (is_transient) { 332 // We don't care about |unknown_hint|; we let 333 // |registration_manager_| handle the registration backoff policy. 334 registration_manager_->MarkRegistrationLost(object_id); 335 } else { 336 // Non-transient failures require an action to resolve. This could happen 337 // because: 338 // - the server doesn't yet recognize the data type, which could happen for 339 // brand-new data types. 340 // - the user has changed his password and hasn't updated it yet locally. 341 // Either way, block future registration attempts for |object_id|. However, 342 // we don't forget any saved invalidation state since we may use it once the 343 // error is addressed. 344 registration_manager_->DisableId(object_id); 345 } 346 } 347 348 void SyncInvalidationListener::ReissueRegistrations( 349 invalidation::InvalidationClient* client, 350 const std::string& prefix, 351 int prefix_length) { 352 DCHECK(CalledOnValidThread()); 353 DCHECK_EQ(client, invalidation_client_.get()); 354 DVLOG(1) << "AllRegistrationsLost"; 355 registration_manager_->MarkAllRegistrationsLost(); 356 } 357 358 void SyncInvalidationListener::InformError( 359 invalidation::InvalidationClient* client, 360 const invalidation::ErrorInfo& error_info) { 361 DCHECK(CalledOnValidThread()); 362 DCHECK_EQ(client, invalidation_client_.get()); 363 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " 364 << error_info.error_message() 365 << " (transient = " << error_info.is_transient() << ")"; 366 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { 367 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; 368 } else { 369 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; 370 } 371 EmitStateChange(); 372 } 373 374 void SyncInvalidationListener::WriteState(const std::string& state) { 375 DCHECK(CalledOnValidThread()); 376 DVLOG(1) << "WriteState"; 377 invalidation_state_tracker_.Call( 378 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); 379 } 380 381 void SyncInvalidationListener::DoRegistrationUpdate() { 382 DCHECK(CalledOnValidThread()); 383 const ObjectIdSet& unregistered_ids = 384 registration_manager_->UpdateRegisteredIds(registered_ids_); 385 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); 386 it != unregistered_ids.end(); ++it) { 387 invalidation_state_map_.erase(*it); 388 } 389 invalidation_state_tracker_.Call( 390 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); 391 ack_tracker_.Ack(unregistered_ids); 392 } 393 394 void SyncInvalidationListener::StopForTest() { 395 DCHECK(CalledOnValidThread()); 396 Stop(); 397 } 398 399 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { 400 DCHECK(CalledOnValidThread()); 401 return invalidation_state_map_; 402 } 403 404 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { 405 return &ack_tracker_; 406 } 407 408 void SyncInvalidationListener::Stop() { 409 DCHECK(CalledOnValidThread()); 410 if (!invalidation_client_) { 411 return; 412 } 413 414 ack_tracker_.Clear(); 415 416 registration_manager_.reset(); 417 sync_system_resources_.Stop(); 418 invalidation_client_->Stop(); 419 420 invalidation_client_.reset(); 421 delegate_ = NULL; 422 423 invalidation_state_tracker_.Reset(); 424 invalidation_state_map_.clear(); 425 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 426 push_client_state_ = DEFAULT_INVALIDATION_ERROR; 427 } 428 429 InvalidatorState SyncInvalidationListener::GetState() const { 430 DCHECK(CalledOnValidThread()); 431 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || 432 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { 433 // If either the ticl or the push client rejected our credentials, 434 // return INVALIDATION_CREDENTIALS_REJECTED. 435 return INVALIDATION_CREDENTIALS_REJECTED; 436 } 437 if (ticl_state_ == INVALIDATIONS_ENABLED && 438 push_client_state_ == INVALIDATIONS_ENABLED) { 439 // If the ticl is ready and the push client notifications are 440 // enabled, return INVALIDATIONS_ENABLED. 441 return INVALIDATIONS_ENABLED; 442 } 443 // Otherwise, we have a transient error. 444 return TRANSIENT_INVALIDATION_ERROR; 445 } 446 447 void SyncInvalidationListener::EmitStateChange() { 448 DCHECK(CalledOnValidThread()); 449 delegate_->OnInvalidatorStateChange(GetState()); 450 } 451 452 void SyncInvalidationListener::OnNotificationsEnabled() { 453 DCHECK(CalledOnValidThread()); 454 push_client_state_ = INVALIDATIONS_ENABLED; 455 EmitStateChange(); 456 } 457 458 void SyncInvalidationListener::OnNotificationsDisabled( 459 notifier::NotificationsDisabledReason reason) { 460 DCHECK(CalledOnValidThread()); 461 push_client_state_ = FromNotifierReason(reason); 462 EmitStateChange(); 463 } 464 465 void SyncInvalidationListener::OnIncomingNotification( 466 const notifier::Notification& notification) { 467 DCHECK(CalledOnValidThread()); 468 // Do nothing, since this is already handled by |invalidation_client_|. 469 } 470 471 } // namespace syncer 472