Home | History | Annotate | Download | only in invalidation
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "components/invalidation/sync_invalidation_listener.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/callback.h"
     11 #include "base/compiler_specific.h"
     12 #include "base/logging.h"
     13 #include "base/tracked_objects.h"
     14 #include "google/cacheinvalidation/include/invalidation-client.h"
     15 #include "google/cacheinvalidation/include/types.h"
     16 #include "jingle/notifier/listener/push_client.h"
     17 #include "sync/notifier/invalidation_util.h"
     18 #include "sync/notifier/object_id_invalidation_map.h"
     19 #include "sync/notifier/registration_manager.h"
     20 
     21 namespace {
     22 
     23 const char kApplicationName[] = "chrome-sync";
     24 
     25 }  // namespace
     26 
     27 namespace syncer {
     28 
     29 SyncInvalidationListener::Delegate::~Delegate() {}
     30 
     31 SyncInvalidationListener::SyncInvalidationListener(
     32     scoped_ptr<SyncNetworkChannel> network_channel)
     33     : sync_network_channel_(network_channel.Pass()),
     34       sync_system_resources_(sync_network_channel_.get(), this),
     35       delegate_(NULL),
     36       ticl_state_(DEFAULT_INVALIDATION_ERROR),
     37       push_client_state_(DEFAULT_INVALIDATION_ERROR),
     38       weak_ptr_factory_(this) {
     39   DCHECK(CalledOnValidThread());
     40   sync_network_channel_->AddObserver(this);
     41 }
     42 
     43 SyncInvalidationListener::~SyncInvalidationListener() {
     44   DCHECK(CalledOnValidThread());
     45   sync_network_channel_->RemoveObserver(this);
     46   Stop();
     47   DCHECK(!delegate_);
     48 }
     49 
     50 void SyncInvalidationListener::Start(
     51     const CreateInvalidationClientCallback&
     52         create_invalidation_client_callback,
     53     const std::string& client_id, const std::string& client_info,
     54     const std::string& invalidation_bootstrap_data,
     55     const UnackedInvalidationsMap& initial_unacked_invalidations,
     56     const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
     57     Delegate* delegate) {
     58   DCHECK(CalledOnValidThread());
     59   Stop();
     60 
     61   sync_system_resources_.set_platform(client_info);
     62   sync_system_resources_.Start();
     63 
     64   // The Storage resource is implemented as a write-through cache.  We populate
     65   // it with the initial state on startup, so subsequent writes go to disk and
     66   // update the in-memory cache, while reads just return the cached state.
     67   sync_system_resources_.storage()->SetInitialState(
     68       invalidation_bootstrap_data);
     69 
     70   unacked_invalidations_map_ = initial_unacked_invalidations;
     71   invalidation_state_tracker_ = invalidation_state_tracker;
     72   DCHECK(invalidation_state_tracker_.IsInitialized());
     73 
     74   DCHECK(!delegate_);
     75   DCHECK(delegate);
     76   delegate_ = delegate;
     77 
     78   invalidation_client_.reset(create_invalidation_client_callback.Run(
     79       &sync_system_resources_,
     80       sync_network_channel_->GetInvalidationClientType(),
     81       client_id,
     82       kApplicationName,
     83       this));
     84   invalidation_client_->Start();
     85 
     86   registration_manager_.reset(
     87       new RegistrationManager(invalidation_client_.get()));
     88 }
     89 
     90 void SyncInvalidationListener::UpdateCredentials(
     91     const std::string& email, const std::string& token) {
     92   DCHECK(CalledOnValidThread());
     93   sync_network_channel_->UpdateCredentials(email, token);
     94 }
     95 
     96 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
     97   DCHECK(CalledOnValidThread());
     98   registered_ids_ = ids;
     99   // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
    100   // working XMPP connection (as observed by us), so check it instead
    101   // of GetState() (see http://crbug.com/139424).
    102   if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
    103     DoRegistrationUpdate();
    104   }
    105 }
    106 
    107 void SyncInvalidationListener::Ready(
    108     invalidation::InvalidationClient* client) {
    109   DCHECK(CalledOnValidThread());
    110   DCHECK_EQ(client, invalidation_client_.get());
    111   ticl_state_ = INVALIDATIONS_ENABLED;
    112   EmitStateChange();
    113   DoRegistrationUpdate();
    114 }
    115 
    116 void SyncInvalidationListener::Invalidate(
    117     invalidation::InvalidationClient* client,
    118     const invalidation::Invalidation& invalidation,
    119     const invalidation::AckHandle& ack_handle) {
    120   DCHECK(CalledOnValidThread());
    121   DCHECK_EQ(client, invalidation_client_.get());
    122   client->Acknowledge(ack_handle);
    123 
    124   const invalidation::ObjectId& id = invalidation.object_id();
    125 
    126   std::string payload;
    127   // payload() CHECK()'s has_payload(), so we must check it ourselves first.
    128   if (invalidation.has_payload())
    129     payload = invalidation.payload();
    130 
    131   DVLOG(2) << "Received invalidation with version " << invalidation.version()
    132            << " for " << ObjectIdToString(id);
    133 
    134   ObjectIdInvalidationMap invalidations;
    135   Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
    136   inv.set_ack_handler(GetThisAsAckHandler());
    137   invalidations.Insert(inv);
    138 
    139   DispatchInvalidations(invalidations);
    140 }
    141 
    142 void SyncInvalidationListener::InvalidateUnknownVersion(
    143     invalidation::InvalidationClient* client,
    144     const invalidation::ObjectId& object_id,
    145     const invalidation::AckHandle& ack_handle) {
    146   DCHECK(CalledOnValidThread());
    147   DCHECK_EQ(client, invalidation_client_.get());
    148   DVLOG(1) << "InvalidateUnknownVersion";
    149   client->Acknowledge(ack_handle);
    150 
    151   ObjectIdInvalidationMap invalidations;
    152   Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
    153   unknown_version.set_ack_handler(GetThisAsAckHandler());
    154   invalidations.Insert(unknown_version);
    155 
    156   DispatchInvalidations(invalidations);
    157 }
    158 
    159 // This should behave as if we got an invalidation with version
    160 // UNKNOWN_OBJECT_VERSION for all known data types.
    161 void SyncInvalidationListener::InvalidateAll(
    162     invalidation::InvalidationClient* client,
    163     const invalidation::AckHandle& ack_handle) {
    164   DCHECK(CalledOnValidThread());
    165   DCHECK_EQ(client, invalidation_client_.get());
    166   DVLOG(1) << "InvalidateAll";
    167   client->Acknowledge(ack_handle);
    168 
    169   ObjectIdInvalidationMap invalidations;
    170   for (ObjectIdSet::iterator it = registered_ids_.begin();
    171        it != registered_ids_.end(); ++it) {
    172     Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
    173     unknown_version.set_ack_handler(GetThisAsAckHandler());
    174     invalidations.Insert(unknown_version);
    175   }
    176 
    177   DispatchInvalidations(invalidations);
    178 }
    179 
    180 // If a handler is registered, emit right away.  Otherwise, save it for later.
    181 void SyncInvalidationListener::DispatchInvalidations(
    182     const ObjectIdInvalidationMap& invalidations) {
    183   DCHECK(CalledOnValidThread());
    184 
    185   ObjectIdInvalidationMap to_save = invalidations;
    186   ObjectIdInvalidationMap to_emit =
    187       invalidations.GetSubsetWithObjectIds(registered_ids_);
    188 
    189   SaveInvalidations(to_save);
    190   EmitSavedInvalidations(to_emit);
    191 }
    192 
    193 void SyncInvalidationListener::SaveInvalidations(
    194     const ObjectIdInvalidationMap& to_save) {
    195   ObjectIdSet objects_to_save = to_save.GetObjectIds();
    196   for (ObjectIdSet::const_iterator it = objects_to_save.begin();
    197        it != objects_to_save.end(); ++it) {
    198     UnackedInvalidationsMap::iterator lookup =
    199         unacked_invalidations_map_.find(*it);
    200     if (lookup == unacked_invalidations_map_.end()) {
    201       lookup = unacked_invalidations_map_.insert(
    202           std::make_pair(*it, UnackedInvalidationSet(*it))).first;
    203     }
    204     lookup->second.AddSet(to_save.ForObject(*it));
    205   }
    206 
    207   invalidation_state_tracker_.Call(
    208       FROM_HERE,
    209       &InvalidationStateTracker::SetSavedInvalidations,
    210       unacked_invalidations_map_);
    211 }
    212 
    213 void SyncInvalidationListener::EmitSavedInvalidations(
    214     const ObjectIdInvalidationMap& to_emit) {
    215   DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
    216   delegate_->OnInvalidate(to_emit);
    217 }
    218 
    219 void SyncInvalidationListener::InformRegistrationStatus(
    220       invalidation::InvalidationClient* client,
    221       const invalidation::ObjectId& object_id,
    222       InvalidationListener::RegistrationState new_state) {
    223   DCHECK(CalledOnValidThread());
    224   DCHECK_EQ(client, invalidation_client_.get());
    225   DVLOG(1) << "InformRegistrationStatus: "
    226            << ObjectIdToString(object_id) << " " << new_state;
    227 
    228   if (new_state != InvalidationListener::REGISTERED) {
    229     // Let |registration_manager_| handle the registration backoff policy.
    230     registration_manager_->MarkRegistrationLost(object_id);
    231   }
    232 }
    233 
    234 void SyncInvalidationListener::InformRegistrationFailure(
    235     invalidation::InvalidationClient* client,
    236     const invalidation::ObjectId& object_id,
    237     bool is_transient,
    238     const std::string& error_message) {
    239   DCHECK(CalledOnValidThread());
    240   DCHECK_EQ(client, invalidation_client_.get());
    241   DVLOG(1) << "InformRegistrationFailure: "
    242            << ObjectIdToString(object_id)
    243            << "is_transient=" << is_transient
    244            << ", message=" << error_message;
    245 
    246   if (is_transient) {
    247     // We don't care about |unknown_hint|; we let
    248     // |registration_manager_| handle the registration backoff policy.
    249     registration_manager_->MarkRegistrationLost(object_id);
    250   } else {
    251     // Non-transient failures require an action to resolve. This could happen
    252     // because:
    253     // - the server doesn't yet recognize the data type, which could happen for
    254     //   brand-new data types.
    255     // - the user has changed his password and hasn't updated it yet locally.
    256     // Either way, block future registration attempts for |object_id|. However,
    257     // we don't forget any saved invalidation state since we may use it once the
    258     // error is addressed.
    259     registration_manager_->DisableId(object_id);
    260   }
    261 }
    262 
    263 void SyncInvalidationListener::ReissueRegistrations(
    264     invalidation::InvalidationClient* client,
    265     const std::string& prefix,
    266     int prefix_length) {
    267   DCHECK(CalledOnValidThread());
    268   DCHECK_EQ(client, invalidation_client_.get());
    269   DVLOG(1) << "AllRegistrationsLost";
    270   registration_manager_->MarkAllRegistrationsLost();
    271 }
    272 
    273 void SyncInvalidationListener::InformError(
    274     invalidation::InvalidationClient* client,
    275     const invalidation::ErrorInfo& error_info) {
    276   DCHECK(CalledOnValidThread());
    277   DCHECK_EQ(client, invalidation_client_.get());
    278   LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
    279              << error_info.error_message()
    280              << " (transient = " << error_info.is_transient() << ")";
    281   if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
    282     ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
    283   } else {
    284     ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
    285   }
    286   EmitStateChange();
    287 }
    288 
    289 void SyncInvalidationListener::Acknowledge(
    290   const invalidation::ObjectId& id,
    291   const syncer::AckHandle& handle) {
    292   UnackedInvalidationsMap::iterator lookup =
    293       unacked_invalidations_map_.find(id);
    294   if (lookup == unacked_invalidations_map_.end()) {
    295     DLOG(WARNING) << "Received acknowledgement for untracked object ID";
    296     return;
    297   }
    298   lookup->second.Acknowledge(handle);
    299   invalidation_state_tracker_.Call(
    300       FROM_HERE,
    301       &InvalidationStateTracker::SetSavedInvalidations,
    302       unacked_invalidations_map_);
    303 }
    304 
    305 void SyncInvalidationListener::Drop(
    306     const invalidation::ObjectId& id,
    307     const syncer::AckHandle& handle) {
    308   UnackedInvalidationsMap::iterator lookup =
    309       unacked_invalidations_map_.find(id);
    310   if (lookup == unacked_invalidations_map_.end()) {
    311     DLOG(WARNING) << "Received drop for untracked object ID";
    312     return;
    313   }
    314   lookup->second.Drop(handle);
    315   invalidation_state_tracker_.Call(
    316       FROM_HERE,
    317       &InvalidationStateTracker::SetSavedInvalidations,
    318       unacked_invalidations_map_);
    319 }
    320 
    321 void SyncInvalidationListener::WriteState(const std::string& state) {
    322   DCHECK(CalledOnValidThread());
    323   DVLOG(1) << "WriteState";
    324   invalidation_state_tracker_.Call(
    325       FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
    326 }
    327 
    328 void SyncInvalidationListener::DoRegistrationUpdate() {
    329   DCHECK(CalledOnValidThread());
    330   const ObjectIdSet& unregistered_ids =
    331       registration_manager_->UpdateRegisteredIds(registered_ids_);
    332   for (ObjectIdSet::iterator it = unregistered_ids.begin();
    333        it != unregistered_ids.end(); ++it) {
    334     unacked_invalidations_map_.erase(*it);
    335   }
    336   invalidation_state_tracker_.Call(
    337       FROM_HERE,
    338       &InvalidationStateTracker::SetSavedInvalidations,
    339       unacked_invalidations_map_);
    340 
    341   ObjectIdInvalidationMap object_id_invalidation_map;
    342   for (UnackedInvalidationsMap::iterator map_it =
    343        unacked_invalidations_map_.begin();
    344        map_it != unacked_invalidations_map_.end(); ++map_it) {
    345     if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
    346       continue;
    347     }
    348     map_it->second.ExportInvalidations(
    349         GetThisAsAckHandler(),
    350         &object_id_invalidation_map);
    351   }
    352 
    353   // There's no need to run these through DispatchInvalidations(); they've
    354   // already been saved to storage (that's where we found them) so all we need
    355   // to do now is emit them.
    356   EmitSavedInvalidations(object_id_invalidation_map);
    357 }
    358 
    359 void SyncInvalidationListener::RequestDetailedStatus(
    360     base::Callback<void(const base::DictionaryValue&)> callback) const {
    361   DCHECK(CalledOnValidThread());
    362   sync_network_channel_->RequestDetailedStatus(callback);
    363   callback.Run(*CollectDebugData());
    364 }
    365 
    366 scoped_ptr<base::DictionaryValue>
    367 SyncInvalidationListener::CollectDebugData() const {
    368   scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue());
    369   return_value->SetString(
    370       "SyncInvalidationListener.PushClientState",
    371       std::string(InvalidatorStateToString(push_client_state_)));
    372   return_value->SetString("SyncInvalidationListener.TiclState",
    373                           std::string(InvalidatorStateToString(ticl_state_)));
    374   scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue());
    375   for (UnackedInvalidationsMap::const_iterator it =
    376            unacked_invalidations_map_.begin();
    377        it != unacked_invalidations_map_.end();
    378        ++it) {
    379     unacked_map->Set((it->first).name(), (it->second).ToValue().release());
    380   }
    381   return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap",
    382                     unacked_map.release());
    383   return return_value.Pass();
    384 }
    385 
    386 void SyncInvalidationListener::StopForTest() {
    387   DCHECK(CalledOnValidThread());
    388   Stop();
    389 }
    390 
    391 void SyncInvalidationListener::Stop() {
    392   DCHECK(CalledOnValidThread());
    393   if (!invalidation_client_) {
    394     return;
    395   }
    396 
    397   registration_manager_.reset();
    398   sync_system_resources_.Stop();
    399   invalidation_client_->Stop();
    400 
    401   invalidation_client_.reset();
    402   delegate_ = NULL;
    403 
    404   ticl_state_ = DEFAULT_INVALIDATION_ERROR;
    405   push_client_state_ = DEFAULT_INVALIDATION_ERROR;
    406 }
    407 
    408 InvalidatorState SyncInvalidationListener::GetState() const {
    409   DCHECK(CalledOnValidThread());
    410   if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
    411       push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
    412     // If either the ticl or the push client rejected our credentials,
    413     // return INVALIDATION_CREDENTIALS_REJECTED.
    414     return INVALIDATION_CREDENTIALS_REJECTED;
    415   }
    416   if (ticl_state_ == INVALIDATIONS_ENABLED &&
    417       push_client_state_ == INVALIDATIONS_ENABLED) {
    418     // If the ticl is ready and the push client notifications are
    419     // enabled, return INVALIDATIONS_ENABLED.
    420     return INVALIDATIONS_ENABLED;
    421   }
    422   // Otherwise, we have a transient error.
    423   return TRANSIENT_INVALIDATION_ERROR;
    424 }
    425 
    426 void SyncInvalidationListener::EmitStateChange() {
    427   DCHECK(CalledOnValidThread());
    428   delegate_->OnInvalidatorStateChange(GetState());
    429 }
    430 
    431 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
    432   DCHECK(CalledOnValidThread());
    433   return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
    434 }
    435 
    436 void SyncInvalidationListener::OnNetworkChannelStateChanged(
    437     InvalidatorState invalidator_state) {
    438   DCHECK(CalledOnValidThread());
    439   push_client_state_ = invalidator_state;
    440   EmitStateChange();
    441 }
    442 
    443 }  // namespace syncer
    444