1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "components/invalidation/sync_invalidation_listener.h" 6 7 #include <vector> 8 9 #include "base/bind.h" 10 #include "base/callback.h" 11 #include "base/compiler_specific.h" 12 #include "base/logging.h" 13 #include "base/tracked_objects.h" 14 #include "components/invalidation/invalidation_util.h" 15 #include "components/invalidation/object_id_invalidation_map.h" 16 #include "components/invalidation/registration_manager.h" 17 #include "google/cacheinvalidation/include/invalidation-client.h" 18 #include "google/cacheinvalidation/include/types.h" 19 #include "jingle/notifier/listener/push_client.h" 20 21 namespace { 22 23 const char kApplicationName[] = "chrome-sync"; 24 25 } // namespace 26 27 namespace syncer { 28 29 SyncInvalidationListener::Delegate::~Delegate() {} 30 31 SyncInvalidationListener::SyncInvalidationListener( 32 scoped_ptr<SyncNetworkChannel> network_channel) 33 : sync_network_channel_(network_channel.Pass()), 34 sync_system_resources_(sync_network_channel_.get(), this), 35 delegate_(NULL), 36 ticl_state_(DEFAULT_INVALIDATION_ERROR), 37 push_client_state_(DEFAULT_INVALIDATION_ERROR), 38 weak_ptr_factory_(this) { 39 DCHECK(CalledOnValidThread()); 40 sync_network_channel_->AddObserver(this); 41 } 42 43 SyncInvalidationListener::~SyncInvalidationListener() { 44 DCHECK(CalledOnValidThread()); 45 sync_network_channel_->RemoveObserver(this); 46 Stop(); 47 DCHECK(!delegate_); 48 } 49 50 void SyncInvalidationListener::Start( 51 const CreateInvalidationClientCallback& create_invalidation_client_callback, 52 const std::string& client_id, 53 const std::string& client_info, 54 const std::string& invalidation_bootstrap_data, 55 const UnackedInvalidationsMap& initial_unacked_invalidations, 56 const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker, 57 const scoped_refptr<base::SequencedTaskRunner>& 58 invalidation_state_tracker_task_runner, 59 Delegate* delegate) { 60 DCHECK(CalledOnValidThread()); 61 Stop(); 62 63 sync_system_resources_.set_platform(client_info); 64 sync_system_resources_.Start(); 65 66 // The Storage resource is implemented as a write-through cache. We populate 67 // it with the initial state on startup, so subsequent writes go to disk and 68 // update the in-memory cache, while reads just return the cached state. 69 sync_system_resources_.storage()->SetInitialState( 70 invalidation_bootstrap_data); 71 72 unacked_invalidations_map_ = initial_unacked_invalidations; 73 invalidation_state_tracker_ = invalidation_state_tracker; 74 invalidation_state_tracker_task_runner_ = 75 invalidation_state_tracker_task_runner; 76 DCHECK(invalidation_state_tracker_task_runner_.get()); 77 78 DCHECK(!delegate_); 79 DCHECK(delegate); 80 delegate_ = delegate; 81 82 invalidation_client_.reset(create_invalidation_client_callback.Run( 83 &sync_system_resources_, 84 sync_network_channel_->GetInvalidationClientType(), 85 client_id, 86 kApplicationName, 87 this)); 88 invalidation_client_->Start(); 89 90 registration_manager_.reset( 91 new RegistrationManager(invalidation_client_.get())); 92 } 93 94 void SyncInvalidationListener::UpdateCredentials( 95 const std::string& email, const std::string& token) { 96 DCHECK(CalledOnValidThread()); 97 sync_network_channel_->UpdateCredentials(email, token); 98 } 99 100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 101 DCHECK(CalledOnValidThread()); 102 registered_ids_ = ids; 103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 104 // working XMPP connection (as observed by us), so check it instead 105 // of GetState() (see http://crbug.com/139424). 106 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { 107 DoRegistrationUpdate(); 108 } 109 } 110 111 void SyncInvalidationListener::Ready( 112 invalidation::InvalidationClient* client) { 113 DCHECK(CalledOnValidThread()); 114 DCHECK_EQ(client, invalidation_client_.get()); 115 ticl_state_ = INVALIDATIONS_ENABLED; 116 EmitStateChange(); 117 DoRegistrationUpdate(); 118 } 119 120 void SyncInvalidationListener::Invalidate( 121 invalidation::InvalidationClient* client, 122 const invalidation::Invalidation& invalidation, 123 const invalidation::AckHandle& ack_handle) { 124 DCHECK(CalledOnValidThread()); 125 DCHECK_EQ(client, invalidation_client_.get()); 126 client->Acknowledge(ack_handle); 127 128 const invalidation::ObjectId& id = invalidation.object_id(); 129 130 std::string payload; 131 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 132 if (invalidation.has_payload()) 133 payload = invalidation.payload(); 134 135 DVLOG(2) << "Received invalidation with version " << invalidation.version() 136 << " for " << ObjectIdToString(id); 137 138 ObjectIdInvalidationMap invalidations; 139 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); 140 inv.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current()); 141 invalidations.Insert(inv); 142 143 DispatchInvalidations(invalidations); 144 } 145 146 void SyncInvalidationListener::InvalidateUnknownVersion( 147 invalidation::InvalidationClient* client, 148 const invalidation::ObjectId& object_id, 149 const invalidation::AckHandle& ack_handle) { 150 DCHECK(CalledOnValidThread()); 151 DCHECK_EQ(client, invalidation_client_.get()); 152 DVLOG(1) << "InvalidateUnknownVersion"; 153 client->Acknowledge(ack_handle); 154 155 ObjectIdInvalidationMap invalidations; 156 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); 157 unknown_version.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current()); 158 invalidations.Insert(unknown_version); 159 160 DispatchInvalidations(invalidations); 161 } 162 163 // This should behave as if we got an invalidation with version 164 // UNKNOWN_OBJECT_VERSION for all known data types. 165 void SyncInvalidationListener::InvalidateAll( 166 invalidation::InvalidationClient* client, 167 const invalidation::AckHandle& ack_handle) { 168 DCHECK(CalledOnValidThread()); 169 DCHECK_EQ(client, invalidation_client_.get()); 170 DVLOG(1) << "InvalidateAll"; 171 client->Acknowledge(ack_handle); 172 173 ObjectIdInvalidationMap invalidations; 174 for (ObjectIdSet::iterator it = registered_ids_.begin(); 175 it != registered_ids_.end(); ++it) { 176 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); 177 unknown_version.SetAckHandler(AsWeakPtr(), 178 base::MessageLoopProxy::current()); 179 invalidations.Insert(unknown_version); 180 } 181 182 DispatchInvalidations(invalidations); 183 } 184 185 // If a handler is registered, emit right away. Otherwise, save it for later. 186 void SyncInvalidationListener::DispatchInvalidations( 187 const ObjectIdInvalidationMap& invalidations) { 188 DCHECK(CalledOnValidThread()); 189 190 ObjectIdInvalidationMap to_save = invalidations; 191 ObjectIdInvalidationMap to_emit = 192 invalidations.GetSubsetWithObjectIds(registered_ids_); 193 194 SaveInvalidations(to_save); 195 EmitSavedInvalidations(to_emit); 196 } 197 198 void SyncInvalidationListener::SaveInvalidations( 199 const ObjectIdInvalidationMap& to_save) { 200 ObjectIdSet objects_to_save = to_save.GetObjectIds(); 201 for (ObjectIdSet::const_iterator it = objects_to_save.begin(); 202 it != objects_to_save.end(); ++it) { 203 UnackedInvalidationsMap::iterator lookup = 204 unacked_invalidations_map_.find(*it); 205 if (lookup == unacked_invalidations_map_.end()) { 206 lookup = unacked_invalidations_map_.insert( 207 std::make_pair(*it, UnackedInvalidationSet(*it))).first; 208 } 209 lookup->second.AddSet(to_save.ForObject(*it)); 210 } 211 212 invalidation_state_tracker_task_runner_->PostTask( 213 FROM_HERE, 214 base::Bind(&InvalidationStateTracker::SetSavedInvalidations, 215 invalidation_state_tracker_, 216 unacked_invalidations_map_)); 217 } 218 219 void SyncInvalidationListener::EmitSavedInvalidations( 220 const ObjectIdInvalidationMap& to_emit) { 221 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); 222 delegate_->OnInvalidate(to_emit); 223 } 224 225 void SyncInvalidationListener::InformRegistrationStatus( 226 invalidation::InvalidationClient* client, 227 const invalidation::ObjectId& object_id, 228 InvalidationListener::RegistrationState new_state) { 229 DCHECK(CalledOnValidThread()); 230 DCHECK_EQ(client, invalidation_client_.get()); 231 DVLOG(1) << "InformRegistrationStatus: " 232 << ObjectIdToString(object_id) << " " << new_state; 233 234 if (new_state != InvalidationListener::REGISTERED) { 235 // Let |registration_manager_| handle the registration backoff policy. 236 registration_manager_->MarkRegistrationLost(object_id); 237 } 238 } 239 240 void SyncInvalidationListener::InformRegistrationFailure( 241 invalidation::InvalidationClient* client, 242 const invalidation::ObjectId& object_id, 243 bool is_transient, 244 const std::string& error_message) { 245 DCHECK(CalledOnValidThread()); 246 DCHECK_EQ(client, invalidation_client_.get()); 247 DVLOG(1) << "InformRegistrationFailure: " 248 << ObjectIdToString(object_id) 249 << "is_transient=" << is_transient 250 << ", message=" << error_message; 251 252 if (is_transient) { 253 // We don't care about |unknown_hint|; we let 254 // |registration_manager_| handle the registration backoff policy. 255 registration_manager_->MarkRegistrationLost(object_id); 256 } else { 257 // Non-transient failures require an action to resolve. This could happen 258 // because: 259 // - the server doesn't yet recognize the data type, which could happen for 260 // brand-new data types. 261 // - the user has changed his password and hasn't updated it yet locally. 262 // Either way, block future registration attempts for |object_id|. However, 263 // we don't forget any saved invalidation state since we may use it once the 264 // error is addressed. 265 registration_manager_->DisableId(object_id); 266 } 267 } 268 269 void SyncInvalidationListener::ReissueRegistrations( 270 invalidation::InvalidationClient* client, 271 const std::string& prefix, 272 int prefix_length) { 273 DCHECK(CalledOnValidThread()); 274 DCHECK_EQ(client, invalidation_client_.get()); 275 DVLOG(1) << "AllRegistrationsLost"; 276 registration_manager_->MarkAllRegistrationsLost(); 277 } 278 279 void SyncInvalidationListener::InformError( 280 invalidation::InvalidationClient* client, 281 const invalidation::ErrorInfo& error_info) { 282 DCHECK(CalledOnValidThread()); 283 DCHECK_EQ(client, invalidation_client_.get()); 284 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " 285 << error_info.error_message() 286 << " (transient = " << error_info.is_transient() << ")"; 287 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { 288 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; 289 } else { 290 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; 291 } 292 EmitStateChange(); 293 } 294 295 void SyncInvalidationListener::Acknowledge( 296 const invalidation::ObjectId& id, 297 const syncer::AckHandle& handle) { 298 UnackedInvalidationsMap::iterator lookup = 299 unacked_invalidations_map_.find(id); 300 if (lookup == unacked_invalidations_map_.end()) { 301 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; 302 return; 303 } 304 lookup->second.Acknowledge(handle); 305 invalidation_state_tracker_task_runner_->PostTask( 306 FROM_HERE, 307 base::Bind(&InvalidationStateTracker::SetSavedInvalidations, 308 invalidation_state_tracker_, 309 unacked_invalidations_map_)); 310 } 311 312 void SyncInvalidationListener::Drop( 313 const invalidation::ObjectId& id, 314 const syncer::AckHandle& handle) { 315 UnackedInvalidationsMap::iterator lookup = 316 unacked_invalidations_map_.find(id); 317 if (lookup == unacked_invalidations_map_.end()) { 318 DLOG(WARNING) << "Received drop for untracked object ID"; 319 return; 320 } 321 lookup->second.Drop(handle); 322 invalidation_state_tracker_task_runner_->PostTask( 323 FROM_HERE, 324 base::Bind(&InvalidationStateTracker::SetSavedInvalidations, 325 invalidation_state_tracker_, 326 unacked_invalidations_map_)); 327 } 328 329 void SyncInvalidationListener::WriteState(const std::string& state) { 330 DCHECK(CalledOnValidThread()); 331 DVLOG(1) << "WriteState"; 332 invalidation_state_tracker_task_runner_->PostTask( 333 FROM_HERE, 334 base::Bind(&InvalidationStateTracker::SetBootstrapData, 335 invalidation_state_tracker_, 336 state)); 337 } 338 339 void SyncInvalidationListener::DoRegistrationUpdate() { 340 DCHECK(CalledOnValidThread()); 341 const ObjectIdSet& unregistered_ids = 342 registration_manager_->UpdateRegisteredIds(registered_ids_); 343 for (ObjectIdSet::iterator it = unregistered_ids.begin(); 344 it != unregistered_ids.end(); ++it) { 345 unacked_invalidations_map_.erase(*it); 346 } 347 invalidation_state_tracker_task_runner_->PostTask( 348 FROM_HERE, 349 base::Bind(&InvalidationStateTracker::SetSavedInvalidations, 350 invalidation_state_tracker_, 351 unacked_invalidations_map_)); 352 353 ObjectIdInvalidationMap object_id_invalidation_map; 354 for (UnackedInvalidationsMap::iterator map_it = 355 unacked_invalidations_map_.begin(); 356 map_it != unacked_invalidations_map_.end(); ++map_it) { 357 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { 358 continue; 359 } 360 map_it->second.ExportInvalidations(AsWeakPtr(), 361 base::MessageLoopProxy::current(), 362 &object_id_invalidation_map); 363 } 364 365 // There's no need to run these through DispatchInvalidations(); they've 366 // already been saved to storage (that's where we found them) so all we need 367 // to do now is emit them. 368 EmitSavedInvalidations(object_id_invalidation_map); 369 } 370 371 void SyncInvalidationListener::RequestDetailedStatus( 372 base::Callback<void(const base::DictionaryValue&)> callback) const { 373 DCHECK(CalledOnValidThread()); 374 sync_network_channel_->RequestDetailedStatus(callback); 375 callback.Run(*CollectDebugData()); 376 } 377 378 scoped_ptr<base::DictionaryValue> 379 SyncInvalidationListener::CollectDebugData() const { 380 scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue()); 381 return_value->SetString( 382 "SyncInvalidationListener.PushClientState", 383 std::string(InvalidatorStateToString(push_client_state_))); 384 return_value->SetString("SyncInvalidationListener.TiclState", 385 std::string(InvalidatorStateToString(ticl_state_))); 386 scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue()); 387 for (UnackedInvalidationsMap::const_iterator it = 388 unacked_invalidations_map_.begin(); 389 it != unacked_invalidations_map_.end(); 390 ++it) { 391 unacked_map->Set((it->first).name(), (it->second).ToValue().release()); 392 } 393 return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap", 394 unacked_map.release()); 395 return return_value.Pass(); 396 } 397 398 void SyncInvalidationListener::StopForTest() { 399 DCHECK(CalledOnValidThread()); 400 Stop(); 401 } 402 403 void SyncInvalidationListener::Stop() { 404 DCHECK(CalledOnValidThread()); 405 if (!invalidation_client_) { 406 return; 407 } 408 409 registration_manager_.reset(); 410 sync_system_resources_.Stop(); 411 invalidation_client_->Stop(); 412 413 invalidation_client_.reset(); 414 delegate_ = NULL; 415 416 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 417 push_client_state_ = DEFAULT_INVALIDATION_ERROR; 418 } 419 420 InvalidatorState SyncInvalidationListener::GetState() const { 421 DCHECK(CalledOnValidThread()); 422 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || 423 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { 424 // If either the ticl or the push client rejected our credentials, 425 // return INVALIDATION_CREDENTIALS_REJECTED. 426 return INVALIDATION_CREDENTIALS_REJECTED; 427 } 428 if (ticl_state_ == INVALIDATIONS_ENABLED && 429 push_client_state_ == INVALIDATIONS_ENABLED) { 430 // If the ticl is ready and the push client notifications are 431 // enabled, return INVALIDATIONS_ENABLED. 432 return INVALIDATIONS_ENABLED; 433 } 434 // Otherwise, we have a transient error. 435 return TRANSIENT_INVALIDATION_ERROR; 436 } 437 438 void SyncInvalidationListener::EmitStateChange() { 439 DCHECK(CalledOnValidThread()); 440 delegate_->OnInvalidatorStateChange(GetState()); 441 } 442 443 base::WeakPtr<AckHandler> SyncInvalidationListener::AsWeakPtr() { 444 DCHECK(CalledOnValidThread()); 445 base::WeakPtr<AckHandler> weak_ptr = weak_ptr_factory_.GetWeakPtr(); 446 weak_ptr.get(); // Binds the pointer to this thread. 447 return weak_ptr; 448 } 449 450 void SyncInvalidationListener::OnNetworkChannelStateChanged( 451 InvalidatorState invalidator_state) { 452 DCHECK(CalledOnValidThread()); 453 push_client_state_ = invalidator_state; 454 EmitStateChange(); 455 } 456 457 } // namespace syncer 458