1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/sessions/model_type_registry.h" 6 7 #include "base/bind.h" 8 #include "base/observer_list.h" 9 #include "base/thread_task_runner_handle.h" 10 #include "sync/engine/directory_commit_contributor.h" 11 #include "sync/engine/directory_update_handler.h" 12 #include "sync/engine/model_type_sync_proxy.h" 13 #include "sync/engine/model_type_sync_proxy_impl.h" 14 #include "sync/engine/model_type_sync_worker.h" 15 #include "sync/engine/model_type_sync_worker_impl.h" 16 #include "sync/internal_api/public/non_blocking_sync_common.h" 17 #include "sync/sessions/directory_type_debug_info_emitter.h" 18 #include "sync/util/cryptographer.h" 19 20 namespace syncer { 21 22 namespace { 23 24 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy { 25 public: 26 ModelTypeSyncProxyWrapper( 27 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy, 28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner); 29 virtual ~ModelTypeSyncProxyWrapper(); 30 31 virtual void OnCommitCompleted( 32 const DataTypeState& type_state, 33 const CommitResponseDataList& response_list) OVERRIDE; 34 virtual void OnUpdateReceived( 35 const DataTypeState& type_state, 36 const UpdateResponseDataList& response_list, 37 const UpdateResponseDataList& pending_updates) OVERRIDE; 38 39 private: 40 base::WeakPtr<ModelTypeSyncProxyImpl> processor_; 41 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; 42 }; 43 44 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper( 45 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy, 46 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner) 47 : processor_(proxy), processor_task_runner_(processor_task_runner) { 48 } 49 50 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() { 51 } 52 53 void ModelTypeSyncProxyWrapper::OnCommitCompleted( 54 const DataTypeState& type_state, 55 const CommitResponseDataList& response_list) { 56 processor_task_runner_->PostTask( 57 FROM_HERE, 58 base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted, 59 processor_, 60 type_state, 61 response_list)); 62 } 63 64 void ModelTypeSyncProxyWrapper::OnUpdateReceived( 65 const DataTypeState& type_state, 66 const UpdateResponseDataList& response_list, 67 const UpdateResponseDataList& pending_updates) { 68 processor_task_runner_->PostTask( 69 FROM_HERE, 70 base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived, 71 processor_, 72 type_state, 73 response_list, 74 pending_updates)); 75 } 76 77 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker { 78 public: 79 ModelTypeSyncWorkerWrapper( 80 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker, 81 const scoped_refptr<base::SequencedTaskRunner>& sync_thread); 82 virtual ~ModelTypeSyncWorkerWrapper(); 83 84 virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE; 85 86 private: 87 base::WeakPtr<ModelTypeSyncWorkerImpl> worker_; 88 scoped_refptr<base::SequencedTaskRunner> sync_thread_; 89 }; 90 91 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper( 92 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker, 93 const scoped_refptr<base::SequencedTaskRunner>& sync_thread) 94 : worker_(worker), sync_thread_(sync_thread) { 95 } 96 97 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() { 98 } 99 100 void ModelTypeSyncWorkerWrapper::EnqueueForCommit( 101 const CommitRequestDataList& list) { 102 sync_thread_->PostTask( 103 FROM_HERE, 104 base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list)); 105 } 106 107 } // namespace 108 109 ModelTypeRegistry::ModelTypeRegistry( 110 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 111 syncable::Directory* directory, 112 NudgeHandler* nudge_handler) 113 : directory_(directory), 114 nudge_handler_(nudge_handler), 115 weak_ptr_factory_(this) { 116 for (size_t i = 0u; i < workers.size(); ++i) { 117 workers_map_.insert( 118 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i])); 119 } 120 } 121 122 ModelTypeRegistry::~ModelTypeRegistry() { 123 } 124 125 void ModelTypeRegistry::SetEnabledDirectoryTypes( 126 const ModelSafeRoutingInfo& routing_info) { 127 // Remove all existing directory processors and delete them. The 128 // DebugInfoEmitters are not deleted here, since we want to preserve their 129 // counters. 130 for (ModelTypeSet::Iterator it = enabled_directory_types_.First(); 131 it.Good(); it.Inc()) { 132 size_t result1 = update_handler_map_.erase(it.Get()); 133 size_t result2 = commit_contributor_map_.erase(it.Get()); 134 DCHECK_EQ(1U, result1); 135 DCHECK_EQ(1U, result2); 136 } 137 138 // Clear the old instances of directory update handlers and commit 139 // contributors, deleting their contents in the processs. 140 directory_update_handlers_.clear(); 141 directory_commit_contributors_.clear(); 142 143 // Create new ones and add them to the appropriate containers. 144 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin(); 145 routing_iter != routing_info.end(); ++routing_iter) { 146 ModelType type = routing_iter->first; 147 ModelSafeGroup group = routing_iter->second; 148 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator 149 worker_it = workers_map_.find(group); 150 DCHECK(worker_it != workers_map_.end()); 151 scoped_refptr<ModelSafeWorker> worker = worker_it->second; 152 153 // DebugInfoEmitters are never deleted. Use existing one if we have it. 154 DirectoryTypeDebugInfoEmitter* emitter = NULL; 155 DirectoryTypeDebugInfoEmitterMap::iterator it = 156 directory_type_debug_info_emitter_map_.find(type); 157 if (it != directory_type_debug_info_emitter_map_.end()) { 158 emitter = it->second; 159 } else { 160 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type, 161 &type_debug_info_observers_); 162 directory_type_debug_info_emitter_map_.insert( 163 std::make_pair(type, emitter)); 164 directory_type_debug_info_emitters_.push_back(emitter); 165 } 166 167 DirectoryCommitContributor* committer = 168 new DirectoryCommitContributor(directory_, type, emitter); 169 DirectoryUpdateHandler* updater = 170 new DirectoryUpdateHandler(directory_, type, worker, emitter); 171 172 // These containers take ownership of their contents. 173 directory_commit_contributors_.push_back(committer); 174 directory_update_handlers_.push_back(updater); 175 176 bool inserted1 = 177 update_handler_map_.insert(std::make_pair(type, updater)).second; 178 DCHECK(inserted1) << "Attempt to override existing type handler in map"; 179 180 bool inserted2 = 181 commit_contributor_map_.insert(std::make_pair(type, committer)).second; 182 DCHECK(inserted2) << "Attempt to override existing type handler in map"; 183 } 184 185 enabled_directory_types_ = GetRoutingInfoTypes(routing_info); 186 DCHECK(Intersection(GetEnabledDirectoryTypes(), 187 GetEnabledNonBlockingTypes()).Empty()); 188 } 189 190 void ModelTypeRegistry::ConnectSyncTypeToWorker( 191 ModelType type, 192 const DataTypeState& data_type_state, 193 const UpdateResponseDataList& saved_pending_updates, 194 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner, 195 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) { 196 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); 197 198 // Initialize Worker -> Proxy communication channel. 199 scoped_ptr<ModelTypeSyncProxy> proxy( 200 new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner)); 201 scoped_ptr<Cryptographer> cryptographer_copy; 202 if (encrypted_types_.Has(type)) 203 cryptographer_copy.reset(new Cryptographer(*cryptographer_)); 204 205 scoped_ptr<ModelTypeSyncWorkerImpl> worker( 206 new ModelTypeSyncWorkerImpl(type, 207 data_type_state, 208 saved_pending_updates, 209 cryptographer_copy.Pass(), 210 nudge_handler_, 211 proxy.Pass())); 212 213 // Initialize Proxy -> Worker communication channel. 214 scoped_ptr<ModelTypeSyncWorker> wrapped_worker( 215 new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(), 216 scoped_refptr<base::SequencedTaskRunner>( 217 base::ThreadTaskRunnerHandle::Get()))); 218 type_task_runner->PostTask(FROM_HERE, 219 base::Bind(&ModelTypeSyncProxyImpl::OnConnect, 220 proxy_impl, 221 base::Passed(&wrapped_worker))); 222 223 DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); 224 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); 225 226 update_handler_map_.insert(std::make_pair(type, worker.get())); 227 commit_contributor_map_.insert(std::make_pair(type, worker.get())); 228 229 // The container takes ownership. 230 model_type_sync_workers_.push_back(worker.release()); 231 232 DCHECK(Intersection(GetEnabledDirectoryTypes(), 233 GetEnabledNonBlockingTypes()).Empty()); 234 } 235 236 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) { 237 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type); 238 DCHECK(update_handler_map_.find(type) != update_handler_map_.end()); 239 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end()); 240 241 size_t updaters_erased = update_handler_map_.erase(type); 242 size_t committers_erased = commit_contributor_map_.erase(type); 243 244 DCHECK_EQ(1U, updaters_erased); 245 DCHECK_EQ(1U, committers_erased); 246 247 // Remove from the ScopedVector, deleting the worker in the process. 248 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it = 249 model_type_sync_workers_.begin(); 250 it != model_type_sync_workers_.end(); 251 ++it) { 252 if ((*it)->GetModelType() == type) { 253 model_type_sync_workers_.erase(it); 254 break; 255 } 256 } 257 } 258 259 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const { 260 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()); 261 } 262 263 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() { 264 return &update_handler_map_; 265 } 266 267 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() { 268 return &commit_contributor_map_; 269 } 270 271 DirectoryTypeDebugInfoEmitterMap* 272 ModelTypeRegistry::directory_type_debug_info_emitter_map() { 273 return &directory_type_debug_info_emitter_map_; 274 } 275 276 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver( 277 syncer::TypeDebugInfoObserver* observer) { 278 if (!type_debug_info_observers_.HasObserver(observer)) 279 type_debug_info_observers_.AddObserver(observer); 280 } 281 282 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver( 283 syncer::TypeDebugInfoObserver* observer) { 284 type_debug_info_observers_.RemoveObserver(observer); 285 } 286 287 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver( 288 syncer::TypeDebugInfoObserver* observer) { 289 return type_debug_info_observers_.HasObserver(observer); 290 } 291 292 void ModelTypeRegistry::RequestEmitDebugInfo() { 293 for (DirectoryTypeDebugInfoEmitterMap::iterator it = 294 directory_type_debug_info_emitter_map_.begin(); 295 it != directory_type_debug_info_emitter_map_.end(); ++it) { 296 it->second->EmitCommitCountersUpdate(); 297 it->second->EmitUpdateCountersUpdate(); 298 it->second->EmitStatusCountersUpdate(); 299 } 300 } 301 302 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() { 303 return weak_ptr_factory_.GetWeakPtr(); 304 } 305 306 void ModelTypeRegistry::OnPassphraseRequired( 307 PassphraseRequiredReason reason, 308 const sync_pb::EncryptedData& pending_keys) { 309 } 310 311 void ModelTypeRegistry::OnPassphraseAccepted() { 312 } 313 314 void ModelTypeRegistry::OnBootstrapTokenUpdated( 315 const std::string& bootstrap_token, 316 BootstrapTokenType type) { 317 } 318 319 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, 320 bool encrypt_everything) { 321 encrypted_types_ = encrypted_types; 322 OnEncryptionStateChanged(); 323 } 324 325 void ModelTypeRegistry::OnEncryptionComplete() { 326 } 327 328 void ModelTypeRegistry::OnCryptographerStateChanged( 329 Cryptographer* cryptographer) { 330 cryptographer_.reset(new Cryptographer(*cryptographer)); 331 OnEncryptionStateChanged(); 332 } 333 334 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type, 335 base::Time passphrase_time) { 336 } 337 338 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const { 339 return enabled_directory_types_; 340 } 341 342 void ModelTypeRegistry::OnEncryptionStateChanged() { 343 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it = 344 model_type_sync_workers_.begin(); 345 it != model_type_sync_workers_.end(); 346 ++it) { 347 if (encrypted_types_.Has((*it)->GetModelType())) { 348 (*it)->UpdateCryptographer( 349 make_scoped_ptr(new Cryptographer(*cryptographer_))); 350 } 351 } 352 } 353 354 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const { 355 ModelTypeSet enabled_off_thread_types; 356 for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it = 357 model_type_sync_workers_.begin(); 358 it != model_type_sync_workers_.end(); 359 ++it) { 360 enabled_off_thread_types.Put((*it)->GetModelType()); 361 } 362 return enabled_off_thread_types; 363 } 364 365 } // namespace syncer 366