Home | History | Annotate | Download | only in notifier
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/notifier/sync_invalidation_listener.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/callback.h"
     11 #include "base/compiler_specific.h"
     12 #include "base/logging.h"
     13 #include "base/tracked_objects.h"
     14 #include "google/cacheinvalidation/include/invalidation-client.h"
     15 #include "google/cacheinvalidation/include/types.h"
     16 #include "google/cacheinvalidation/types.pb.h"
     17 #include "jingle/notifier/listener/push_client.h"
     18 #include "sync/notifier/invalidation_util.h"
     19 #include "sync/notifier/object_id_invalidation_map.h"
     20 #include "sync/notifier/registration_manager.h"
     21 
     22 namespace {
     23 
     24 const char kApplicationName[] = "chrome-sync";
     25 
     26 }  // namespace
     27 
     28 namespace syncer {
     29 
     30 SyncInvalidationListener::Delegate::~Delegate() {}
     31 
     32 SyncInvalidationListener::SyncInvalidationListener(
     33     scoped_ptr<notifier::PushClient> push_client)
     34     : push_client_channel_(push_client.Pass()),
     35       sync_system_resources_(&push_client_channel_, this),
     36       delegate_(NULL),
     37       ticl_state_(DEFAULT_INVALIDATION_ERROR),
     38       push_client_state_(DEFAULT_INVALIDATION_ERROR),
     39       weak_ptr_factory_(this) {
     40   DCHECK(CalledOnValidThread());
     41   push_client_channel_.AddObserver(this);
     42 }
     43 
     44 SyncInvalidationListener::~SyncInvalidationListener() {
     45   DCHECK(CalledOnValidThread());
     46   push_client_channel_.RemoveObserver(this);
     47   Stop();
     48   DCHECK(!delegate_);
     49 }
     50 
     51 void SyncInvalidationListener::Start(
     52     const CreateInvalidationClientCallback&
     53         create_invalidation_client_callback,
     54     const std::string& client_id, const std::string& client_info,
     55     const std::string& invalidation_bootstrap_data,
     56     const UnackedInvalidationsMap& initial_unacked_invalidations,
     57     const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
     58     Delegate* delegate) {
     59   DCHECK(CalledOnValidThread());
     60   Stop();
     61 
     62   sync_system_resources_.set_platform(client_info);
     63   sync_system_resources_.Start();
     64 
     65   // The Storage resource is implemented as a write-through cache.  We populate
     66   // it with the initial state on startup, so subsequent writes go to disk and
     67   // update the in-memory cache, while reads just return the cached state.
     68   sync_system_resources_.storage()->SetInitialState(
     69       invalidation_bootstrap_data);
     70 
     71   unacked_invalidations_map_ = initial_unacked_invalidations;
     72   invalidation_state_tracker_ = invalidation_state_tracker;
     73   DCHECK(invalidation_state_tracker_.IsInitialized());
     74 
     75   DCHECK(!delegate_);
     76   DCHECK(delegate);
     77   delegate_ = delegate;
     78 
     79 #if defined(OS_IOS)
     80   int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS;
     81 #else
     82   int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
     83 #endif
     84   invalidation_client_.reset(
     85       create_invalidation_client_callback.Run(
     86           &sync_system_resources_, client_type, client_id,
     87           kApplicationName, this));
     88   invalidation_client_->Start();
     89 
     90   registration_manager_.reset(
     91       new RegistrationManager(invalidation_client_.get()));
     92 }
     93 
     94 void SyncInvalidationListener::UpdateCredentials(
     95     const std::string& email, const std::string& token) {
     96   DCHECK(CalledOnValidThread());
     97   push_client_channel_.UpdateCredentials(email, token);
     98 }
     99 
    100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
    101   DCHECK(CalledOnValidThread());
    102   registered_ids_ = ids;
    103   // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
    104   // working XMPP connection (as observed by us), so check it instead
    105   // of GetState() (see http://crbug.com/139424).
    106   if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
    107     DoRegistrationUpdate();
    108   }
    109 }
    110 
    111 void SyncInvalidationListener::Ready(
    112     invalidation::InvalidationClient* client) {
    113   DCHECK(CalledOnValidThread());
    114   DCHECK_EQ(client, invalidation_client_.get());
    115   ticl_state_ = INVALIDATIONS_ENABLED;
    116   EmitStateChange();
    117   DoRegistrationUpdate();
    118 }
    119 
    120 void SyncInvalidationListener::Invalidate(
    121     invalidation::InvalidationClient* client,
    122     const invalidation::Invalidation& invalidation,
    123     const invalidation::AckHandle& ack_handle) {
    124   DCHECK(CalledOnValidThread());
    125   DCHECK_EQ(client, invalidation_client_.get());
    126   client->Acknowledge(ack_handle);
    127 
    128   const invalidation::ObjectId& id = invalidation.object_id();
    129 
    130   std::string payload;
    131   // payload() CHECK()'s has_payload(), so we must check it ourselves first.
    132   if (invalidation.has_payload())
    133     payload = invalidation.payload();
    134 
    135   DVLOG(2) << "Received invalidation with version " << invalidation.version()
    136            << " for " << ObjectIdToString(id);
    137 
    138   ObjectIdInvalidationMap invalidations;
    139   Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
    140   inv.set_ack_handler(GetThisAsAckHandler());
    141   invalidations.Insert(inv);
    142 
    143   DispatchInvalidations(invalidations);
    144 }
    145 
    146 void SyncInvalidationListener::InvalidateUnknownVersion(
    147     invalidation::InvalidationClient* client,
    148     const invalidation::ObjectId& object_id,
    149     const invalidation::AckHandle& ack_handle) {
    150   DCHECK(CalledOnValidThread());
    151   DCHECK_EQ(client, invalidation_client_.get());
    152   DVLOG(1) << "InvalidateUnknownVersion";
    153   client->Acknowledge(ack_handle);
    154 
    155   ObjectIdInvalidationMap invalidations;
    156   Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
    157   unknown_version.set_ack_handler(GetThisAsAckHandler());
    158   invalidations.Insert(unknown_version);
    159 
    160   DispatchInvalidations(invalidations);
    161 }
    162 
    163 // This should behave as if we got an invalidation with version
    164 // UNKNOWN_OBJECT_VERSION for all known data types.
    165 void SyncInvalidationListener::InvalidateAll(
    166     invalidation::InvalidationClient* client,
    167     const invalidation::AckHandle& ack_handle) {
    168   DCHECK(CalledOnValidThread());
    169   DCHECK_EQ(client, invalidation_client_.get());
    170   DVLOG(1) << "InvalidateAll";
    171   client->Acknowledge(ack_handle);
    172 
    173   ObjectIdInvalidationMap invalidations;
    174   for (ObjectIdSet::iterator it = registered_ids_.begin();
    175        it != registered_ids_.end(); ++it) {
    176     Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
    177     unknown_version.set_ack_handler(GetThisAsAckHandler());
    178     invalidations.Insert(unknown_version);
    179   }
    180 
    181   DispatchInvalidations(invalidations);
    182 }
    183 
    184 // If a handler is registered, emit right away.  Otherwise, save it for later.
    185 void SyncInvalidationListener::DispatchInvalidations(
    186     const ObjectIdInvalidationMap& invalidations) {
    187   DCHECK(CalledOnValidThread());
    188 
    189   ObjectIdInvalidationMap to_save = invalidations;
    190   ObjectIdInvalidationMap to_emit =
    191       invalidations.GetSubsetWithObjectIds(registered_ids_);
    192 
    193   SaveInvalidations(to_save);
    194   EmitSavedInvalidations(to_emit);
    195 }
    196 
    197 void SyncInvalidationListener::SaveInvalidations(
    198     const ObjectIdInvalidationMap& to_save) {
    199   ObjectIdSet objects_to_save = to_save.GetObjectIds();
    200   for (ObjectIdSet::const_iterator it = objects_to_save.begin();
    201        it != objects_to_save.end(); ++it) {
    202     UnackedInvalidationsMap::iterator lookup =
    203         unacked_invalidations_map_.find(*it);
    204     if (lookup == unacked_invalidations_map_.end()) {
    205       lookup = unacked_invalidations_map_.insert(
    206           std::make_pair(*it, UnackedInvalidationSet(*it))).first;
    207     }
    208     lookup->second.AddSet(to_save.ForObject(*it));
    209   }
    210 
    211   invalidation_state_tracker_.Call(
    212       FROM_HERE,
    213       &InvalidationStateTracker::SetSavedInvalidations,
    214       unacked_invalidations_map_);
    215 }
    216 
    217 void SyncInvalidationListener::EmitSavedInvalidations(
    218     const ObjectIdInvalidationMap& to_emit) {
    219   DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
    220   delegate_->OnInvalidate(to_emit);
    221 }
    222 
    223 void SyncInvalidationListener::InformRegistrationStatus(
    224       invalidation::InvalidationClient* client,
    225       const invalidation::ObjectId& object_id,
    226       InvalidationListener::RegistrationState new_state) {
    227   DCHECK(CalledOnValidThread());
    228   DCHECK_EQ(client, invalidation_client_.get());
    229   DVLOG(1) << "InformRegistrationStatus: "
    230            << ObjectIdToString(object_id) << " " << new_state;
    231 
    232   if (new_state != InvalidationListener::REGISTERED) {
    233     // Let |registration_manager_| handle the registration backoff policy.
    234     registration_manager_->MarkRegistrationLost(object_id);
    235   }
    236 }
    237 
    238 void SyncInvalidationListener::InformRegistrationFailure(
    239     invalidation::InvalidationClient* client,
    240     const invalidation::ObjectId& object_id,
    241     bool is_transient,
    242     const std::string& error_message) {
    243   DCHECK(CalledOnValidThread());
    244   DCHECK_EQ(client, invalidation_client_.get());
    245   DVLOG(1) << "InformRegistrationFailure: "
    246            << ObjectIdToString(object_id)
    247            << "is_transient=" << is_transient
    248            << ", message=" << error_message;
    249 
    250   if (is_transient) {
    251     // We don't care about |unknown_hint|; we let
    252     // |registration_manager_| handle the registration backoff policy.
    253     registration_manager_->MarkRegistrationLost(object_id);
    254   } else {
    255     // Non-transient failures require an action to resolve. This could happen
    256     // because:
    257     // - the server doesn't yet recognize the data type, which could happen for
    258     //   brand-new data types.
    259     // - the user has changed his password and hasn't updated it yet locally.
    260     // Either way, block future registration attempts for |object_id|. However,
    261     // we don't forget any saved invalidation state since we may use it once the
    262     // error is addressed.
    263     registration_manager_->DisableId(object_id);
    264   }
    265 }
    266 
    267 void SyncInvalidationListener::ReissueRegistrations(
    268     invalidation::InvalidationClient* client,
    269     const std::string& prefix,
    270     int prefix_length) {
    271   DCHECK(CalledOnValidThread());
    272   DCHECK_EQ(client, invalidation_client_.get());
    273   DVLOG(1) << "AllRegistrationsLost";
    274   registration_manager_->MarkAllRegistrationsLost();
    275 }
    276 
    277 void SyncInvalidationListener::InformError(
    278     invalidation::InvalidationClient* client,
    279     const invalidation::ErrorInfo& error_info) {
    280   DCHECK(CalledOnValidThread());
    281   DCHECK_EQ(client, invalidation_client_.get());
    282   LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
    283              << error_info.error_message()
    284              << " (transient = " << error_info.is_transient() << ")";
    285   if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
    286     ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
    287   } else {
    288     ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
    289   }
    290   EmitStateChange();
    291 }
    292 
    293 void SyncInvalidationListener::Acknowledge(
    294   const invalidation::ObjectId& id,
    295   const syncer::AckHandle& handle) {
    296   UnackedInvalidationsMap::iterator lookup =
    297       unacked_invalidations_map_.find(id);
    298   if (lookup == unacked_invalidations_map_.end()) {
    299     DLOG(WARNING) << "Received acknowledgement for untracked object ID";
    300     return;
    301   }
    302   lookup->second.Acknowledge(handle);
    303   invalidation_state_tracker_.Call(
    304       FROM_HERE,
    305       &InvalidationStateTracker::SetSavedInvalidations,
    306       unacked_invalidations_map_);
    307 }
    308 
    309 void SyncInvalidationListener::Drop(
    310     const invalidation::ObjectId& id,
    311     const syncer::AckHandle& handle) {
    312   UnackedInvalidationsMap::iterator lookup =
    313       unacked_invalidations_map_.find(id);
    314   if (lookup == unacked_invalidations_map_.end()) {
    315     DLOG(WARNING) << "Received drop for untracked object ID";
    316     return;
    317   }
    318   lookup->second.Drop(handle);
    319   invalidation_state_tracker_.Call(
    320       FROM_HERE,
    321       &InvalidationStateTracker::SetSavedInvalidations,
    322       unacked_invalidations_map_);
    323 }
    324 
    325 void SyncInvalidationListener::WriteState(const std::string& state) {
    326   DCHECK(CalledOnValidThread());
    327   DVLOG(1) << "WriteState";
    328   invalidation_state_tracker_.Call(
    329       FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
    330 }
    331 
    332 void SyncInvalidationListener::DoRegistrationUpdate() {
    333   DCHECK(CalledOnValidThread());
    334   const ObjectIdSet& unregistered_ids =
    335       registration_manager_->UpdateRegisteredIds(registered_ids_);
    336   for (ObjectIdSet::iterator it = unregistered_ids.begin();
    337        it != unregistered_ids.end(); ++it) {
    338     unacked_invalidations_map_.erase(*it);
    339   }
    340   invalidation_state_tracker_.Call(
    341       FROM_HERE,
    342       &InvalidationStateTracker::SetSavedInvalidations,
    343       unacked_invalidations_map_);
    344 
    345   ObjectIdInvalidationMap object_id_invalidation_map;
    346   for (UnackedInvalidationsMap::iterator map_it =
    347        unacked_invalidations_map_.begin();
    348        map_it != unacked_invalidations_map_.end(); ++map_it) {
    349     if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
    350       continue;
    351     }
    352     map_it->second.ExportInvalidations(
    353         GetThisAsAckHandler(),
    354         &object_id_invalidation_map);
    355   }
    356 
    357   // There's no need to run these through DispatchInvalidations(); they've
    358   // already been saved to storage (that's where we found them) so all we need
    359   // to do now is emit them.
    360   EmitSavedInvalidations(object_id_invalidation_map);
    361 }
    362 
    363 void SyncInvalidationListener::StopForTest() {
    364   DCHECK(CalledOnValidThread());
    365   Stop();
    366 }
    367 
    368 void SyncInvalidationListener::Stop() {
    369   DCHECK(CalledOnValidThread());
    370   if (!invalidation_client_) {
    371     return;
    372   }
    373 
    374   registration_manager_.reset();
    375   sync_system_resources_.Stop();
    376   invalidation_client_->Stop();
    377 
    378   invalidation_client_.reset();
    379   delegate_ = NULL;
    380 
    381   ticl_state_ = DEFAULT_INVALIDATION_ERROR;
    382   push_client_state_ = DEFAULT_INVALIDATION_ERROR;
    383 }
    384 
    385 InvalidatorState SyncInvalidationListener::GetState() const {
    386   DCHECK(CalledOnValidThread());
    387   if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
    388       push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
    389     // If either the ticl or the push client rejected our credentials,
    390     // return INVALIDATION_CREDENTIALS_REJECTED.
    391     return INVALIDATION_CREDENTIALS_REJECTED;
    392   }
    393   if (ticl_state_ == INVALIDATIONS_ENABLED &&
    394       push_client_state_ == INVALIDATIONS_ENABLED) {
    395     // If the ticl is ready and the push client notifications are
    396     // enabled, return INVALIDATIONS_ENABLED.
    397     return INVALIDATIONS_ENABLED;
    398   }
    399   // Otherwise, we have a transient error.
    400   return TRANSIENT_INVALIDATION_ERROR;
    401 }
    402 
    403 void SyncInvalidationListener::EmitStateChange() {
    404   DCHECK(CalledOnValidThread());
    405   delegate_->OnInvalidatorStateChange(GetState());
    406 }
    407 
    408 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
    409   DCHECK(CalledOnValidThread());
    410   return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
    411 }
    412 
    413 void SyncInvalidationListener::OnNetworkChannelStateChanged(
    414     InvalidatorState invalidator_state) {
    415   DCHECK(CalledOnValidThread());
    416   push_client_state_ = invalidator_state;
    417   EmitStateChange();
    418 }
    419 
    420 }  // namespace syncer
    421