Home | History | Annotate | Download | only in notifier
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
      6 
      7 #include <string>
      8 #include <vector>
      9 
     10 #include "base/compiler_specific.h"
     11 #include "base/logging.h"
     12 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
     13 #include "chrome/browser/sync/notifier/invalidation_util.h"
     14 #include "chrome/browser/sync/notifier/registration_manager.h"
     15 #include "chrome/browser/sync/syncable/model_type.h"
     16 #include "google/cacheinvalidation/invalidation-client-impl.h"
     17 
     18 namespace sync_notifier {
     19 
     20 ChromeInvalidationClient::Listener::~Listener() {}
     21 
     22 ChromeInvalidationClient::ChromeInvalidationClient()
     23     : chrome_system_resources_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
     24       scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
     25       handle_outbound_packet_callback_(
     26           scoped_callback_factory_.NewCallback(
     27               &ChromeInvalidationClient::HandleOutboundPacket)),
     28       listener_(NULL),
     29       state_writer_(NULL) {
     30   DCHECK(non_thread_safe_.CalledOnValidThread());
     31 }
     32 
     33 ChromeInvalidationClient::~ChromeInvalidationClient() {
     34   DCHECK(non_thread_safe_.CalledOnValidThread());
     35   Stop();
     36   DCHECK(!listener_);
     37   DCHECK(!state_writer_);
     38 }
     39 
     40 void ChromeInvalidationClient::Start(
     41     const std::string& client_id, const std::string& client_info,
     42     const std::string& state, Listener* listener,
     43     StateWriter* state_writer, base::WeakPtr<talk_base::Task> base_task) {
     44   DCHECK(non_thread_safe_.CalledOnValidThread());
     45   Stop();
     46 
     47   chrome_system_resources_.StartScheduler();
     48 
     49   DCHECK(!listener_);
     50   DCHECK(listener);
     51   listener_ = listener;
     52   DCHECK(!state_writer_);
     53   DCHECK(state_writer);
     54   state_writer_ = state_writer;
     55 
     56   invalidation::ClientType client_type;
     57   client_type.set_type(invalidation::ClientType::CHROME_SYNC);
     58   // TODO(akalin): Use InvalidationClient::Create() once it supports
     59   // taking a ClientConfig.
     60   invalidation::ClientConfig client_config;
     61   // Bump up limits so that we reduce the number of registration
     62   // replies we get.
     63   client_config.max_registrations_per_message = 20;
     64   client_config.max_ops_per_message = 40;
     65   invalidation_client_.reset(
     66       new invalidation::InvalidationClientImpl(
     67           &chrome_system_resources_, client_type, client_id, client_info,
     68           client_config, this));
     69   invalidation_client_->Start(state);
     70   invalidation::NetworkEndpoint* network_endpoint =
     71       invalidation_client_->network_endpoint();
     72   CHECK(network_endpoint);
     73   network_endpoint->RegisterOutboundListener(
     74       handle_outbound_packet_callback_.get());
     75   ChangeBaseTask(base_task);
     76   registration_manager_.reset(
     77       new RegistrationManager(invalidation_client_.get()));
     78   registration_manager_->SetRegisteredTypes(registered_types_);
     79 }
     80 
     81 void ChromeInvalidationClient::ChangeBaseTask(
     82     base::WeakPtr<talk_base::Task> base_task) {
     83   DCHECK(invalidation_client_.get());
     84   DCHECK(base_task.get());
     85   cache_invalidation_packet_handler_.reset(
     86       new CacheInvalidationPacketHandler(base_task,
     87                                          invalidation_client_.get()));
     88 }
     89 
     90 void ChromeInvalidationClient::Stop() {
     91   DCHECK(non_thread_safe_.CalledOnValidThread());
     92   if (!invalidation_client_.get()) {
     93     DCHECK(!cache_invalidation_packet_handler_.get());
     94     return;
     95   }
     96 
     97   chrome_system_resources_.StopScheduler();
     98 
     99   registration_manager_.reset();
    100   cache_invalidation_packet_handler_.reset();
    101   invalidation_client_.reset();
    102   state_writer_ = NULL;
    103   listener_ = NULL;
    104 }
    105 
    106 void ChromeInvalidationClient::RegisterTypes(
    107     const syncable::ModelTypeSet& types) {
    108   DCHECK(non_thread_safe_.CalledOnValidThread());
    109   registered_types_ = types;
    110   if (registration_manager_.get()) {
    111     registration_manager_->SetRegisteredTypes(registered_types_);
    112   }
    113   // TODO(akalin): Clear invalidation versions for unregistered types.
    114 }
    115 
    116 void ChromeInvalidationClient::Invalidate(
    117     const invalidation::Invalidation& invalidation,
    118     invalidation::Closure* callback) {
    119   DCHECK(non_thread_safe_.CalledOnValidThread());
    120   DCHECK(invalidation::IsCallbackRepeatable(callback));
    121   VLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
    122   syncable::ModelType model_type;
    123   if (!ObjectIdToRealModelType(invalidation.object_id(), &model_type)) {
    124     LOG(WARNING) << "Could not get invalidation model type; "
    125                  << "invalidating everything";
    126     EmitInvalidation(registered_types_, std::string());
    127     RunAndDeleteClosure(callback);
    128     return;
    129   }
    130   // The invalidation API spec allows for the possibility of redundant
    131   // invalidations, so keep track of the max versions and drop
    132   // invalidations with old versions.
    133   //
    134   // TODO(akalin): Now that we keep track of registered types, we
    135   // should drop invalidations for unregistered types.  We may also
    136   // have to filter it at a higher level, as invalidations for
    137   // newly-unregistered types may already be in flight.
    138   //
    139   // TODO(akalin): Persist |max_invalidation_versions_| somehow.
    140   if (invalidation.version() != UNKNOWN_OBJECT_VERSION) {
    141     std::map<syncable::ModelType, int64>::const_iterator it =
    142         max_invalidation_versions_.find(model_type);
    143     if ((it != max_invalidation_versions_.end()) &&
    144         (invalidation.version() <= it->second)) {
    145       // Drop redundant invalidations.
    146       RunAndDeleteClosure(callback);
    147       return;
    148     }
    149     max_invalidation_versions_[model_type] = invalidation.version();
    150   }
    151 
    152   std::string payload;
    153   // payload() CHECK()'s has_payload(), so we must check it ourselves first.
    154   if (invalidation.has_payload())
    155     payload = invalidation.payload();
    156 
    157   syncable::ModelTypeSet types;
    158   types.insert(model_type);
    159   EmitInvalidation(types, payload);
    160   // TODO(akalin): We should really |callback| only after we get the
    161   // updates from the sync server. (see http://crbug.com/78462).
    162   RunAndDeleteClosure(callback);
    163 }
    164 
    165 // This should behave as if we got an invalidation with version
    166 // UNKNOWN_OBJECT_VERSION for all known data types.
    167 void ChromeInvalidationClient::InvalidateAll(
    168     invalidation::Closure* callback) {
    169   DCHECK(non_thread_safe_.CalledOnValidThread());
    170   DCHECK(invalidation::IsCallbackRepeatable(callback));
    171   VLOG(1) << "InvalidateAll";
    172   EmitInvalidation(registered_types_, std::string());
    173   // TODO(akalin): We should really |callback| only after we get the
    174   // updates from the sync server. (see http://crbug.com/76482).
    175   RunAndDeleteClosure(callback);
    176 }
    177 
    178 void ChromeInvalidationClient::EmitInvalidation(
    179     const syncable::ModelTypeSet& types, const std::string& payload) {
    180   DCHECK(non_thread_safe_.CalledOnValidThread());
    181   // TODO(akalin): Move all uses of ModelTypeBitSet for invalidations
    182   // to ModelTypeSet.
    183   syncable::ModelTypePayloadMap type_payloads =
    184       syncable::ModelTypePayloadMapFromBitSet(
    185           syncable::ModelTypeBitSetFromSet(types), payload);
    186   listener_->OnInvalidate(type_payloads);
    187 }
    188 
    189 void ChromeInvalidationClient::RegistrationStateChanged(
    190     const invalidation::ObjectId& object_id,
    191     invalidation::RegistrationState new_state,
    192     const invalidation::UnknownHint& unknown_hint) {
    193   DCHECK(non_thread_safe_.CalledOnValidThread());
    194   VLOG(1) << "RegistrationStateChanged: "
    195           << ObjectIdToString(object_id) << " " << new_state;
    196   if (new_state == invalidation::RegistrationState_UNKNOWN) {
    197     VLOG(1) << "is_transient=" << unknown_hint.is_transient()
    198             << ", message=" << unknown_hint.message();
    199   }
    200 
    201   syncable::ModelType model_type;
    202   if (!ObjectIdToRealModelType(object_id, &model_type)) {
    203     LOG(WARNING) << "Could not get object id model type; ignoring";
    204     return;
    205   }
    206 
    207   if (new_state != invalidation::RegistrationState_REGISTERED) {
    208     // We don't care about |unknown_hint|; we let
    209     // |registration_manager_| handle the registration backoff policy.
    210     registration_manager_->MarkRegistrationLost(model_type);
    211   }
    212 }
    213 
    214 void ChromeInvalidationClient::AllRegistrationsLost(
    215     invalidation::Closure* callback) {
    216   DCHECK(non_thread_safe_.CalledOnValidThread());
    217   DCHECK(invalidation::IsCallbackRepeatable(callback));
    218   VLOG(1) << "AllRegistrationsLost";
    219   registration_manager_->MarkAllRegistrationsLost();
    220   RunAndDeleteClosure(callback);
    221 }
    222 
    223 void ChromeInvalidationClient::SessionStatusChanged(bool has_session) {
    224   VLOG(1) << "SessionStatusChanged: " << has_session;
    225   listener_->OnSessionStatusChanged(has_session);
    226 }
    227 
    228 void ChromeInvalidationClient::WriteState(const std::string& state) {
    229   DCHECK(non_thread_safe_.CalledOnValidThread());
    230   CHECK(state_writer_);
    231   state_writer_->WriteState(state);
    232 }
    233 
    234 void ChromeInvalidationClient::HandleOutboundPacket(
    235     invalidation::NetworkEndpoint* const& network_endpoint) {
    236   DCHECK(non_thread_safe_.CalledOnValidThread());
    237   CHECK(cache_invalidation_packet_handler_.get());
    238   cache_invalidation_packet_handler_->
    239       HandleOutboundPacket(network_endpoint);
    240 }
    241 
    242 }  // namespace sync_notifier
    243