1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/engine/model_type_sync_proxy_impl.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "sync/engine/model_type_entity.h" 10 #include "sync/engine/model_type_sync_worker.h" 11 #include "sync/internal_api/public/sync_context_proxy.h" 12 #include "sync/syncable/syncable_util.h" 13 14 namespace syncer { 15 16 ModelTypeSyncProxyImpl::ModelTypeSyncProxyImpl(ModelType type) 17 : type_(type), 18 is_preferred_(false), 19 is_connected_(false), 20 entities_deleter_(&entities_), 21 pending_updates_map_deleter_(&pending_updates_map_), 22 weak_ptr_factory_for_ui_(this), 23 weak_ptr_factory_for_sync_(this) { 24 } 25 26 ModelTypeSyncProxyImpl::~ModelTypeSyncProxyImpl() { 27 } 28 29 bool ModelTypeSyncProxyImpl::IsPreferred() const { 30 DCHECK(CalledOnValidThread()); 31 return is_preferred_; 32 } 33 34 bool ModelTypeSyncProxyImpl::IsConnected() const { 35 DCHECK(CalledOnValidThread()); 36 return is_connected_; 37 } 38 39 ModelType ModelTypeSyncProxyImpl::GetModelType() const { 40 DCHECK(CalledOnValidThread()); 41 return type_; 42 } 43 44 void ModelTypeSyncProxyImpl::Enable( 45 scoped_ptr<SyncContextProxy> sync_context_proxy) { 46 DCHECK(CalledOnValidThread()); 47 DVLOG(1) << "Asked to enable " << ModelTypeToString(type_); 48 49 is_preferred_ = true; 50 51 // TODO(rlarocque): At some point, this should be loaded from storage. 52 data_type_state_.progress_marker.set_data_type_id( 53 GetSpecificsFieldNumberFromModelType(type_)); 54 55 UpdateResponseDataList saved_pending_updates = GetPendingUpdates(); 56 sync_context_proxy_ = sync_context_proxy.Pass(); 57 sync_context_proxy_->ConnectTypeToSync( 58 GetModelType(), 59 data_type_state_, 60 saved_pending_updates, 61 weak_ptr_factory_for_sync_.GetWeakPtr()); 62 } 63 64 void ModelTypeSyncProxyImpl::Disable() { 65 DCHECK(CalledOnValidThread()); 66 is_preferred_ = false; 67 Disconnect(); 68 69 ClearSyncState(); 70 } 71 72 void ModelTypeSyncProxyImpl::Disconnect() { 73 DCHECK(CalledOnValidThread()); 74 DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_); 75 is_connected_ = false; 76 77 if (sync_context_proxy_) { 78 sync_context_proxy_->Disconnect(GetModelType()); 79 sync_context_proxy_.reset(); 80 } 81 82 weak_ptr_factory_for_sync_.InvalidateWeakPtrs(); 83 worker_.reset(); 84 85 ClearTransientSyncState(); 86 } 87 88 base::WeakPtr<ModelTypeSyncProxyImpl> ModelTypeSyncProxyImpl::AsWeakPtrForUI() { 89 DCHECK(CalledOnValidThread()); 90 return weak_ptr_factory_for_ui_.GetWeakPtr(); 91 } 92 93 void ModelTypeSyncProxyImpl::OnConnect(scoped_ptr<ModelTypeSyncWorker> worker) { 94 DCHECK(CalledOnValidThread()); 95 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_); 96 97 is_connected_ = true; 98 worker_ = worker.Pass(); 99 100 FlushPendingCommitRequests(); 101 } 102 103 void ModelTypeSyncProxyImpl::Put(const std::string& client_tag, 104 const sync_pb::EntitySpecifics& specifics) { 105 DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics)); 106 107 const std::string client_tag_hash( 108 syncable::GenerateSyncableHash(type_, client_tag)); 109 110 EntityMap::iterator it = entities_.find(client_tag_hash); 111 if (it == entities_.end()) { 112 scoped_ptr<ModelTypeEntity> entity(ModelTypeEntity::NewLocalItem( 113 client_tag, specifics, base::Time::Now())); 114 entities_.insert(std::make_pair(client_tag_hash, entity.release())); 115 } else { 116 ModelTypeEntity* entity = it->second; 117 entity->MakeLocalChange(specifics); 118 } 119 120 FlushPendingCommitRequests(); 121 } 122 123 void ModelTypeSyncProxyImpl::Delete(const std::string& client_tag) { 124 const std::string client_tag_hash( 125 syncable::GenerateSyncableHash(type_, client_tag)); 126 127 EntityMap::iterator it = entities_.find(client_tag_hash); 128 if (it == entities_.end()) { 129 // That's unusual, but not necessarily a bad thing. 130 // Missing is as good as deleted as far as the model is concerned. 131 DLOG(WARNING) << "Attempted to delete missing item." 132 << " client tag: " << client_tag; 133 } else { 134 ModelTypeEntity* entity = it->second; 135 entity->Delete(); 136 } 137 138 FlushPendingCommitRequests(); 139 } 140 141 void ModelTypeSyncProxyImpl::FlushPendingCommitRequests() { 142 CommitRequestDataList commit_requests; 143 144 // Don't bother sending anything if there's no one to send to. 145 if (!IsConnected()) 146 return; 147 148 // Don't send anything if the type is not ready to handle commits. 149 if (!data_type_state_.initial_sync_done) 150 return; 151 152 // TODO(rlarocque): Do something smarter than iterate here. 153 for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); 154 ++it) { 155 if (it->second->RequiresCommitRequest()) { 156 CommitRequestData request; 157 it->second->InitializeCommitRequestData(&request); 158 commit_requests.push_back(request); 159 it->second->SetCommitRequestInProgress(); 160 } 161 } 162 163 if (!commit_requests.empty()) 164 worker_->EnqueueForCommit(commit_requests); 165 } 166 167 void ModelTypeSyncProxyImpl::OnCommitCompleted( 168 const DataTypeState& type_state, 169 const CommitResponseDataList& response_list) { 170 data_type_state_ = type_state; 171 172 for (CommitResponseDataList::const_iterator list_it = response_list.begin(); 173 list_it != response_list.end(); 174 ++list_it) { 175 const CommitResponseData& response_data = *list_it; 176 const std::string& client_tag_hash = response_data.client_tag_hash; 177 178 EntityMap::iterator it = entities_.find(client_tag_hash); 179 if (it == entities_.end()) { 180 NOTREACHED() << "Received commit response for missing item." 181 << " type: " << type_ << " client_tag: " << client_tag_hash; 182 return; 183 } else { 184 it->second->ReceiveCommitResponse(response_data.id, 185 response_data.sequence_number, 186 response_data.response_version, 187 data_type_state_.encryption_key_name); 188 } 189 } 190 } 191 192 void ModelTypeSyncProxyImpl::OnUpdateReceived( 193 const DataTypeState& data_type_state, 194 const UpdateResponseDataList& response_list, 195 const UpdateResponseDataList& pending_updates) { 196 bool got_new_encryption_requirements = data_type_state_.encryption_key_name != 197 data_type_state.encryption_key_name; 198 199 data_type_state_ = data_type_state; 200 201 for (UpdateResponseDataList::const_iterator list_it = response_list.begin(); 202 list_it != response_list.end(); 203 ++list_it) { 204 const UpdateResponseData& response_data = *list_it; 205 const std::string& client_tag_hash = response_data.client_tag_hash; 206 207 UpdateMap::iterator old_it = pending_updates_map_.find(client_tag_hash); 208 if (old_it != pending_updates_map_.end()) { 209 // If we're being asked to apply an update to this entity, this overrides 210 // the previous pending updates. 211 delete old_it->second; 212 pending_updates_map_.erase(old_it); 213 } 214 215 EntityMap::iterator it = entities_.find(client_tag_hash); 216 if (it == entities_.end()) { 217 scoped_ptr<ModelTypeEntity> entity = 218 ModelTypeEntity::FromServerUpdate(response_data.id, 219 response_data.client_tag_hash, 220 response_data.non_unique_name, 221 response_data.response_version, 222 response_data.specifics, 223 response_data.deleted, 224 response_data.ctime, 225 response_data.mtime, 226 response_data.encryption_key_name); 227 entities_.insert(std::make_pair(client_tag_hash, entity.release())); 228 } else { 229 ModelTypeEntity* entity = it->second; 230 entity->ApplyUpdateFromServer(response_data.response_version, 231 response_data.deleted, 232 response_data.specifics, 233 response_data.mtime, 234 response_data.encryption_key_name); 235 236 // TODO: Do something special when conflicts are detected. 237 } 238 239 // If the received entity has out of date encryption, we schedule another 240 // commit to fix it. 241 if (data_type_state_.encryption_key_name != 242 response_data.encryption_key_name) { 243 DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit " 244 << response_data.encryption_key_name << " -> " 245 << data_type_state_.encryption_key_name; 246 EntityMap::iterator it2 = entities_.find(client_tag_hash); 247 it2->second->UpdateDesiredEncryptionKey( 248 data_type_state_.encryption_key_name); 249 } 250 } 251 252 // Save pending updates in the appropriate data structure. 253 for (UpdateResponseDataList::const_iterator list_it = pending_updates.begin(); 254 list_it != pending_updates.end(); 255 ++list_it) { 256 const UpdateResponseData& update = *list_it; 257 const std::string& client_tag_hash = update.client_tag_hash; 258 259 UpdateMap::iterator lookup_it = pending_updates_map_.find(client_tag_hash); 260 if (lookup_it == pending_updates_map_.end()) { 261 pending_updates_map_.insert( 262 std::make_pair(client_tag_hash, new UpdateResponseData(update))); 263 } else if (lookup_it->second->response_version <= update.response_version) { 264 delete lookup_it->second; 265 pending_updates_map_.erase(lookup_it); 266 pending_updates_map_.insert( 267 std::make_pair(client_tag_hash, new UpdateResponseData(update))); 268 } else { 269 // Received update is stale, do not overwrite existing. 270 } 271 } 272 273 if (got_new_encryption_requirements) { 274 for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); 275 ++it) { 276 it->second->UpdateDesiredEncryptionKey( 277 data_type_state_.encryption_key_name); 278 } 279 } 280 281 // We may have new reasons to commit by the time this function is done. 282 FlushPendingCommitRequests(); 283 284 // TODO: Inform the model of the new or updated data. 285 // TODO: Persist the new data on disk. 286 } 287 288 UpdateResponseDataList ModelTypeSyncProxyImpl::GetPendingUpdates() { 289 UpdateResponseDataList pending_updates_list; 290 for (UpdateMap::const_iterator it = pending_updates_map_.begin(); 291 it != pending_updates_map_.end(); 292 ++it) { 293 pending_updates_list.push_back(*it->second); 294 } 295 return pending_updates_list; 296 } 297 298 void ModelTypeSyncProxyImpl::ClearTransientSyncState() { 299 for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); 300 ++it) { 301 it->second->ClearTransientSyncState(); 302 } 303 } 304 305 void ModelTypeSyncProxyImpl::ClearSyncState() { 306 for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); 307 ++it) { 308 it->second->ClearSyncState(); 309 } 310 STLDeleteValues(&pending_updates_map_); 311 data_type_state_ = DataTypeState(); 312 } 313 314 } // namespace syncer 315