1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/sync/notifier/chrome_invalidation_client.h" 6 7 #include <string> 8 #include <vector> 9 10 #include "base/compiler_specific.h" 11 #include "base/logging.h" 12 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 13 #include "chrome/browser/sync/notifier/invalidation_util.h" 14 #include "chrome/browser/sync/notifier/registration_manager.h" 15 #include "chrome/browser/sync/syncable/model_type.h" 16 #include "google/cacheinvalidation/invalidation-client-impl.h" 17 18 namespace sync_notifier { 19 20 ChromeInvalidationClient::Listener::~Listener() {} 21 22 ChromeInvalidationClient::ChromeInvalidationClient() 23 : chrome_system_resources_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 24 scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 25 handle_outbound_packet_callback_( 26 scoped_callback_factory_.NewCallback( 27 &ChromeInvalidationClient::HandleOutboundPacket)), 28 listener_(NULL), 29 state_writer_(NULL) { 30 DCHECK(non_thread_safe_.CalledOnValidThread()); 31 } 32 33 ChromeInvalidationClient::~ChromeInvalidationClient() { 34 DCHECK(non_thread_safe_.CalledOnValidThread()); 35 Stop(); 36 DCHECK(!listener_); 37 DCHECK(!state_writer_); 38 } 39 40 void ChromeInvalidationClient::Start( 41 const std::string& client_id, const std::string& client_info, 42 const std::string& state, Listener* listener, 43 StateWriter* state_writer, base::WeakPtr<talk_base::Task> base_task) { 44 DCHECK(non_thread_safe_.CalledOnValidThread()); 45 Stop(); 46 47 chrome_system_resources_.StartScheduler(); 48 49 DCHECK(!listener_); 50 DCHECK(listener); 51 listener_ = listener; 52 DCHECK(!state_writer_); 53 DCHECK(state_writer); 54 state_writer_ = state_writer; 55 56 invalidation::ClientType client_type; 57 client_type.set_type(invalidation::ClientType::CHROME_SYNC); 58 // TODO(akalin): Use InvalidationClient::Create() once it supports 59 // taking a ClientConfig. 60 invalidation::ClientConfig client_config; 61 // Bump up limits so that we reduce the number of registration 62 // replies we get. 63 client_config.max_registrations_per_message = 20; 64 client_config.max_ops_per_message = 40; 65 invalidation_client_.reset( 66 new invalidation::InvalidationClientImpl( 67 &chrome_system_resources_, client_type, client_id, client_info, 68 client_config, this)); 69 invalidation_client_->Start(state); 70 invalidation::NetworkEndpoint* network_endpoint = 71 invalidation_client_->network_endpoint(); 72 CHECK(network_endpoint); 73 network_endpoint->RegisterOutboundListener( 74 handle_outbound_packet_callback_.get()); 75 ChangeBaseTask(base_task); 76 registration_manager_.reset( 77 new RegistrationManager(invalidation_client_.get())); 78 registration_manager_->SetRegisteredTypes(registered_types_); 79 } 80 81 void ChromeInvalidationClient::ChangeBaseTask( 82 base::WeakPtr<talk_base::Task> base_task) { 83 DCHECK(invalidation_client_.get()); 84 DCHECK(base_task.get()); 85 cache_invalidation_packet_handler_.reset( 86 new CacheInvalidationPacketHandler(base_task, 87 invalidation_client_.get())); 88 } 89 90 void ChromeInvalidationClient::Stop() { 91 DCHECK(non_thread_safe_.CalledOnValidThread()); 92 if (!invalidation_client_.get()) { 93 DCHECK(!cache_invalidation_packet_handler_.get()); 94 return; 95 } 96 97 chrome_system_resources_.StopScheduler(); 98 99 registration_manager_.reset(); 100 cache_invalidation_packet_handler_.reset(); 101 invalidation_client_.reset(); 102 state_writer_ = NULL; 103 listener_ = NULL; 104 } 105 106 void ChromeInvalidationClient::RegisterTypes( 107 const syncable::ModelTypeSet& types) { 108 DCHECK(non_thread_safe_.CalledOnValidThread()); 109 registered_types_ = types; 110 if (registration_manager_.get()) { 111 registration_manager_->SetRegisteredTypes(registered_types_); 112 } 113 // TODO(akalin): Clear invalidation versions for unregistered types. 114 } 115 116 void ChromeInvalidationClient::Invalidate( 117 const invalidation::Invalidation& invalidation, 118 invalidation::Closure* callback) { 119 DCHECK(non_thread_safe_.CalledOnValidThread()); 120 DCHECK(invalidation::IsCallbackRepeatable(callback)); 121 VLOG(1) << "Invalidate: " << InvalidationToString(invalidation); 122 syncable::ModelType model_type; 123 if (!ObjectIdToRealModelType(invalidation.object_id(), &model_type)) { 124 LOG(WARNING) << "Could not get invalidation model type; " 125 << "invalidating everything"; 126 EmitInvalidation(registered_types_, std::string()); 127 RunAndDeleteClosure(callback); 128 return; 129 } 130 // The invalidation API spec allows for the possibility of redundant 131 // invalidations, so keep track of the max versions and drop 132 // invalidations with old versions. 133 // 134 // TODO(akalin): Now that we keep track of registered types, we 135 // should drop invalidations for unregistered types. We may also 136 // have to filter it at a higher level, as invalidations for 137 // newly-unregistered types may already be in flight. 138 // 139 // TODO(akalin): Persist |max_invalidation_versions_| somehow. 140 if (invalidation.version() != UNKNOWN_OBJECT_VERSION) { 141 std::map<syncable::ModelType, int64>::const_iterator it = 142 max_invalidation_versions_.find(model_type); 143 if ((it != max_invalidation_versions_.end()) && 144 (invalidation.version() <= it->second)) { 145 // Drop redundant invalidations. 146 RunAndDeleteClosure(callback); 147 return; 148 } 149 max_invalidation_versions_[model_type] = invalidation.version(); 150 } 151 152 std::string payload; 153 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 154 if (invalidation.has_payload()) 155 payload = invalidation.payload(); 156 157 syncable::ModelTypeSet types; 158 types.insert(model_type); 159 EmitInvalidation(types, payload); 160 // TODO(akalin): We should really |callback| only after we get the 161 // updates from the sync server. (see http://crbug.com/78462). 162 RunAndDeleteClosure(callback); 163 } 164 165 // This should behave as if we got an invalidation with version 166 // UNKNOWN_OBJECT_VERSION for all known data types. 167 void ChromeInvalidationClient::InvalidateAll( 168 invalidation::Closure* callback) { 169 DCHECK(non_thread_safe_.CalledOnValidThread()); 170 DCHECK(invalidation::IsCallbackRepeatable(callback)); 171 VLOG(1) << "InvalidateAll"; 172 EmitInvalidation(registered_types_, std::string()); 173 // TODO(akalin): We should really |callback| only after we get the 174 // updates from the sync server. (see http://crbug.com/76482). 175 RunAndDeleteClosure(callback); 176 } 177 178 void ChromeInvalidationClient::EmitInvalidation( 179 const syncable::ModelTypeSet& types, const std::string& payload) { 180 DCHECK(non_thread_safe_.CalledOnValidThread()); 181 // TODO(akalin): Move all uses of ModelTypeBitSet for invalidations 182 // to ModelTypeSet. 183 syncable::ModelTypePayloadMap type_payloads = 184 syncable::ModelTypePayloadMapFromBitSet( 185 syncable::ModelTypeBitSetFromSet(types), payload); 186 listener_->OnInvalidate(type_payloads); 187 } 188 189 void ChromeInvalidationClient::RegistrationStateChanged( 190 const invalidation::ObjectId& object_id, 191 invalidation::RegistrationState new_state, 192 const invalidation::UnknownHint& unknown_hint) { 193 DCHECK(non_thread_safe_.CalledOnValidThread()); 194 VLOG(1) << "RegistrationStateChanged: " 195 << ObjectIdToString(object_id) << " " << new_state; 196 if (new_state == invalidation::RegistrationState_UNKNOWN) { 197 VLOG(1) << "is_transient=" << unknown_hint.is_transient() 198 << ", message=" << unknown_hint.message(); 199 } 200 201 syncable::ModelType model_type; 202 if (!ObjectIdToRealModelType(object_id, &model_type)) { 203 LOG(WARNING) << "Could not get object id model type; ignoring"; 204 return; 205 } 206 207 if (new_state != invalidation::RegistrationState_REGISTERED) { 208 // We don't care about |unknown_hint|; we let 209 // |registration_manager_| handle the registration backoff policy. 210 registration_manager_->MarkRegistrationLost(model_type); 211 } 212 } 213 214 void ChromeInvalidationClient::AllRegistrationsLost( 215 invalidation::Closure* callback) { 216 DCHECK(non_thread_safe_.CalledOnValidThread()); 217 DCHECK(invalidation::IsCallbackRepeatable(callback)); 218 VLOG(1) << "AllRegistrationsLost"; 219 registration_manager_->MarkAllRegistrationsLost(); 220 RunAndDeleteClosure(callback); 221 } 222 223 void ChromeInvalidationClient::SessionStatusChanged(bool has_session) { 224 VLOG(1) << "SessionStatusChanged: " << has_session; 225 listener_->OnSessionStatusChanged(has_session); 226 } 227 228 void ChromeInvalidationClient::WriteState(const std::string& state) { 229 DCHECK(non_thread_safe_.CalledOnValidThread()); 230 CHECK(state_writer_); 231 state_writer_->WriteState(state); 232 } 233 234 void ChromeInvalidationClient::HandleOutboundPacket( 235 invalidation::NetworkEndpoint* const& network_endpoint) { 236 DCHECK(non_thread_safe_.CalledOnValidThread()); 237 CHECK(cache_invalidation_packet_handler_.get()); 238 cache_invalidation_packet_handler_-> 239 HandleOutboundPacket(network_endpoint); 240 } 241 242 } // namespace sync_notifier 243