1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/engine/non_blocking_type_processor.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/message_loop/message_loop_proxy.h" 10 #include "sync/engine/model_thread_sync_entity.h" 11 #include "sync/engine/non_blocking_type_processor_core_interface.h" 12 #include "sync/internal_api/public/sync_core_proxy.h" 13 #include "sync/syncable/syncable_util.h" 14 15 namespace syncer { 16 17 NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type) 18 : type_(type), 19 is_preferred_(false), 20 is_connected_(false), 21 entities_deleter_(&entities_), 22 weak_ptr_factory_for_ui_(this), 23 weak_ptr_factory_for_sync_(this) { 24 } 25 26 NonBlockingTypeProcessor::~NonBlockingTypeProcessor() { 27 } 28 29 bool NonBlockingTypeProcessor::IsPreferred() const { 30 DCHECK(CalledOnValidThread()); 31 return is_preferred_; 32 } 33 34 bool NonBlockingTypeProcessor::IsConnected() const { 35 DCHECK(CalledOnValidThread()); 36 return is_connected_; 37 } 38 39 ModelType NonBlockingTypeProcessor::GetModelType() const { 40 DCHECK(CalledOnValidThread()); 41 return type_; 42 } 43 44 void NonBlockingTypeProcessor::Enable( 45 scoped_ptr<SyncCoreProxy> sync_core_proxy) { 46 DCHECK(CalledOnValidThread()); 47 DVLOG(1) << "Asked to enable " << ModelTypeToString(type_); 48 49 is_preferred_ = true; 50 51 // TODO(rlarocque): At some point, this should be loaded from storage. 52 data_type_state_.progress_marker.set_data_type_id( 53 GetSpecificsFieldNumberFromModelType(type_)); 54 55 sync_core_proxy_ = sync_core_proxy.Pass(); 56 sync_core_proxy_->ConnectTypeToCore(GetModelType(), 57 data_type_state_, 58 weak_ptr_factory_for_sync_.GetWeakPtr()); 59 } 60 61 void NonBlockingTypeProcessor::Disable() { 62 DCHECK(CalledOnValidThread()); 63 is_preferred_ = false; 64 Disconnect(); 65 } 66 67 void NonBlockingTypeProcessor::Disconnect() { 68 DCHECK(CalledOnValidThread()); 69 DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_); 70 is_connected_ = false; 71 72 if (sync_core_proxy_) { 73 sync_core_proxy_->Disconnect(GetModelType()); 74 sync_core_proxy_.reset(); 75 } 76 77 weak_ptr_factory_for_sync_.InvalidateWeakPtrs(); 78 core_interface_.reset(); 79 } 80 81 base::WeakPtr<NonBlockingTypeProcessor> 82 NonBlockingTypeProcessor::AsWeakPtrForUI() { 83 DCHECK(CalledOnValidThread()); 84 return weak_ptr_factory_for_ui_.GetWeakPtr(); 85 } 86 87 void NonBlockingTypeProcessor::OnConnect( 88 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) { 89 DCHECK(CalledOnValidThread()); 90 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_); 91 92 is_connected_ = true; 93 core_interface_ = core_interface.Pass(); 94 95 FlushPendingCommitRequests(); 96 } 97 98 void NonBlockingTypeProcessor::Put(const std::string& client_tag, 99 const sync_pb::EntitySpecifics& specifics) { 100 DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics)); 101 102 const std::string client_tag_hash( 103 syncable::GenerateSyncableHash(type_, client_tag)); 104 105 EntityMap::iterator it = entities_.find(client_tag_hash); 106 if (it == entities_.end()) { 107 scoped_ptr<ModelThreadSyncEntity> entity( 108 ModelThreadSyncEntity::NewLocalItem( 109 client_tag, specifics, base::Time::Now())); 110 entities_.insert(std::make_pair(client_tag_hash, entity.release())); 111 } else { 112 ModelThreadSyncEntity* entity = it->second; 113 entity->MakeLocalChange(specifics); 114 } 115 116 FlushPendingCommitRequests(); 117 } 118 119 void NonBlockingTypeProcessor::Delete(const std::string& client_tag) { 120 const std::string client_tag_hash( 121 syncable::GenerateSyncableHash(type_, client_tag)); 122 123 EntityMap::iterator it = entities_.find(client_tag_hash); 124 if (it == entities_.end()) { 125 // That's unusual, but not necessarily a bad thing. 126 // Missing is as good as deleted as far as the model is concerned. 127 DLOG(WARNING) << "Attempted to delete missing item." 128 << " client tag: " << client_tag; 129 } else { 130 ModelThreadSyncEntity* entity = it->second; 131 entity->Delete(); 132 } 133 134 FlushPendingCommitRequests(); 135 } 136 137 void NonBlockingTypeProcessor::FlushPendingCommitRequests() { 138 CommitRequestDataList commit_requests; 139 140 // Don't bother sending anything if there's no one to send to. 141 if (!IsConnected()) 142 return; 143 144 // Don't send anything if the type is not ready to handle commits. 145 if (!data_type_state_.initial_sync_done) 146 return; 147 148 // TODO(rlarocque): Do something smarter than iterate here. 149 for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); 150 ++it) { 151 if (it->second->RequiresCommitRequest()) { 152 CommitRequestData request; 153 it->second->InitializeCommitRequestData(&request); 154 commit_requests.push_back(request); 155 it->second->SetCommitRequestInProgress(); 156 } 157 } 158 159 if (!commit_requests.empty()) 160 core_interface_->RequestCommits(commit_requests); 161 } 162 163 void NonBlockingTypeProcessor::OnCommitCompletion( 164 const DataTypeState& type_state, 165 const CommitResponseDataList& response_list) { 166 data_type_state_ = type_state; 167 168 for (CommitResponseDataList::const_iterator list_it = response_list.begin(); 169 list_it != response_list.end(); 170 ++list_it) { 171 const CommitResponseData& response_data = *list_it; 172 const std::string& client_tag_hash = response_data.client_tag_hash; 173 174 EntityMap::iterator it = entities_.find(client_tag_hash); 175 if (it == entities_.end()) { 176 NOTREACHED() << "Received commit response for missing item." 177 << " type: " << type_ << " client_tag: " << client_tag_hash; 178 return; 179 } else { 180 it->second->ReceiveCommitResponse(response_data.id, 181 response_data.sequence_number, 182 response_data.response_version); 183 } 184 } 185 } 186 187 void NonBlockingTypeProcessor::OnUpdateReceived( 188 const DataTypeState& data_type_state, 189 const UpdateResponseDataList& response_list) { 190 bool initial_sync_just_finished = 191 !data_type_state_.initial_sync_done && data_type_state.initial_sync_done; 192 193 data_type_state_ = data_type_state; 194 195 for (UpdateResponseDataList::const_iterator list_it = response_list.begin(); 196 list_it != response_list.end(); 197 ++list_it) { 198 const UpdateResponseData& response_data = *list_it; 199 const std::string& client_tag_hash = response_data.client_tag_hash; 200 201 EntityMap::iterator it = entities_.find(client_tag_hash); 202 if (it == entities_.end()) { 203 scoped_ptr<ModelThreadSyncEntity> entity = 204 ModelThreadSyncEntity::FromServerUpdate( 205 response_data.id, 206 response_data.client_tag_hash, 207 response_data.non_unique_name, 208 response_data.response_version, 209 response_data.specifics, 210 response_data.deleted, 211 response_data.ctime, 212 response_data.mtime); 213 entities_.insert(std::make_pair(client_tag_hash, entity.release())); 214 } else { 215 ModelThreadSyncEntity* entity = it->second; 216 entity->ApplyUpdateFromServer(response_data.response_version, 217 response_data.deleted, 218 response_data.specifics, 219 response_data.mtime); 220 // TODO: Do something special when conflicts are detected. 221 } 222 } 223 224 if (initial_sync_just_finished) 225 FlushPendingCommitRequests(); 226 227 // TODO: Inform the model of the new or updated data. 228 } 229 230 } // namespace syncer 231