Home | History | Annotate | Download | only in notifier
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/notifier/sync_invalidation_listener.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/callback.h"
     11 #include "base/compiler_specific.h"
     12 #include "base/logging.h"
     13 #include "base/tracked_objects.h"
     14 #include "google/cacheinvalidation/include/invalidation-client.h"
     15 #include "google/cacheinvalidation/include/types.h"
     16 #include "google/cacheinvalidation/types.pb.h"
     17 #include "jingle/notifier/listener/push_client.h"
     18 #include "sync/notifier/invalidation_util.h"
     19 #include "sync/notifier/registration_manager.h"
     20 
     21 namespace {
     22 
     23 const char kApplicationName[] = "chrome-sync";
     24 
     25 }  // namespace
     26 
     27 namespace syncer {
     28 
     29 SyncInvalidationListener::Delegate::~Delegate() {}
     30 
     31 SyncInvalidationListener::SyncInvalidationListener(
     32     base::TickClock* tick_clock,
     33     scoped_ptr<notifier::PushClient> push_client)
     34     : weak_ptr_factory_(this),
     35       ack_tracker_(tick_clock, this),
     36       push_client_(push_client.get()),
     37       sync_system_resources_(push_client.Pass(), this),
     38       delegate_(NULL),
     39       ticl_state_(DEFAULT_INVALIDATION_ERROR),
     40       push_client_state_(DEFAULT_INVALIDATION_ERROR) {
     41   DCHECK(CalledOnValidThread());
     42   push_client_->AddObserver(this);
     43 }
     44 
     45 SyncInvalidationListener::~SyncInvalidationListener() {
     46   DCHECK(CalledOnValidThread());
     47   push_client_->RemoveObserver(this);
     48   Stop();
     49   DCHECK(!delegate_);
     50 }
     51 
     52 void SyncInvalidationListener::Start(
     53     const CreateInvalidationClientCallback&
     54         create_invalidation_client_callback,
     55     const std::string& client_id, const std::string& client_info,
     56     const std::string& invalidation_bootstrap_data,
     57     const InvalidationStateMap& initial_invalidation_state_map,
     58     const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
     59     Delegate* delegate) {
     60   DCHECK(CalledOnValidThread());
     61   Stop();
     62 
     63   sync_system_resources_.set_platform(client_info);
     64   sync_system_resources_.Start();
     65 
     66   // The Storage resource is implemented as a write-through cache.  We populate
     67   // it with the initial state on startup, so subsequent writes go to disk and
     68   // update the in-memory cache, while reads just return the cached state.
     69   sync_system_resources_.storage()->SetInitialState(
     70       invalidation_bootstrap_data);
     71 
     72   invalidation_state_map_ = initial_invalidation_state_map;
     73   if (invalidation_state_map_.empty()) {
     74     DVLOG(2) << "No initial max invalidation versions for any id";
     75   } else {
     76     for (InvalidationStateMap::const_iterator it =
     77              invalidation_state_map_.begin();
     78          it != invalidation_state_map_.end(); ++it) {
     79       DVLOG(2) << "Initial max invalidation version for "
     80                << ObjectIdToString(it->first) << " is "
     81                << it->second.version;
     82     }
     83   }
     84   invalidation_state_tracker_ = invalidation_state_tracker;
     85   DCHECK(invalidation_state_tracker_.IsInitialized());
     86 
     87   DCHECK(!delegate_);
     88   DCHECK(delegate);
     89   delegate_ = delegate;
     90 
     91 #if defined(OS_IOS)
     92   int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS;
     93 #else
     94   int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
     95 #endif
     96   invalidation_client_.reset(
     97       create_invalidation_client_callback.Run(
     98           &sync_system_resources_, client_type, client_id,
     99           kApplicationName, this));
    100   invalidation_client_->Start();
    101 
    102   registration_manager_.reset(
    103       new RegistrationManager(invalidation_client_.get()));
    104 
    105   // Set up reminders for any invalidations that have not been locally
    106   // acknowledged.
    107   ObjectIdSet unacknowledged_ids;
    108   for (InvalidationStateMap::const_iterator it =
    109            invalidation_state_map_.begin();
    110        it != invalidation_state_map_.end(); ++it) {
    111     if (it->second.expected.Equals(it->second.current))
    112       continue;
    113     unacknowledged_ids.insert(it->first);
    114   }
    115   if (!unacknowledged_ids.empty())
    116     ack_tracker_.Track(unacknowledged_ids);
    117 }
    118 
    119 void SyncInvalidationListener::UpdateCredentials(
    120     const std::string& email, const std::string& token) {
    121   DCHECK(CalledOnValidThread());
    122   sync_system_resources_.network()->UpdateCredentials(email, token);
    123 }
    124 
    125 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
    126   DCHECK(CalledOnValidThread());
    127   registered_ids_ = ids;
    128   // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
    129   // working XMPP connection (as observed by us), so check it instead
    130   // of GetState() (see http://crbug.com/139424).
    131   if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
    132     DoRegistrationUpdate();
    133   }
    134 }
    135 
    136 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
    137                                            const AckHandle& ack_handle) {
    138   DCHECK(CalledOnValidThread());
    139   InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
    140   if (state_it == invalidation_state_map_.end())
    141     return;
    142   invalidation_state_tracker_.Call(
    143       FROM_HERE,
    144       &InvalidationStateTracker::Acknowledge,
    145       id,
    146       ack_handle);
    147   state_it->second.current = ack_handle;
    148   if (state_it->second.expected.Equals(ack_handle)) {
    149     // If the received ack matches the expected ack, then we no longer need to
    150     // keep track of |id| since it is up-to-date.
    151     ObjectIdSet ids;
    152     ids.insert(id);
    153     ack_tracker_.Ack(ids);
    154   }
    155 }
    156 
    157 void SyncInvalidationListener::Ready(
    158     invalidation::InvalidationClient* client) {
    159   DCHECK(CalledOnValidThread());
    160   DCHECK_EQ(client, invalidation_client_.get());
    161   ticl_state_ = INVALIDATIONS_ENABLED;
    162   EmitStateChange();
    163   DoRegistrationUpdate();
    164 }
    165 
    166 void SyncInvalidationListener::Invalidate(
    167     invalidation::InvalidationClient* client,
    168     const invalidation::Invalidation& invalidation,
    169     const invalidation::AckHandle& ack_handle) {
    170   DCHECK(CalledOnValidThread());
    171   DCHECK_EQ(client, invalidation_client_.get());
    172   DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
    173 
    174   const invalidation::ObjectId& id = invalidation.object_id();
    175 
    176   // The invalidation API spec allows for the possibility of redundant
    177   // invalidations, so keep track of the max versions and drop
    178   // invalidations with old versions.
    179   //
    180   // TODO(akalin): Now that we keep track of registered ids, we
    181   // should drop invalidations for unregistered ids.  We may also
    182   // have to filter it at a higher level, as invalidations for
    183   // newly-unregistered ids may already be in flight.
    184   InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
    185   if ((it != invalidation_state_map_.end()) &&
    186       (invalidation.version() <= it->second.version)) {
    187     // Drop redundant invalidations.
    188     client->Acknowledge(ack_handle);
    189     return;
    190   }
    191 
    192   std::string payload;
    193   // payload() CHECK()'s has_payload(), so we must check it ourselves first.
    194   if (invalidation.has_payload())
    195     payload = invalidation.payload();
    196 
    197   DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
    198            << " to " << invalidation.version();
    199   invalidation_state_map_[id].version = invalidation.version();
    200   invalidation_state_map_[id].payload = payload;
    201   invalidation_state_tracker_.Call(
    202       FROM_HERE,
    203       &InvalidationStateTracker::SetMaxVersionAndPayload,
    204       id, invalidation.version(), payload);
    205 
    206   ObjectIdSet ids;
    207   ids.insert(id);
    208   PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle);
    209 }
    210 
    211 void SyncInvalidationListener::InvalidateUnknownVersion(
    212     invalidation::InvalidationClient* client,
    213     const invalidation::ObjectId& object_id,
    214     const invalidation::AckHandle& ack_handle) {
    215   DCHECK(CalledOnValidThread());
    216   DCHECK_EQ(client, invalidation_client_.get());
    217   DVLOG(1) << "InvalidateUnknownVersion";
    218 
    219   ObjectIdSet ids;
    220   ids.insert(object_id);
    221   PrepareInvalidation(
    222       ids,
    223       Invalidation::kUnknownVersion,
    224       std::string(),
    225       client,
    226       ack_handle);
    227 }
    228 
    229 // This should behave as if we got an invalidation with version
    230 // UNKNOWN_OBJECT_VERSION for all known data types.
    231 void SyncInvalidationListener::InvalidateAll(
    232     invalidation::InvalidationClient* client,
    233     const invalidation::AckHandle& ack_handle) {
    234   DCHECK(CalledOnValidThread());
    235   DCHECK_EQ(client, invalidation_client_.get());
    236   DVLOG(1) << "InvalidateAll";
    237 
    238   PrepareInvalidation(
    239       registered_ids_,
    240       Invalidation::kUnknownVersion,
    241       std::string(),
    242       client,
    243       ack_handle);
    244 }
    245 
    246 void SyncInvalidationListener::PrepareInvalidation(
    247     const ObjectIdSet& ids,
    248     int64 version,
    249     const std::string& payload,
    250     invalidation::InvalidationClient* client,
    251     const invalidation::AckHandle& ack_handle) {
    252   DCHECK(CalledOnValidThread());
    253 
    254   // A server invalidation resets the local retry count.
    255   ack_tracker_.Ack(ids);
    256   invalidation_state_tracker_.Call(
    257       FROM_HERE,
    258       &InvalidationStateTracker::GenerateAckHandles,
    259       ids,
    260       base::MessageLoopProxy::current(),
    261       base::Bind(&SyncInvalidationListener::EmitInvalidation,
    262                  weak_ptr_factory_.GetWeakPtr(),
    263                  ids,
    264                  version,
    265                  payload,
    266                  client,
    267                  ack_handle));
    268 }
    269 
    270 void SyncInvalidationListener::EmitInvalidation(
    271     const ObjectIdSet& ids,
    272     int64 version,
    273     const std::string& payload,
    274     invalidation::InvalidationClient* client,
    275     const invalidation::AckHandle& ack_handle,
    276     const AckHandleMap& local_ack_handles) {
    277   DCHECK(CalledOnValidThread());
    278   ObjectIdInvalidationMap invalidation_map =
    279       ObjectIdSetToInvalidationMap(ids, version, payload);
    280   for (AckHandleMap::const_iterator it = local_ack_handles.begin();
    281        it != local_ack_handles.end(); ++it) {
    282     // Update in-memory copy of the invalidation state.
    283     invalidation_state_map_[it->first].expected = it->second;
    284     invalidation_map[it->first].ack_handle = it->second;
    285   }
    286   ack_tracker_.Track(ids);
    287   delegate_->OnInvalidate(invalidation_map);
    288   client->Acknowledge(ack_handle);
    289 }
    290 
    291 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
    292   ObjectIdInvalidationMap invalidation_map;
    293   for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
    294     Invalidation invalidation;
    295     invalidation.ack_handle = invalidation_state_map_[*it].expected;
    296     invalidation.version = invalidation_state_map_[*it].version;
    297     invalidation.payload = invalidation_state_map_[*it].payload;
    298     invalidation_map.insert(std::make_pair(*it, invalidation));
    299   }
    300 
    301   delegate_->OnInvalidate(invalidation_map);
    302 }
    303 
    304 void SyncInvalidationListener::InformRegistrationStatus(
    305       invalidation::InvalidationClient* client,
    306       const invalidation::ObjectId& object_id,
    307       InvalidationListener::RegistrationState new_state) {
    308   DCHECK(CalledOnValidThread());
    309   DCHECK_EQ(client, invalidation_client_.get());
    310   DVLOG(1) << "InformRegistrationStatus: "
    311            << ObjectIdToString(object_id) << " " << new_state;
    312 
    313   if (new_state != InvalidationListener::REGISTERED) {
    314     // Let |registration_manager_| handle the registration backoff policy.
    315     registration_manager_->MarkRegistrationLost(object_id);
    316   }
    317 }
    318 
    319 void SyncInvalidationListener::InformRegistrationFailure(
    320     invalidation::InvalidationClient* client,
    321     const invalidation::ObjectId& object_id,
    322     bool is_transient,
    323     const std::string& error_message) {
    324   DCHECK(CalledOnValidThread());
    325   DCHECK_EQ(client, invalidation_client_.get());
    326   DVLOG(1) << "InformRegistrationFailure: "
    327            << ObjectIdToString(object_id)
    328            << "is_transient=" << is_transient
    329            << ", message=" << error_message;
    330 
    331   if (is_transient) {
    332     // We don't care about |unknown_hint|; we let
    333     // |registration_manager_| handle the registration backoff policy.
    334     registration_manager_->MarkRegistrationLost(object_id);
    335   } else {
    336     // Non-transient failures require an action to resolve. This could happen
    337     // because:
    338     // - the server doesn't yet recognize the data type, which could happen for
    339     //   brand-new data types.
    340     // - the user has changed his password and hasn't updated it yet locally.
    341     // Either way, block future registration attempts for |object_id|. However,
    342     // we don't forget any saved invalidation state since we may use it once the
    343     // error is addressed.
    344     registration_manager_->DisableId(object_id);
    345   }
    346 }
    347 
    348 void SyncInvalidationListener::ReissueRegistrations(
    349     invalidation::InvalidationClient* client,
    350     const std::string& prefix,
    351     int prefix_length) {
    352   DCHECK(CalledOnValidThread());
    353   DCHECK_EQ(client, invalidation_client_.get());
    354   DVLOG(1) << "AllRegistrationsLost";
    355   registration_manager_->MarkAllRegistrationsLost();
    356 }
    357 
    358 void SyncInvalidationListener::InformError(
    359     invalidation::InvalidationClient* client,
    360     const invalidation::ErrorInfo& error_info) {
    361   DCHECK(CalledOnValidThread());
    362   DCHECK_EQ(client, invalidation_client_.get());
    363   LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
    364              << error_info.error_message()
    365              << " (transient = " << error_info.is_transient() << ")";
    366   if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
    367     ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
    368   } else {
    369     ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
    370   }
    371   EmitStateChange();
    372 }
    373 
    374 void SyncInvalidationListener::WriteState(const std::string& state) {
    375   DCHECK(CalledOnValidThread());
    376   DVLOG(1) << "WriteState";
    377   invalidation_state_tracker_.Call(
    378       FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
    379 }
    380 
    381 void SyncInvalidationListener::DoRegistrationUpdate() {
    382   DCHECK(CalledOnValidThread());
    383   const ObjectIdSet& unregistered_ids =
    384       registration_manager_->UpdateRegisteredIds(registered_ids_);
    385   for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
    386        it != unregistered_ids.end(); ++it) {
    387     invalidation_state_map_.erase(*it);
    388   }
    389   invalidation_state_tracker_.Call(
    390       FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
    391   ack_tracker_.Ack(unregistered_ids);
    392 }
    393 
    394 void SyncInvalidationListener::StopForTest() {
    395   DCHECK(CalledOnValidThread());
    396   Stop();
    397 }
    398 
    399 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
    400   DCHECK(CalledOnValidThread());
    401   return invalidation_state_map_;
    402 }
    403 
    404 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
    405   return &ack_tracker_;
    406 }
    407 
    408 void SyncInvalidationListener::Stop() {
    409   DCHECK(CalledOnValidThread());
    410   if (!invalidation_client_) {
    411     return;
    412   }
    413 
    414   ack_tracker_.Clear();
    415 
    416   registration_manager_.reset();
    417   sync_system_resources_.Stop();
    418   invalidation_client_->Stop();
    419 
    420   invalidation_client_.reset();
    421   delegate_ = NULL;
    422 
    423   invalidation_state_tracker_.Reset();
    424   invalidation_state_map_.clear();
    425   ticl_state_ = DEFAULT_INVALIDATION_ERROR;
    426   push_client_state_ = DEFAULT_INVALIDATION_ERROR;
    427 }
    428 
    429 InvalidatorState SyncInvalidationListener::GetState() const {
    430   DCHECK(CalledOnValidThread());
    431   if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
    432       push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
    433     // If either the ticl or the push client rejected our credentials,
    434     // return INVALIDATION_CREDENTIALS_REJECTED.
    435     return INVALIDATION_CREDENTIALS_REJECTED;
    436   }
    437   if (ticl_state_ == INVALIDATIONS_ENABLED &&
    438       push_client_state_ == INVALIDATIONS_ENABLED) {
    439     // If the ticl is ready and the push client notifications are
    440     // enabled, return INVALIDATIONS_ENABLED.
    441     return INVALIDATIONS_ENABLED;
    442   }
    443   // Otherwise, we have a transient error.
    444   return TRANSIENT_INVALIDATION_ERROR;
    445 }
    446 
    447 void SyncInvalidationListener::EmitStateChange() {
    448   DCHECK(CalledOnValidThread());
    449   delegate_->OnInvalidatorStateChange(GetState());
    450 }
    451 
    452 void SyncInvalidationListener::OnNotificationsEnabled() {
    453   DCHECK(CalledOnValidThread());
    454   push_client_state_ = INVALIDATIONS_ENABLED;
    455   EmitStateChange();
    456 }
    457 
    458 void SyncInvalidationListener::OnNotificationsDisabled(
    459     notifier::NotificationsDisabledReason reason) {
    460   DCHECK(CalledOnValidThread());
    461   push_client_state_ = FromNotifierReason(reason);
    462   EmitStateChange();
    463 }
    464 
    465 void SyncInvalidationListener::OnIncomingNotification(
    466     const notifier::Notification& notification) {
    467   DCHECK(CalledOnValidThread());
    468   // Do nothing, since this is already handled by |invalidation_client_|.
    469 }
    470 
    471 }  // namespace syncer
    472