Home | History | Annotate | Download | only in invalidation
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "components/invalidation/sync_invalidation_listener.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/bind.h"
     10 #include "base/callback.h"
     11 #include "base/compiler_specific.h"
     12 #include "base/logging.h"
     13 #include "base/tracked_objects.h"
     14 #include "components/invalidation/invalidation_util.h"
     15 #include "components/invalidation/object_id_invalidation_map.h"
     16 #include "components/invalidation/registration_manager.h"
     17 #include "google/cacheinvalidation/include/invalidation-client.h"
     18 #include "google/cacheinvalidation/include/types.h"
     19 #include "jingle/notifier/listener/push_client.h"
     20 
     21 namespace {
     22 
     23 const char kApplicationName[] = "chrome-sync";
     24 
     25 }  // namespace
     26 
     27 namespace syncer {
     28 
     29 SyncInvalidationListener::Delegate::~Delegate() {}
     30 
     31 SyncInvalidationListener::SyncInvalidationListener(
     32     scoped_ptr<SyncNetworkChannel> network_channel)
     33     : sync_network_channel_(network_channel.Pass()),
     34       sync_system_resources_(sync_network_channel_.get(), this),
     35       delegate_(NULL),
     36       ticl_state_(DEFAULT_INVALIDATION_ERROR),
     37       push_client_state_(DEFAULT_INVALIDATION_ERROR),
     38       weak_ptr_factory_(this) {
     39   DCHECK(CalledOnValidThread());
     40   sync_network_channel_->AddObserver(this);
     41 }
     42 
     43 SyncInvalidationListener::~SyncInvalidationListener() {
     44   DCHECK(CalledOnValidThread());
     45   sync_network_channel_->RemoveObserver(this);
     46   Stop();
     47   DCHECK(!delegate_);
     48 }
     49 
     50 void SyncInvalidationListener::Start(
     51     const CreateInvalidationClientCallback& create_invalidation_client_callback,
     52     const std::string& client_id,
     53     const std::string& client_info,
     54     const std::string& invalidation_bootstrap_data,
     55     const UnackedInvalidationsMap& initial_unacked_invalidations,
     56     const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
     57     const scoped_refptr<base::SequencedTaskRunner>&
     58         invalidation_state_tracker_task_runner,
     59     Delegate* delegate) {
     60   DCHECK(CalledOnValidThread());
     61   Stop();
     62 
     63   sync_system_resources_.set_platform(client_info);
     64   sync_system_resources_.Start();
     65 
     66   // The Storage resource is implemented as a write-through cache.  We populate
     67   // it with the initial state on startup, so subsequent writes go to disk and
     68   // update the in-memory cache, while reads just return the cached state.
     69   sync_system_resources_.storage()->SetInitialState(
     70       invalidation_bootstrap_data);
     71 
     72   unacked_invalidations_map_ = initial_unacked_invalidations;
     73   invalidation_state_tracker_ = invalidation_state_tracker;
     74   invalidation_state_tracker_task_runner_ =
     75       invalidation_state_tracker_task_runner;
     76   DCHECK(invalidation_state_tracker_task_runner_.get());
     77 
     78   DCHECK(!delegate_);
     79   DCHECK(delegate);
     80   delegate_ = delegate;
     81 
     82   invalidation_client_.reset(create_invalidation_client_callback.Run(
     83       &sync_system_resources_,
     84       sync_network_channel_->GetInvalidationClientType(),
     85       client_id,
     86       kApplicationName,
     87       this));
     88   invalidation_client_->Start();
     89 
     90   registration_manager_.reset(
     91       new RegistrationManager(invalidation_client_.get()));
     92 }
     93 
     94 void SyncInvalidationListener::UpdateCredentials(
     95     const std::string& email, const std::string& token) {
     96   DCHECK(CalledOnValidThread());
     97   sync_network_channel_->UpdateCredentials(email, token);
     98 }
     99 
    100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
    101   DCHECK(CalledOnValidThread());
    102   registered_ids_ = ids;
    103   // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
    104   // working XMPP connection (as observed by us), so check it instead
    105   // of GetState() (see http://crbug.com/139424).
    106   if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
    107     DoRegistrationUpdate();
    108   }
    109 }
    110 
    111 void SyncInvalidationListener::Ready(
    112     invalidation::InvalidationClient* client) {
    113   DCHECK(CalledOnValidThread());
    114   DCHECK_EQ(client, invalidation_client_.get());
    115   ticl_state_ = INVALIDATIONS_ENABLED;
    116   EmitStateChange();
    117   DoRegistrationUpdate();
    118 }
    119 
    120 void SyncInvalidationListener::Invalidate(
    121     invalidation::InvalidationClient* client,
    122     const invalidation::Invalidation& invalidation,
    123     const invalidation::AckHandle& ack_handle) {
    124   DCHECK(CalledOnValidThread());
    125   DCHECK_EQ(client, invalidation_client_.get());
    126   client->Acknowledge(ack_handle);
    127 
    128   const invalidation::ObjectId& id = invalidation.object_id();
    129 
    130   std::string payload;
    131   // payload() CHECK()'s has_payload(), so we must check it ourselves first.
    132   if (invalidation.has_payload())
    133     payload = invalidation.payload();
    134 
    135   DVLOG(2) << "Received invalidation with version " << invalidation.version()
    136            << " for " << ObjectIdToString(id);
    137 
    138   ObjectIdInvalidationMap invalidations;
    139   Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
    140   inv.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
    141   invalidations.Insert(inv);
    142 
    143   DispatchInvalidations(invalidations);
    144 }
    145 
    146 void SyncInvalidationListener::InvalidateUnknownVersion(
    147     invalidation::InvalidationClient* client,
    148     const invalidation::ObjectId& object_id,
    149     const invalidation::AckHandle& ack_handle) {
    150   DCHECK(CalledOnValidThread());
    151   DCHECK_EQ(client, invalidation_client_.get());
    152   DVLOG(1) << "InvalidateUnknownVersion";
    153   client->Acknowledge(ack_handle);
    154 
    155   ObjectIdInvalidationMap invalidations;
    156   Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
    157   unknown_version.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
    158   invalidations.Insert(unknown_version);
    159 
    160   DispatchInvalidations(invalidations);
    161 }
    162 
    163 // This should behave as if we got an invalidation with version
    164 // UNKNOWN_OBJECT_VERSION for all known data types.
    165 void SyncInvalidationListener::InvalidateAll(
    166     invalidation::InvalidationClient* client,
    167     const invalidation::AckHandle& ack_handle) {
    168   DCHECK(CalledOnValidThread());
    169   DCHECK_EQ(client, invalidation_client_.get());
    170   DVLOG(1) << "InvalidateAll";
    171   client->Acknowledge(ack_handle);
    172 
    173   ObjectIdInvalidationMap invalidations;
    174   for (ObjectIdSet::iterator it = registered_ids_.begin();
    175        it != registered_ids_.end(); ++it) {
    176     Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
    177     unknown_version.SetAckHandler(AsWeakPtr(),
    178                                   base::MessageLoopProxy::current());
    179     invalidations.Insert(unknown_version);
    180   }
    181 
    182   DispatchInvalidations(invalidations);
    183 }
    184 
    185 // If a handler is registered, emit right away.  Otherwise, save it for later.
    186 void SyncInvalidationListener::DispatchInvalidations(
    187     const ObjectIdInvalidationMap& invalidations) {
    188   DCHECK(CalledOnValidThread());
    189 
    190   ObjectIdInvalidationMap to_save = invalidations;
    191   ObjectIdInvalidationMap to_emit =
    192       invalidations.GetSubsetWithObjectIds(registered_ids_);
    193 
    194   SaveInvalidations(to_save);
    195   EmitSavedInvalidations(to_emit);
    196 }
    197 
    198 void SyncInvalidationListener::SaveInvalidations(
    199     const ObjectIdInvalidationMap& to_save) {
    200   ObjectIdSet objects_to_save = to_save.GetObjectIds();
    201   for (ObjectIdSet::const_iterator it = objects_to_save.begin();
    202        it != objects_to_save.end(); ++it) {
    203     UnackedInvalidationsMap::iterator lookup =
    204         unacked_invalidations_map_.find(*it);
    205     if (lookup == unacked_invalidations_map_.end()) {
    206       lookup = unacked_invalidations_map_.insert(
    207           std::make_pair(*it, UnackedInvalidationSet(*it))).first;
    208     }
    209     lookup->second.AddSet(to_save.ForObject(*it));
    210   }
    211 
    212   invalidation_state_tracker_task_runner_->PostTask(
    213       FROM_HERE,
    214       base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
    215                  invalidation_state_tracker_,
    216                  unacked_invalidations_map_));
    217 }
    218 
    219 void SyncInvalidationListener::EmitSavedInvalidations(
    220     const ObjectIdInvalidationMap& to_emit) {
    221   DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
    222   delegate_->OnInvalidate(to_emit);
    223 }
    224 
    225 void SyncInvalidationListener::InformRegistrationStatus(
    226       invalidation::InvalidationClient* client,
    227       const invalidation::ObjectId& object_id,
    228       InvalidationListener::RegistrationState new_state) {
    229   DCHECK(CalledOnValidThread());
    230   DCHECK_EQ(client, invalidation_client_.get());
    231   DVLOG(1) << "InformRegistrationStatus: "
    232            << ObjectIdToString(object_id) << " " << new_state;
    233 
    234   if (new_state != InvalidationListener::REGISTERED) {
    235     // Let |registration_manager_| handle the registration backoff policy.
    236     registration_manager_->MarkRegistrationLost(object_id);
    237   }
    238 }
    239 
    240 void SyncInvalidationListener::InformRegistrationFailure(
    241     invalidation::InvalidationClient* client,
    242     const invalidation::ObjectId& object_id,
    243     bool is_transient,
    244     const std::string& error_message) {
    245   DCHECK(CalledOnValidThread());
    246   DCHECK_EQ(client, invalidation_client_.get());
    247   DVLOG(1) << "InformRegistrationFailure: "
    248            << ObjectIdToString(object_id)
    249            << "is_transient=" << is_transient
    250            << ", message=" << error_message;
    251 
    252   if (is_transient) {
    253     // We don't care about |unknown_hint|; we let
    254     // |registration_manager_| handle the registration backoff policy.
    255     registration_manager_->MarkRegistrationLost(object_id);
    256   } else {
    257     // Non-transient failures require an action to resolve. This could happen
    258     // because:
    259     // - the server doesn't yet recognize the data type, which could happen for
    260     //   brand-new data types.
    261     // - the user has changed his password and hasn't updated it yet locally.
    262     // Either way, block future registration attempts for |object_id|. However,
    263     // we don't forget any saved invalidation state since we may use it once the
    264     // error is addressed.
    265     registration_manager_->DisableId(object_id);
    266   }
    267 }
    268 
    269 void SyncInvalidationListener::ReissueRegistrations(
    270     invalidation::InvalidationClient* client,
    271     const std::string& prefix,
    272     int prefix_length) {
    273   DCHECK(CalledOnValidThread());
    274   DCHECK_EQ(client, invalidation_client_.get());
    275   DVLOG(1) << "AllRegistrationsLost";
    276   registration_manager_->MarkAllRegistrationsLost();
    277 }
    278 
    279 void SyncInvalidationListener::InformError(
    280     invalidation::InvalidationClient* client,
    281     const invalidation::ErrorInfo& error_info) {
    282   DCHECK(CalledOnValidThread());
    283   DCHECK_EQ(client, invalidation_client_.get());
    284   LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
    285              << error_info.error_message()
    286              << " (transient = " << error_info.is_transient() << ")";
    287   if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
    288     ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
    289   } else {
    290     ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
    291   }
    292   EmitStateChange();
    293 }
    294 
    295 void SyncInvalidationListener::Acknowledge(
    296   const invalidation::ObjectId& id,
    297   const syncer::AckHandle& handle) {
    298   UnackedInvalidationsMap::iterator lookup =
    299       unacked_invalidations_map_.find(id);
    300   if (lookup == unacked_invalidations_map_.end()) {
    301     DLOG(WARNING) << "Received acknowledgement for untracked object ID";
    302     return;
    303   }
    304   lookup->second.Acknowledge(handle);
    305   invalidation_state_tracker_task_runner_->PostTask(
    306       FROM_HERE,
    307       base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
    308                  invalidation_state_tracker_,
    309                  unacked_invalidations_map_));
    310 }
    311 
    312 void SyncInvalidationListener::Drop(
    313     const invalidation::ObjectId& id,
    314     const syncer::AckHandle& handle) {
    315   UnackedInvalidationsMap::iterator lookup =
    316       unacked_invalidations_map_.find(id);
    317   if (lookup == unacked_invalidations_map_.end()) {
    318     DLOG(WARNING) << "Received drop for untracked object ID";
    319     return;
    320   }
    321   lookup->second.Drop(handle);
    322   invalidation_state_tracker_task_runner_->PostTask(
    323       FROM_HERE,
    324       base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
    325                  invalidation_state_tracker_,
    326                  unacked_invalidations_map_));
    327 }
    328 
    329 void SyncInvalidationListener::WriteState(const std::string& state) {
    330   DCHECK(CalledOnValidThread());
    331   DVLOG(1) << "WriteState";
    332   invalidation_state_tracker_task_runner_->PostTask(
    333       FROM_HERE,
    334       base::Bind(&InvalidationStateTracker::SetBootstrapData,
    335                  invalidation_state_tracker_,
    336                  state));
    337 }
    338 
    339 void SyncInvalidationListener::DoRegistrationUpdate() {
    340   DCHECK(CalledOnValidThread());
    341   const ObjectIdSet& unregistered_ids =
    342       registration_manager_->UpdateRegisteredIds(registered_ids_);
    343   for (ObjectIdSet::iterator it = unregistered_ids.begin();
    344        it != unregistered_ids.end(); ++it) {
    345     unacked_invalidations_map_.erase(*it);
    346   }
    347   invalidation_state_tracker_task_runner_->PostTask(
    348       FROM_HERE,
    349       base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
    350                  invalidation_state_tracker_,
    351                  unacked_invalidations_map_));
    352 
    353   ObjectIdInvalidationMap object_id_invalidation_map;
    354   for (UnackedInvalidationsMap::iterator map_it =
    355        unacked_invalidations_map_.begin();
    356        map_it != unacked_invalidations_map_.end(); ++map_it) {
    357     if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
    358       continue;
    359     }
    360     map_it->second.ExportInvalidations(AsWeakPtr(),
    361                                        base::MessageLoopProxy::current(),
    362                                        &object_id_invalidation_map);
    363   }
    364 
    365   // There's no need to run these through DispatchInvalidations(); they've
    366   // already been saved to storage (that's where we found them) so all we need
    367   // to do now is emit them.
    368   EmitSavedInvalidations(object_id_invalidation_map);
    369 }
    370 
    371 void SyncInvalidationListener::RequestDetailedStatus(
    372     base::Callback<void(const base::DictionaryValue&)> callback) const {
    373   DCHECK(CalledOnValidThread());
    374   sync_network_channel_->RequestDetailedStatus(callback);
    375   callback.Run(*CollectDebugData());
    376 }
    377 
    378 scoped_ptr<base::DictionaryValue>
    379 SyncInvalidationListener::CollectDebugData() const {
    380   scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue());
    381   return_value->SetString(
    382       "SyncInvalidationListener.PushClientState",
    383       std::string(InvalidatorStateToString(push_client_state_)));
    384   return_value->SetString("SyncInvalidationListener.TiclState",
    385                           std::string(InvalidatorStateToString(ticl_state_)));
    386   scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue());
    387   for (UnackedInvalidationsMap::const_iterator it =
    388            unacked_invalidations_map_.begin();
    389        it != unacked_invalidations_map_.end();
    390        ++it) {
    391     unacked_map->Set((it->first).name(), (it->second).ToValue().release());
    392   }
    393   return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap",
    394                     unacked_map.release());
    395   return return_value.Pass();
    396 }
    397 
    398 void SyncInvalidationListener::StopForTest() {
    399   DCHECK(CalledOnValidThread());
    400   Stop();
    401 }
    402 
    403 void SyncInvalidationListener::Stop() {
    404   DCHECK(CalledOnValidThread());
    405   if (!invalidation_client_) {
    406     return;
    407   }
    408 
    409   registration_manager_.reset();
    410   sync_system_resources_.Stop();
    411   invalidation_client_->Stop();
    412 
    413   invalidation_client_.reset();
    414   delegate_ = NULL;
    415 
    416   ticl_state_ = DEFAULT_INVALIDATION_ERROR;
    417   push_client_state_ = DEFAULT_INVALIDATION_ERROR;
    418 }
    419 
    420 InvalidatorState SyncInvalidationListener::GetState() const {
    421   DCHECK(CalledOnValidThread());
    422   if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
    423       push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
    424     // If either the ticl or the push client rejected our credentials,
    425     // return INVALIDATION_CREDENTIALS_REJECTED.
    426     return INVALIDATION_CREDENTIALS_REJECTED;
    427   }
    428   if (ticl_state_ == INVALIDATIONS_ENABLED &&
    429       push_client_state_ == INVALIDATIONS_ENABLED) {
    430     // If the ticl is ready and the push client notifications are
    431     // enabled, return INVALIDATIONS_ENABLED.
    432     return INVALIDATIONS_ENABLED;
    433   }
    434   // Otherwise, we have a transient error.
    435   return TRANSIENT_INVALIDATION_ERROR;
    436 }
    437 
    438 void SyncInvalidationListener::EmitStateChange() {
    439   DCHECK(CalledOnValidThread());
    440   delegate_->OnInvalidatorStateChange(GetState());
    441 }
    442 
    443 base::WeakPtr<AckHandler> SyncInvalidationListener::AsWeakPtr() {
    444   DCHECK(CalledOnValidThread());
    445   base::WeakPtr<AckHandler> weak_ptr = weak_ptr_factory_.GetWeakPtr();
    446   weak_ptr.get();  // Binds the pointer to this thread.
    447   return weak_ptr;
    448 }
    449 
    450 void SyncInvalidationListener::OnNetworkChannelStateChanged(
    451     InvalidatorState invalidator_state) {
    452   DCHECK(CalledOnValidThread());
    453   push_client_state_ = invalidator_state;
    454   EmitStateChange();
    455 }
    456 
    457 }  // namespace syncer
    458